Source code for triad.utils.convert

import datetime
import importlib
import inspect
from types import ModuleType
from typing import Any, Dict, List, Optional, Tuple

import numpy as np
import pandas as pd
import six  # type: ignore

try:  # pragma: no cover
    from ciso8601 import parse_datetime

    _HAS_CISO8601 = True
except ImportError:  # pragma: no cover
    _HAS_CISO8601 = False
from triad.utils.assertion import assert_or_throw


EMPTY_ARGS: List[Any] = []
EMPTY_KWARGS: Dict[str, Any] = {}


[docs] def get_caller_global_local_vars( global_vars: Optional[Dict[str, Any]] = None, local_vars: Optional[Dict[str, Any]] = None, start: int = -1, end: int = -1, ) -> Tuple[Dict[str, Any], Dict[str, Any]]: """Get the caller level global and local variables. :param global_vars: overriding global variables, if not None, will return this instead of the caller's globals(), defaults to None :param local_vars: overriding local variables, if not None, will return this instead of the caller's locals(), defaults to None :param start: start stack level (from 0 to any negative number), defaults to -1 which is one level above where this function is invoked :param end: end stack level (from ``start`` to any smaller negative number), defaults to -1 which is one level above where this function is invoked :return: tuple of `global_vars` and `local_vars` .. admonition:: Examples .. code-block:: python def caller(): x=1 assert 1 == get_value("x") def get_value(var_name): _, l = get_caller_global_local_vars() assert var_name in l assert var_name not in locals() return l[var_name] :Notice: This is for internal use, users normally should not call this directly. If merging multiple levels, the variables on closer level (to where it is invoked) will overwrite the further levels values if there is overlap. .. admonition:: Examples .. code-block:: python def f1(): x=1 def f2(): x=2 def f3(): _, l = get_caller_global_local_vars(start=-1,end=-2) assert 2 == l["x"] _, l = get_caller_global_local_vars(start=-2,end=-2) assert 1 == l["x"] f2() f1() """ assert_or_throw(start <= 0, ValueError(f"{start} > 0")) assert_or_throw(end <= start, ValueError(f"{end} > {start}")) stack = inspect.currentframe().f_back # type: ignore p = 0 while p > start and stack is not None: stack = stack.f_back p -= 1 g_arr: List[Dict[str, Any]] = [] l_arr: List[Dict[str, Any]] = [] while p >= end and stack is not None: g_arr.insert(0, stack.f_globals) l_arr.insert(0, stack.f_locals) stack = stack.f_back p -= 1 if global_vars is None: global_vars = {} for d in g_arr: global_vars.update(d) if local_vars is None: local_vars = {} for d in l_arr: local_vars.update(d) return global_vars, local_vars # type: ignore
[docs] def str_to_object( expr: str, global_vars: Optional[Dict[str, Any]] = None, local_vars: Optional[Dict[str, Any]] = None, ) -> Any: """Convert string expression to object. The string expression must express a type with relative or full path, or express a local or global instance without brackets or operators. :param expr: string expression, see examples below :param global_vars: overriding global variables, if None, it will use the caller's globals(), defaults to None :param local_vars: overriding local variables, if None, it will use the caller's locals(), defaults to None :return: the object :raises ValueError: unable to find a matching object .. admonition:: Examples .. code-block:: python class _Mock: def __init__(self, x=1): self.x = x m = _Mock() assert 1 == str_to_object("m.x") assert 1 == str_to_object("m2.x", local_vars={"m2": m}) assert RuntimeError == str_to_object("RuntimeError") assert _Mock == str_to_object("_Mock") .. note:: This function is to dynamically load an object from string expression. If you write that string expression as python code at the same location, it should generate the same result. """ try: if any(not p.isidentifier() for p in expr.split(".")): raise ValueError(f"{expr} is invalid") _globals, _locals = get_caller_global_local_vars(global_vars, local_vars) if "." not in expr: return eval(expr, _globals, _locals) parts = expr.split(".") v = _locals.get(parts[0], _globals.get(parts[0], None)) if v is not None and not isinstance(v, ModuleType): return eval(expr, _globals, _locals) root = ".".join(parts[:-1]) return getattr(importlib.import_module(root), parts[-1]) except ValueError: raise # pragma: no cover except Exception: raise ValueError(expr)
[docs] def str_to_type( s: str, expected_base_type: Optional[type] = None, global_vars: Optional[Dict[str, Any]] = None, local_vars: Optional[Dict[str, Any]] = None, ) -> type: """Given a string expression, find the first/last type from all import libraries. If the expression contains `.`, it's supposed to be a relative or full path of the type including modules. :param s: type expression, for example `triad.utils.iter.Slicer` or `str` :param expected_base_type: base class type that must satisfy, defaults to None :param global_vars: overriding global variables, if None, it will use the caller's globals(), defaults to None :param local_vars: overriding local variables, if None, it will use the caller's locals(), defaults to None :raises TypeError: unable to find a matching type :return: found type """ global_vars, local_vars = get_caller_global_local_vars(global_vars, local_vars) try: obj = str_to_object(s, global_vars, local_vars) except ValueError: raise TypeError(f"{s} is not a type") assert_or_throw(isinstance(obj, type), TypeError(f"{obj} is not a type")) assert_or_throw( expected_base_type is None or issubclass(obj, expected_base_type), TypeError(f"{obj} is not a subtype of {expected_base_type}"), ) return obj
[docs] def str_to_instance( s: str, expected_base_type: Optional[type] = None, args: List[Any] = EMPTY_ARGS, kwargs: Dict[str, Any] = EMPTY_KWARGS, global_vars: Optional[Dict[str, Any]] = None, local_vars: Optional[Dict[str, Any]] = None, ) -> Any: """Use :func:`~triad.utils.convert.str_to_type` to find a matching type and instantiate :param s: see :func:`~triad.utils.convert.str_to_type` :param expected_base_type: see :func:`~triad.utils.convert.str_to_type` :param args: args to instantiate the type :param kwargs: kwargs to instantiate the type :param global_vars: overriding global variables, if None, it will use the caller's globals(), defaults to None :param local_vars: overriding local variables, if None, it will use the caller's locals(), defaults to None :return: the instantiated the object """ global_vars, local_vars = get_caller_global_local_vars(global_vars, local_vars) t = str_to_type(s, expected_base_type, global_vars, local_vars) return t(*args, **kwargs)
[docs] def to_type( s: Any, expected_base_type: Optional[type] = None, global_vars: Optional[Dict[str, Any]] = None, local_vars: Optional[Dict[str, Any]] = None, ) -> type: """Convert an object `s` to `type` * if `s` is `str`: see :func:`~triad.utils.convert.str_to_type` * if `s` is `type`: check `expected_base_type` and return itself * else: check `expected_base_type` and return itself :param s: see :func:`~triad.utils.convert.str_to_type` :param expected_base_type: see :func:`~triad.utils.convert.str_to_type` :param global_vars: overriding global variables, if None, it will use the caller's globals(), defaults to None :param local_vars: overriding local variables, if None, it will use the caller's locals(), defaults to None :raises TypeError: if no matching type found :return: the matching type """ global_vars, local_vars = get_caller_global_local_vars(global_vars, local_vars) if isinstance(s, str): return str_to_type(s, expected_base_type, global_vars, local_vars) if isinstance(s, type): if expected_base_type is None or issubclass(s, expected_base_type): return s raise TypeError(f"Type mismatch {s} expected {expected_base_type}") t = type(s) if expected_base_type is None or issubclass(t, expected_base_type): return t raise TypeError(f"Type mismatch {s} expected {expected_base_type}")
[docs] def to_instance( s: Any, expected_base_type: Optional[type] = None, args: List[Any] = EMPTY_ARGS, kwargs: Dict[str, Any] = EMPTY_KWARGS, global_vars: Optional[Dict[str, Any]] = None, local_vars: Optional[Dict[str, Any]] = None, ) -> Any: """If s is str or type, then use :func:`~triad.utils.convert.to_type` to find matching type and instantiate. Otherwise return s if it matches constraints :param s: see :func:`~triad.utils.convert.to_type` :param expected_base_type: see :func:`~triad.utils.convert.to_type` :param args: args to instantiate the type :param kwargs: kwargs to instantiate the type :param global_vars: overriding global variables, if None, it will use the caller's globals(), defaults to None :param local_vars: overriding local variables, if None, it will use the caller's locals(), defaults to None :raises ValueError: if s is an instance but not a (sub)type of `expected_base_type` :raises TypeError: if s is an instance, args and kwargs must be empty :return: the instantiated object """ global_vars, local_vars = get_caller_global_local_vars(global_vars, local_vars) if s is None: raise ValueError("None can't be converted to instance") if isinstance(s, (str, type)): t = to_type(s, expected_base_type, global_vars, local_vars) return t(*args, **kwargs) else: if expected_base_type is not None and not isinstance(s, expected_base_type): raise TypeError(f"{str(s)} is not a subclass of {str(expected_base_type)}") if len(args) > 0 or len(kwargs) > 0: raise ValueError(f"Can't instantiate {str(s)} with different parameters") return s
[docs] def to_function( func: Any, global_vars: Optional[Dict[str, Any]] = None, local_vars: Optional[Dict[str, Any]] = None, ) -> Any: # noqa: C901 """For an expression, it tries to find the matching function. :params s: a string expression or a callable :param global_vars: overriding global variables, if None, it will use the caller's globals(), defaults to None :param local_vars: overriding local variables, if None, it will use the caller's locals(), defaults to None :raises AttributeError: if unable to find such a function :return: the matching function """ if isinstance(func, str): global_vars, local_vars = get_caller_global_local_vars(global_vars, local_vars) try: func = str_to_object(func, global_vars, local_vars) except ValueError: raise AttributeError(f"{func} is not a function") assert_or_throw( callable(func) and not isinstance(func, six.class_types), AttributeError(f"{func} is not a function"), ) return func
[docs] def get_full_type_path(obj: Any) -> str: """Get the full module path of the type (if `obj` is class or function) or type of the instance (if `obj` is an object instance) :param obj: a class/function type or an object instance :raises TypeError: if `obj` is None, lambda, or neither a class or a function :return: full path string """ if obj is not None: if inspect.isclass(obj): return "{}.{}".format(obj.__module__, obj.__name__) if inspect.isfunction(obj): if obj.__name__.startswith("<lambda"): raise TypeError("Can't get full path for lambda functions") return "{}.{}".format(obj.__module__, obj.__name__) if isinstance(obj, object): return "{}.{}".format(obj.__class__.__module__, obj.__class__.__name__) raise TypeError(f"Unable to get type full path from {obj}")
[docs] def to_bool(obj: Any) -> bool: """Convert an object to python bool value. It can handle values like `True`, `true`, `yes`, `1`, etc :param obj: object :raises TypeError: if failed to convert :return: bool value """ if obj is None: raise TypeError("None can't convert to bool") o = str(obj).lower() if o in ["true", "yes", "1"]: return True if o in ["false", "no", "0"]: return False raise TypeError(f"{o} can't convert to bool")
[docs] def to_datetime(obj: Any) -> datetime.datetime: """Convert an object to python datetime. If the object is a string, then if ciso8601 is installed then it will use ``ciso8601.parse_datetime`` to parse else it will use ``pandas.to_datetime`` to parse, which can be a lot slower. :param obj: object :raises TypeError: if failed to convert :return: datetime value """ if obj is None: raise TypeError("None can't convert to datetime") if isinstance(obj, datetime.datetime): return obj if isinstance(obj, datetime.date): return datetime.datetime(obj.year, obj.month, obj.day) if isinstance(obj, str): try: return ( parse_datetime(obj) if _HAS_CISO8601 else pd.to_datetime(obj).to_pydatetime() ) except Exception as e: raise TypeError(f"{obj} can't convert to datetime", e) raise TypeError(f"{type(obj)} {obj} can't convert to datetime")
[docs] def to_timedelta(obj: Any) -> datetime.timedelta: """Convert an object to python datetime. If the object is a string, `min` or `-inf` will return `timedelta.min`, `max` or `inf` will return `timedelta.max`; if the object is a number, the number will be used as the seconds argument; Otherwise it will use `pandas.to_timedelta` to parse the object. :param obj: object :raises TypeError: if failed to convert :return: timedelta value """ if obj is None: raise TypeError("None can't convert to timedelta") if isinstance(obj, datetime.timedelta): return obj if np.isreal(obj): return datetime.timedelta(seconds=float(obj)) try: return pd.to_timedelta(obj).to_pytimedelta() except Exception as e: if isinstance(obj, str): obj = obj.lower() if obj in ["min", "-inf"]: return datetime.timedelta.min elif obj in ["max", "inf"]: return datetime.timedelta.max raise TypeError(f"{type(obj)} {obj} can't convert to timedelta", e)
[docs] def as_type(obj: Any, target: type) -> Any: """Convert `obj` into `target` type :param obj: input object :param target: target type :return: object in the target type """ if issubclass(type(obj), target): return obj if target == bool: return to_bool(obj) if target == datetime.datetime: return to_datetime(obj) if target == datetime.timedelta: return to_timedelta(obj) return target(obj)
[docs] def to_size(exp: Any) -> int: """Convert input value or expression to size For expression string, it must be in the format of `<value>` or `<value><unit>`. Value must be 0 or positive, default unit is byte if not provided. Unit can be `b`, `byte`, `k`, `kb`, `m`, `mb`, `g`, `gb`, `t`, `tb`. Args: exp (Any): expression string or numerical value Raises: ValueError: for invalid expression ValueError: for negative values Returns: int: size in byte """ n, u = _parse_value_and_unit(exp) assert n >= 0.0, "Size can't be negative" if u in ["", "b", "byte", "bytes"]: return int(n) if u in ["k", "kb"]: return int(n * 1024) if u in ["m", "mb"]: return int(n * 1024 * 1024) if u in ["g", "gb"]: return int(n * 1024 * 1024 * 1024) if u in ["t", "tb"]: return int(n * 1024 * 1024 * 1024 * 1024) raise ValueError(f"Invalid size expression {exp}")
def _parse_value_and_unit(exp: Any) -> Tuple[float, str]: try: assert exp is not None if isinstance(exp, (int, float)): return float(exp), "" exp = str(exp).replace(" ", "").lower() i = 1 if exp.startswith("-") else 0 while i < len(exp): if (exp[i] < "0" or exp[i] > "9") and exp[i] != ".": break i += 1 return float(exp[:i]), exp[i:] except (ValueError, AssertionError): raise ValueError(f"Invalid expression {exp}")