Source code for triad.utils.rename
from collections import defaultdict
from typing import Any, Dict, Iterable, List, Optional
from .assertion import assert_or_throw
from .string import validate_triad_var_name
def normalize_names(names: List[Any]) -> Dict[Any, str]:
    """Build the rename mapping that makes dataframe column names valid
    under Fugue column naming rules.

    Names that are already valid strings are left alone and never appear
    in the result. Every other name is converted to a string, has its
    special characters replaced by ``_`` and, when that alone does not
    give a valid and unused name, receives a numeric suffix until it is
    unique.

    .. note::

        This is a temporary solution before
        :class:`~.triad.collections.schema.Schema` can take arbitrary names

    .. admonition:: Examples

        * ``[0,1]`` => ``{0:"_0", 1:"_1"}``
        * ``["1a","2b"]`` => ``{"1a":"_1a", "2b":"_2b"}``
        * ``["*a","-a"]`` => ``{"*a":"_a", "-a":"_a_1"}``

    :param names: the columns names of a dataframe
    :return: the rename operations as a dict, key is the original column
        name, value is the new valid name.
    :raises ValueError: reported through ``assert_or_throw`` when ``names``
        is empty or contains duplicated entries
    """
    assert_or_throw(len(names) > 0, ValueError("names is empty"))
    assert_or_throw(
        len(set(names)) == len(names), ValueError(f"duplicated names found in {names}")
    )

    # taken[x] > 0 means x is already in use; the value is also the next
    # numeric suffix to try for x. "" is pre-seeded so that an empty
    # normalized name is always forced to pick up a suffix.
    taken = defaultdict(int)
    taken[""] = 1

    # First pass: reserve every name that is already valid so generated
    # names can never collide with them, and collect the rest for renaming.
    pending: List[Any] = []
    for original in names:
        if isinstance(original, str) and validate_triad_var_name(original):
            taken[original] += 1
        else:
            pending.append(original)

    renames: Dict[Any, str] = {}
    for original in pending:
        candidate = _normalize_name(None if original is None else str(original))
        # Keep appending suffixes until the candidate is unused.
        while taken[candidate] > 0:
            base = candidate
            candidate = f"{base}_{taken[base]}"
            taken[base] += 1
        taken[candidate] += 1
        if not isinstance(original, str) or original != candidate:
            renames[original] = candidate
    return renames
def _normalize_name(name: Optional[str]) -> str:
    """Turn a single name into a valid variable name, or ``""`` on failure.

    :param name: the stringified original name, or None
    :return: ``name`` itself when it is already valid and not made only of
        underscores; otherwise the stripped name with invalid characters
        replaced by ``_`` (and a leading ``_`` added when it starts with a
        digit) if that result is valid; otherwise an empty string
    """
    if name is None:
        return ""
    # Already usable as-is: nothing to transform.
    if validate_triad_var_name(name) and any(ch != "_" for ch in name):
        return name

    stripped = name.strip()
    if stripped == "":
        return ""

    cleaned = "".join(_normalize_chars(stripped))
    # A leading digit is shielded with an underscore prefix.
    if cleaned[0].isdigit():
        cleaned = "_" + cleaned

    is_usable = validate_triad_var_name(cleaned) and any(ch != "_" for ch in cleaned)
    return cleaned if is_usable else ""
def _normalize_chars(name: str) -> Iterable[str]:
    """Lazily yield the characters of ``name``, replacing each character
    that is not flagged as valid in ``_VALID_CHARS`` with ``_``.

    :param name: the string to scan
    :return: a generator producing one character per input character
    """
    table_size = len(_VALID_CHARS)
    for ch in name:
        code = ord(ch)
        # Code points beyond the table are treated as invalid.
        yield ch if code < table_size and _VALID_CHARS[code] else "_"
def _get_valid_signs():
signs = [False] * 128
for i in range(len(signs)):
c = chr(i)
if c.isalpha() or c.isdigit() or c == "_":
signs[i] = True
return signs
# Lookup table computed once at import time by _get_valid_signs():
# _VALID_CHARS[i] is True when chr(i) is kept unchanged by _normalize_chars.
_VALID_CHARS = _get_valid_signs()