triad.utils
triad.utils.assertion
- triad.utils.assertion.assert_arg_not_none(obj, arg_name='', msg='')[source]
Assert an argument is not None, otherwise raise an exception
- Parameters:
obj (typing.Any) – argument value
arg_name (str) – argument name; if None or empty, msg will be used
msg (str) – used only when arg_name is None or empty
- Raises:
NoneArgumentError – with arg_name or msg
- Return type:
- triad.utils.assertion.assert_or_throw(bool_exp, exception=None)[source]
Assert on an expression and throw a custom exception on failure
- Parameters:
bool_exp (bool) – boolean expression to assert on
exception (typing.Optional[typing.Any]) – a custom Exception instance; or any other object, which will be stringified and used to instantiate an AssertionError; or a function that can generate the supported data types
- Return type:
Examples
assert_or_throw(True, "assertion error")
assert_or_throw(False)  # raise AssertionError
assert_or_throw(False, "assertion error")  # raise AssertionError
assert_or_throw(False, TypeError("assertion error"))  # raise TypeError

# Lazy evaluation is useful when constructing the error itself
# is expensive or error-prone. With lazy evaluation, the happy
# path will be fast and error free.

def fail():  # a function that is slow and wrong
    sleep(10)
    raise TypeError

assert_or_throw(True, fail())  # (unexpectedly) raise TypeError
assert_or_throw(True, fail)  # no exception
assert_or_throw(True, lambda: "a" + fail())  # no exception
assert_or_throw(False, lambda: "a" + fail())  # raise TypeError
triad.utils.batch_reslicers
- class triad.utils.batch_reslicers.ArrowTableBatchReslicer(row_limit=None, size_limit=None)[source]
Bases: BatchReslicer[Table]
- concat(batches)[source]
Concatenate a list of batches into one batch
- Parameters:
batches (typing.List[pyarrow.lib.Table]) – the list of batches
- Return type:
- Returns:
the concatenated batch
- get_rows_and_size(batch)[source]
Get the number of rows and byte size of a batch
- Parameters:
batch (pyarrow.lib.Table) – the batch object
- Return type:
- Returns:
the number of rows and byte size of the batch
- take(batch, start, length)[source]
Take a slice of the batch
- Parameters:
batch (pyarrow.lib.Table) – the batch object
start (int) – the start row index
length (int) – the number of rows to take
- Return type:
- Returns:
a slice of the batch
- class triad.utils.batch_reslicers.ArrowTableSortedBatchReslicer(keys)[source]
Bases: SortedBatchReslicer[Table]
- concat(batches)[source]
Concatenate a list of batches into one batch
- Parameters:
batches (typing.List[pyarrow.lib.Table]) – the list of batches
- Return type:
- Returns:
the concatenated batch
- get_batch_length(batch)[source]
Get the number of rows in the batch
- Parameters:
batch (pyarrow.lib.Table) – the batch object
- Return type:
- Returns:
the number of rows in the batch
- get_keys_ndarray(batch, keys)[source]
Get the keys as a numpy array
- Parameters:
batch (pyarrow.lib.Table) – the batch object
keys (typing.List[str]) – the keys to get
- Return type:
numpy.ndarray
- Returns:
the keys as a numpy array
- take(batch, start, length)[source]
Take a slice of the batch
- Parameters:
batch (pyarrow.lib.Table) – the batch object
start (int) – the start row index
length (int) – the number of rows to take
- Return type:
- Returns:
a slice of the batch
- class triad.utils.batch_reslicers.BatchReslicer(row_limit=None, size_limit=None)[source]
Bases: Generic[T]
Reslice batch streams with a row and/or size limit
- Parameters:
row_limit (typing.Optional[int]) – max rows for each slice, defaults to None
size_limit (typing.Optional[typing.Any]) – max byte size for each slice, defaults to None
- Raises:
AssertionError – if size_limit is not None but sizer is None
- concat(batches)[source]
Concatenate a list of batches into one batch
- Parameters:
batches (typing.List[typing.TypeVar(T)]) – the list of batches
- Return type:
- Returns:
the concatenated batch
- get_rows_and_size(batch)[source]
Get the number of rows and byte size of a batch
- Parameters:
batch (typing.TypeVar(T)) – the batch object
- Return type:
- Returns:
the number of rows and byte size of the batch
- reslice(batches)[source]
Reslice the batch stream into new batches constrained by the row or size limit
- Parameters:
batches (typing.Iterable[typing.TypeVar(T)]) – the batch stream
- Yield:
an iterable of batches of the same type satisfying the constraints
- Return type:
- take(batch, start, length)[source]
Take a slice of the batch
- Parameters:
batch (typing.TypeVar(T)) – the batch object
start (int) – the start row index
length (int) – the number of rows to take
- Return type:
- Returns:
a slice of the batch
- class triad.utils.batch_reslicers.NumpyArrayBatchReslicer(row_limit=None, size_limit=None)[source]
Bases: BatchReslicer[ndarray]
- concat(batches)[source]
Concatenate a list of batches into one batch
- Parameters:
batches (typing.List[numpy.ndarray]) – the list of batches
- Return type:
numpy.ndarray
- Returns:
the concatenated batch
- class triad.utils.batch_reslicers.PandasBatchReslicer(row_limit=None, size_limit=None)[source]
Bases: BatchReslicer[DataFrame]
- concat(batches)[source]
Concatenate a list of batches into one batch
- Parameters:
batches (typing.List[pandas.core.frame.DataFrame]) – the list of batches
- Return type:
pandas.core.frame.DataFrame
- Returns:
the concatenated batch
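A minimal usage sketch (the dataframes here are invented for illustration): reslice a stream of pandas dataframes so that no output batch exceeds two rows.

import pandas as pd
from triad.utils.batch_reslicers import PandasBatchReslicer

# an input stream of unevenly sized dataframes
batches = [pd.DataFrame({"a": range(3)}), pd.DataFrame({"a": range(5)})]

# every output batch will have at most 2 rows
reslicer = PandasBatchReslicer(row_limit=2)
for batch in reslicer.reslice(batches):
    assert len(batch) <= 2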
- class triad.utils.batch_reslicers.PandasSortedBatchReslicer(keys)[source]
Bases: SortedBatchReslicer[DataFrame]
- concat(batches)[source]
Concatenate a list of batches into one batch
- Parameters:
batches (typing.List[pandas.core.frame.DataFrame]) – the list of batches
- Return type:
pandas.core.frame.DataFrame
- Returns:
the concatenated batch
- get_batch_length(batch)[source]
Get the number of rows in the batch
- Parameters:
batch (pandas.core.frame.DataFrame) – the batch object
- Return type:
- Returns:
the number of rows in the batch
- get_keys_ndarray(batch, keys)[source]
Get the keys as a numpy array
- Parameters:
batch (pandas.core.frame.DataFrame) – the batch object
keys (typing.List[str]) – the keys to get
- Return type:
numpy.ndarray
- Returns:
the keys as a numpy array
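A minimal sketch of reslicing a key-sorted stream (data invented for illustration); per reslice_and_merge documented below, each yielded batch should contain exactly one key group.

import pandas as pd
from triad.utils.batch_reslicers import PandasSortedBatchReslicer

# the stream is already sorted by the key column "k"
batches = [
    pd.DataFrame({"k": [1, 1, 2], "v": [1, 2, 3]}),
    pd.DataFrame({"k": [2, 3, 3], "v": [4, 5, 6]}),
]

reslicer = PandasSortedBatchReslicer(keys=["k"])
for batch in reslicer.reslice_and_merge(batches):
    assert 1 == batch["k"].nunique()  # one key group per batch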
- class triad.utils.batch_reslicers.SortedBatchReslicer(keys)[source]
Bases: Generic[T]
Reslice batch streams (that are already sorted by keys) by keys.
- Parameters:
keys (typing.List[str]) – group keys to reslice by
- concat(batches)[source]
Concatenate a list of batches into one batch
- Parameters:
batches (typing.List[typing.TypeVar(T)]) – the list of batches
- Return type:
- Returns:
the concatenated batch
- get_batch_length(batch)[source]
Get the number of rows in the batch
- Parameters:
batch (typing.TypeVar(T)) – the batch object
- Return type:
- Returns:
the number of rows in the batch
- get_keys_ndarray(batch, keys)[source]
Get the keys as a numpy array
- Parameters:
batch (typing.TypeVar(T)) – the batch object
keys (typing.List[str]) – the keys to get
- Return type:
numpy.ndarray
- Returns:
the keys as a numpy array
- reslice(batches)[source]
Reslice the batch stream into a stream of iterables of batches, each iterable covering one set of keys
- Parameters:
batches (typing.Iterable[typing.TypeVar(T)]) – the batch stream
- Yield:
an iterable of iterables of batches, each containing the same keys
- Return type:
- reslice_and_merge(batches)[source]
Reslice the batch stream into new batches, each containing the same keys
- Parameters:
batches (typing.Iterable[typing.TypeVar(T)]) – the batch stream
- Yield:
an iterable of batches, each containing the same keys
- Return type:
- take(batch, start, length)[source]
Take a slice of the batch
- Parameters:
batch (typing.TypeVar(T)) – the batch object
start (int) – the start row index
length (int) – the number of rows to take
- Return type:
- Returns:
a slice of the batch
triad.utils.class_extension
- triad.utils.class_extension.extensible_class(class_type)[source]
The decorator making classes extensible by external methods
- Parameters:
class_type (typing.Type) – the class under the decorator
- Return type:
- Returns:
the class_type
Examples
@extensible_class
class A:
    # It's recommended to implement __getattr__ so that
    # PyLint will not complain about the dynamically added methods
    def __getattr__(self, name):
        raise NotImplementedError

@extension_method
def method(obj: A):
    return 1

assert 1 == A().method()
Note
If the method name is already in the original class, a ValueError will be thrown. You can’t modify any built-in attribute.
- triad.utils.class_extension.extension_method(func=None, class_type=None, name=None, on_dup='error')[source]
The decorator to add functions as members of the correspondent classes.
- Parameters:
func (typing.Optional[typing.Callable]) – the function under the decorator
class_type (typing.Optional[typing.Type]) – the parent class type, defaults to None
name (typing.Optional[str]) – the specified class method name, defaults to None. If None, then func.__name__ will be used as the method name
on_dup (str) – action on name duplication, defaults to error. error will throw a ValueError; ignore will take no action; overwrite will use the current method to overwrite.
- Return type:
- Returns:
the underlying function
Examples
@extensible_class
class A:
    # It's recommended to implement __getattr__ so that
    # PyLint will not complain about the dynamically added methods
    def __getattr__(self, name):
        raise NotImplementedError

# The simplest way to use this decorator: the first argument of
# the method must be annotated, and the annotated type is the
# class type to add this method to.
@extension_method
def method1(obj: A):
    return 1

assert 1 == A().method1()

# Or you can be explicit about the class type and the name of the
# method in the class. In this case, you don't have to annotate
# the first argument.
@extension_method(class_type=A, name="m3")
def method2(obj, b):
    return 2 + b

assert 5 == A().m3(3)
Note
If the method name is already in the original class, a ValueError will be thrown. You can’t modify any built-in attribute.
triad.utils.convert
- triad.utils.convert.as_type(obj, target)[source]
Convert obj into target type
- Parameters:
obj (typing.Any) – input object
target (type) – target type
- Return type:
- Returns:
object in the target type
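A minimal sketch of the expected behavior (the exact set of supported conversions is defined by the implementation; these two rely only on plain constructor-style conversion):

from triad.utils.convert import as_type

assert 10 == as_type("10", int)
assert "10" == as_type(10, str)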
- triad.utils.convert.compare_annotations(a, b, compare_origin=True)[source]
Compare two type annotations
- Parameters:
a (typing.Any) – first type annotation
b (typing.Any) – second type annotation
compare_origin (bool) – whether to compare the origin of the type annotations
- Return type:
- Returns:
whether the two type annotations are equal
- triad.utils.convert.get_caller_global_local_vars(global_vars=None, local_vars=None, start=-1, end=-1)[source]
Get the caller level global and local variables.
- Parameters:
global_vars (typing.Optional[typing.Dict[str, typing.Any]]) – overriding global variables; if not None, this will be returned instead of the caller's globals(), defaults to None
local_vars (typing.Optional[typing.Dict[str, typing.Any]]) – overriding local variables; if not None, this will be returned instead of the caller's locals(), defaults to None
start (int) – start stack level (from 0 to any negative number), defaults to -1, which is one level above where this function is invoked
end (int) – end stack level (from start to any smaller negative number), defaults to -1, which is one level above where this function is invoked
- Return type:
typing.Tuple[typing.Dict[str, typing.Any], typing.Dict[str, typing.Any]]
- Returns:
tuple of global_vars and local_vars
Examples
def caller():
    x = 1
    assert 1 == get_value("x")

def get_value(var_name):
    _, l = get_caller_global_local_vars()
    assert var_name in l
    assert var_name not in locals()
    return l[var_name]
- Notice:
This is for internal use; users normally should not call it directly.
If merging multiple levels, the variables at a closer level (to where the function is invoked) will overwrite values from farther levels when there is overlap.
Examples
def f1():
    x = 1

    def f2():
        x = 2

        def f3():
            _, l = get_caller_global_local_vars(start=-1, end=-2)
            assert 2 == l["x"]
            _, l = get_caller_global_local_vars(start=-2, end=-2)
            assert 1 == l["x"]

        f3()

    f2()

f1()
- triad.utils.convert.get_full_type_path(obj)[source]
Get the full module path of the type (if obj is class or function) or type of the instance (if obj is an object instance)
- Parameters:
obj (typing.Any) – a class/function type or an object instance
- Raises:
TypeError – if obj is None, a lambda, or neither a class nor a function
- Return type:
- Returns:
full path string
- triad.utils.convert.str_to_instance(s, expected_base_type=None, args=[], kwargs={}, global_vars=None, local_vars=None)[source]
Use str_to_type() to find a matching type and instantiate
- Parameters:
s (str) – see str_to_type()
expected_base_type (typing.Optional[type]) – see str_to_type()
args (typing.List[typing.Any]) – args to instantiate the type
kwargs (typing.Dict[str, typing.Any]) – kwargs to instantiate the type
global_vars (typing.Optional[typing.Dict[str, typing.Any]]) – overriding global variables; if None, it will use the caller's globals(), defaults to None
local_vars (typing.Optional[typing.Dict[str, typing.Any]]) – overriding local variables; if None, it will use the caller's locals(), defaults to None
- Return type:
- Returns:
the instantiated object
- triad.utils.convert.str_to_object(expr, global_vars=None, local_vars=None)[source]
Convert string expression to object. The string expression must express a type with relative or full path, or express a local or global instance without brackets or operators.
- Parameters:
expr (str) – string expression, see examples below
global_vars (typing.Optional[typing.Dict[str, typing.Any]]) – overriding global variables; if None, it will use the caller's globals(), defaults to None
local_vars (typing.Optional[typing.Dict[str, typing.Any]]) – overriding local variables; if None, it will use the caller's locals(), defaults to None
- Return type:
- Returns:
the object
- Raises:
ValueError – unable to find a matching object
Examples
class _Mock:
    def __init__(self, x=1):
        self.x = x

m = _Mock()
assert 1 == str_to_object("m.x")
assert 1 == str_to_object("m2.x", local_vars={"m2": m})
assert RuntimeError == str_to_object("RuntimeError")
assert _Mock == str_to_object("_Mock")
Note
This function dynamically loads an object from a string expression. If you write that string expression as python code at the same location, it should generate the same result.
- triad.utils.convert.str_to_type(s, expected_base_type=None, global_vars=None, local_vars=None)[source]
Given a string expression, find the first matching type from all imported libraries. If the expression contains ., it is treated as a relative or full path of the type, including modules.
- Parameters:
s (str) – type expression, for example triad.utils.iter.Slicer or str
expected_base_type (typing.Optional[type]) – base class type that must be satisfied, defaults to None
global_vars (typing.Optional[typing.Dict[str, typing.Any]]) – overriding global variables; if None, it will use the caller's globals(), defaults to None
local_vars (typing.Optional[typing.Dict[str, typing.Any]]) – overriding local variables; if None, it will use the caller's locals(), defaults to None
- Raises:
TypeError – unable to find a matching type
- Return type:
- Returns:
found type
- triad.utils.convert.to_bool(obj)[source]
Convert an object to a python bool value. It can handle values like True, true, yes, 1, etc.
- Parameters:
obj (typing.Any) – object
- Raises:
TypeError – if failed to convert
- Return type:
- Returns:
bool value
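A few illustrative calls based on the values listed above (a sketch, not an exhaustive list of the accepted spellings):

from triad.utils.convert import to_bool

assert to_bool(True) is True
assert to_bool("true") is True
assert to_bool("yes") is True
assert to_bool(1) is True
assert to_bool("false") is False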
- triad.utils.convert.to_datetime(obj)[source]
Convert an object to python datetime. If the object is a string and ciso8601 is installed, it will be parsed by ciso8601.parse_datetime; otherwise it will be parsed by pandas.to_datetime, which can be a lot slower.
- Parameters:
obj (typing.Any) – object
- Raises:
TypeError – if failed to convert
- Return type:
- Returns:
datetime value
- triad.utils.convert.to_function(func, global_vars=None, local_vars=None)[source]
For an expression, it tries to find the matching function.
- Parameters:
func – a string expression or a callable
global_vars (typing.Optional[typing.Dict[str, typing.Any]]) – overriding global variables; if None, it will use the caller's globals(), defaults to None
local_vars (typing.Optional[typing.Dict[str, typing.Any]]) – overriding local variables; if None, it will use the caller's locals(), defaults to None
- Raises:
AttributeError – if unable to find such a function
- Return type:
- Returns:
the matching function
- triad.utils.convert.to_instance(s, expected_base_type=None, args=[], kwargs={}, global_vars=None, local_vars=None)[source]
If s is str or type, then use to_type() to find the matching type and instantiate. Otherwise return s if it matches the constraints
- Parameters:
s (typing.Any) – see to_type()
expected_base_type (typing.Optional[type]) – see to_type()
args (typing.List[typing.Any]) – args to instantiate the type
kwargs (typing.Dict[str, typing.Any]) – kwargs to instantiate the type
global_vars (typing.Optional[typing.Dict[str, typing.Any]]) – overriding global variables; if None, it will use the caller's globals(), defaults to None
local_vars (typing.Optional[typing.Dict[str, typing.Any]]) – overriding local variables; if None, it will use the caller's locals(), defaults to None
- Raises:
ValueError – if s is an instance but not a (sub)type of expected_base_type
TypeError – if s is an instance but args or kwargs are not empty
- Return type:
- Returns:
the instantiated object
- triad.utils.convert.to_size(exp)[source]
Convert an input value or expression to a byte size. An expression string must be in the format of <value> or <value><unit>. The value must be 0 or positive; the default unit is byte if not provided. Unit can be b, byte, k, kb, m, mb, g, gb, t, tb.
- Parameters:
exp (typing.Any) – expression string or numerical value
- Raises:
ValueError – for invalid expression and negative values
- Return type:
- Returns:
size in byte
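A few illustrative conversions (a sketch, assuming binary 1024-based units):

from triad.utils.convert import to_size

assert 10 == to_size(10)      # plain numbers are bytes
assert 10 == to_size("10")
assert 1024 == to_size("1k")
assert 1024 * 1024 == to_size("1mb")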
- triad.utils.convert.to_timedelta(obj)[source]
Convert an object to python timedelta.
If the object is a string, min or -inf will return timedelta.min and max or inf will return timedelta.max; if the object is a number, it will be used as the seconds argument; otherwise it will use pandas.to_timedelta to parse the object.
- Parameters:
obj (typing.Any) – object
- Raises:
TypeError – if failed to convert
- Return type:
- Returns:
timedelta value
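A few illustrative conversions following the rules above (a sketch):

from datetime import timedelta
from triad.utils.convert import to_timedelta

assert timedelta(seconds=90) == to_timedelta(90)   # numbers are seconds
assert timedelta.min == to_timedelta("min")
assert timedelta.max == to_timedelta("inf")
assert timedelta(days=1) == to_timedelta("1 day")  # parsed by pandas.to_timedelta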
- triad.utils.convert.to_type(s, expected_base_type=None, global_vars=None, local_vars=None)[source]
Convert an object s to a type:
- if s is str: see str_to_type()
- if s is type: check expected_base_type and return s itself
- else: check expected_base_type and return the type of s
- Parameters:
s (typing.Any) – see str_to_type()
expected_base_type (typing.Optional[type]) – see str_to_type()
global_vars (typing.Optional[typing.Dict[str, typing.Any]]) – overriding global variables; if None, it will use the caller's globals(), defaults to None
local_vars (typing.Optional[typing.Dict[str, typing.Any]]) – overriding local variables; if None, it will use the caller's locals(), defaults to None
- Raises:
TypeError – if no matching type found
- Return type:
- Returns:
the matching type
triad.utils.dispatcher
- class triad.utils.dispatcher.ConditionalDispatcher(default_func, is_broadcast, entry_point=None)[source]
Bases: object
A conditional function dispatcher based on custom matching functions. This is a more general solution compared to functools.singledispatch. You can write arbitrary matching functions based on all the inputs of the function.
Note
Please use the decorators conditional_dispatcher() and conditional_broadcaster() instead of directly using this class.
- Parameters:
default_func (typing.Callable[..., typing.Any]) – the parent function that will dispatch the execution based on the matching functions
entry_point (typing.Optional[str]) – the entry point to preload children functions, defaults to None
- candidate(matcher, priority=1.0)[source]
A decorator to register a child function with a matcher and a priority.
Note
The order in which candidates are matched is determined by both the priority and the order of registration:
- The default priority is 1.0
- Children with higher priority values will be matched earlier
- When priority > 0, later registrations will be matched earlier
- When priority <= 0, earlier registrations will be matched earlier
So if you want to 'overwrite' the existing matches, set priority to be greater than 1.0. If you want the current one to be 'ignored' when there are other matches, set priority to 0.0.
See also
Please see the examples in conditional_dispatcher() and conditional_broadcaster().
- Parameters:
matcher (typing.Callable[..., bool]) – a function that determines whether it is a match, based on the same input as the parent function
priority (float) – determines the order to be matched; a higher value means a higher priority, defaults to 1.0
- Return type:
- register(func, matcher, priority=1.0)[source]
Register a child function with a matcher and a priority.
Note
The order in which registered functions are matched is determined by both the priority and the order of registration:
- The default priority is 1.0
- Children with higher priority values will be matched earlier
- When priority > 0, later registrations will be matched earlier
- When priority <= 0, earlier registrations will be matched earlier
So if you want to 'overwrite' the existing matches, set priority to be greater than 1.0. If you want the current one to be 'ignored' when there are other matches, set priority to 0.0.
- Parameters:
func (typing.Callable[..., typing.Any]) – a child function to be used when matching
matcher (typing.Callable[..., bool]) – a function that determines whether it is a match, based on the same input as the parent function
priority (float) – determines the order to be matched; a higher value means a higher priority, defaults to 1.0
- Return type:
- run(*args, **kwargs)[source]
Execute all matching children functions as a generator.
- Return type:
typing.Iterable[typing.Any]
Note
Only when there are no matching children functions will the default implementation be invoked.
- triad.utils.dispatcher.conditional_broadcaster(default_func=None, entry_point=None)[source]
Decorating a conditional broadcaster that will run all matching registered functions in other modules/packages.
Examples
Assume in pkg1.module1, you have:

from triad import conditional_broadcaster

@conditional_broadcaster(entry_point="my.plugins")
def myprint(obj):
    raise NotImplementedError

@conditional_broadcaster(entry_point="my.plugins")
def myprint2(obj):
    raise NotImplementedError

In another package pkg2, in setup.py, you define an entry point as:

setup(
    ...,
    entry_points={
        "my.plugins": [
            "my = pkg2.module2"
        ]
    },
)

And in pkg2.module2:

from pkg1.module1 import myprint

@myprint.candidate(lambda obj: isinstance(obj, str))
def myprinta(obj: str) -> None:
    print(obj, "a")

@myprint.candidate(lambda obj: isinstance(obj, str) and obj == "x")
def myprintb(obj: str) -> None:
    print(obj, "b")

Now, both functions will be automatically registered when pkg2 is installed in the environment. In another pkg3:

from pkg1.module1 import myprint, myprint2

myprint("x")  # calling both myprinta and myprintb
myprint("y")  # calling myprinta only
myprint2("x")  # raise NotImplementedError due to no matching candidates

Note
Only when no matching candidate is found will the implementation of the original function be used. If you don't want it to throw an error, use pass in the original function instead.
See also
Please read candidate() for details about the matching function and priority settings.
- Parameters:
default_func (typing.Optional[typing.Callable[..., typing.Any]]) – the function to decorate
entry_point (typing.Optional[str]) – the entry point to preload dispatchers, defaults to None
- Return type:
- triad.utils.dispatcher.conditional_dispatcher(default_func=None, entry_point=None)[source]
Decorating a conditional dispatcher that will run the first matching registered function in other modules/packages. This is a more general solution compared to functools.singledispatch. You can write arbitrary matching functions based on all the inputs of the function.
Examples
Assume in pkg1.module1, you have:

from triad import conditional_dispatcher

@conditional_dispatcher(entry_point="my.plugins")
def get_len(obj):
    raise NotImplementedError

In another package pkg2, in setup.py, you define an entry point as:

setup(
    ...,
    entry_points={
        "my.plugins": [
            "my = pkg2.module2"
        ]
    },
)

And in pkg2.module2:

from pkg1.module1 import get_len

@get_len.candidate(lambda obj: isinstance(obj, str))
def get_str_len(obj: str) -> int:
    return len(obj)

@get_len.candidate(lambda obj: isinstance(obj, int) and obj == 10)
def get_int_len(obj: int) -> int:
    return obj

Now, both functions will be automatically registered when pkg2 is installed in the environment. In another pkg3:

from pkg1.module1 import get_len

assert get_len("abc") == 3  # calling get_str_len
assert get_len(10) == 10  # calling get_int_len
get_len(20)  # raise NotImplementedError due to no matching candidates

See also
Please read candidate() for details about the matching function and priority settings.
- Parameters:
default_func (typing.Optional[typing.Callable[..., typing.Any]]) – the function to decorate
entry_point (typing.Optional[str]) – the entry point to preload dispatchers, defaults to None
- Return type:
- triad.utils.dispatcher.run_at_def(run_at_def_func=None, **kwargs)[source]
Decorator to run the function at declaration. This is useful when we want an import to trigger a function run (which guarantees it runs only once).
Examples
Assume the following python file is a module in your package; then when you import package.module, the two functions will run.

from triad import run_at_def

@run_at_def
def register_something():
    print("registered")

@run_at_def(a=1)
def register_something2(a):
    print("registered", a)

- Parameters:
run_at_def_func (typing.Optional[typing.Callable]) – the function to decorate
kwargs (typing.Any) – the parameters used to call this function
- Return type:
triad.utils.entry_points
triad.utils.hash
- triad.utils.hash.to_uuid(*args)[source]
Determine the uuid by the input arguments. It will search the input recursively. If an object has a __uuid__ method, that method will be called to get the uuid of that object.
Examples

to_uuid([1, 2, 3])
to_uuid(1, 2, 3)
to_uuid(dict(a=1, b="z"))

- Parameters:
args (typing.Any) – arbitrary input
- Return type:
- Returns:
uuid string
triad.utils.io
- triad.utils.io.chdir(path)[source]
Change the current working directory to the given path
- Parameters:
path (str) – the path to change to
- Return type:
Examples

from triad.utils.io import chdir

with chdir("/tmp"):
    pass  # do something under /tmp
- triad.utils.io.glob(path)[source]
Glob files
- Parameters:
path (str) – the path to glob
- Return type:
- Returns:
the matched files (absolute paths)
- triad.utils.io.touch(path, auto_mkdir=False)[source]
Create an empty file or update the timestamp of the file
- triad.utils.io.unzip_to_temp(fobj)[source]
Unzip a file object into a temporary directory.
- Parameters:
fobj (typing.Any) – the file object
- Return type:
Examples

from triad.utils.io import zip_temp, unzip_to_temp
from io import BytesIO

bio = BytesIO()
with zip_temp(bio) as tmpdir:
    pass  # create files in the tmpdir (string)

with unzip_to_temp(BytesIO(bio.getvalue())) as tmpdir:
    pass  # read files from the tmpdir (string)
- triad.utils.io.url_to_fs(path, **kwargs)[source]
A wrapper of fsspec.core.url_to_fs
- Parameters:
path (str) – the path to be used
kwargs (typing.Any) – additional arguments to fsspec.core.url_to_fs
- Return type:
typing.Tuple[fsspec.spec.AbstractFileSystem, str]
- Returns:
the file system and the path
- triad.utils.io.write_bytes(path, contents, create_dir=True)[source]
Write bytes to a file. If the directory of the file does not exist, it will create the directory first
- triad.utils.io.write_text(path, contents)[source]
Write text to a file. If the directory of the file does not exist, it will create the directory first
- triad.utils.io.zip_temp(fobj)[source]
Zip a temporary directory to a file object.
- Parameters:
fobj (typing.Any) – the file path or file object
- Return type:
Examples

from triad.utils.io import zip_temp
from io import BytesIO

bio = BytesIO()
with zip_temp(bio) as tmpdir:
    pass  # do something with the tmpdir (string)
triad.utils.iter
- class triad.utils.iter.EmptyAwareIterable(it)[source]
Bases: Iterable[T]
A wrapper of an iterable that can tell whether the underlying iterable is empty; it can also peek into a non-empty iterable.
- Parameters:
it (typing.Union[typing.Iterable[typing.TypeVar(T)], typing.Iterator[typing.TypeVar(T)]]) – the underlying iterable
- Raises:
StopIteration – raised by the underlying iterable
- peek()[source]
Return the next item of the iterable without moving the cursor
- Raises:
StopIteration – if it's empty
- Return type:
- Returns:
the next item
- class triad.utils.iter.Slicer(sizer=None, row_limit=None, size_limit=None, slicer=None)[source]
Bases: object
A better version of slice_iterable()
- Parameters:
sizer (typing.Optional[typing.Callable[[typing.Any], int]]) – the function to get the size of an item
row_limit (typing.Optional[int]) – max rows for each slice, defaults to None
size_limit (typing.Optional[typing.Any]) – max byte size for each slice, defaults to None
slicer (typing.Optional[typing.Callable[[int, typing.TypeVar(T), typing.Optional[typing.TypeVar(T)]], bool]]) – taking in the current number, the current value and the last value, it decides whether a new slice starts here
- Raises:
AssertionError – if size_limit is not None but sizer is None
- slice(orig_it)[source]
Slice the original iterable into slices by the combined slicing logic
- Parameters:
orig_it (typing.Iterable[typing.TypeVar(T)]) – the original iterable
- Yield:
an iterable of EmptyAwareIterable
- Return type:
typing.Iterable[triad.utils.iter.EmptyAwareIterable[typing.TypeVar(T)]]
- triad.utils.iter.make_empty_aware(it)[source]
Make an iterable empty aware, or return the iterable itself if it is already empty aware
- Parameters:
it (typing.Union[typing.Iterable[typing.TypeVar(T)], typing.Iterator[typing.TypeVar(T)]]) – the underlying iterable
- Return type:
- Returns:
EmptyAwareIterable[T]
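A minimal sketch using only the peek() behavior documented above:

from triad.utils.iter import make_empty_aware

it = make_empty_aware(iter([1, 2, 3]))
assert 1 == it.peek()         # peek does not advance the iterator
assert [1, 2, 3] == list(it)  # the peeked item is still yielded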
- triad.utils.iter.slice_iterable(it, slicer)[source]
Slice the original iterable into slices by slicer
- Parameters:
it (typing.Union[typing.Iterable[typing.TypeVar(T)], typing.Iterator[typing.TypeVar(T)]]) – the underlying iterable
slicer (typing.Callable[[int, typing.TypeVar(T), typing.Optional[typing.TypeVar(T)]], bool]) – taking in the current number, the current value and the last value, it decides whether a new slice starts here
- Yield:
an iterable of iterables (_SliceIterable[T])
- Return type:
typing.Iterable[triad.utils.iter._SliceIterable[typing.TypeVar(T)]]
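A minimal sketch (assuming the slicer receives the current number, current value and last value as described, and that the yielded slices are consumed in order): start a new slice whenever the value decreases.

from triad.utils.iter import slice_iterable

data = [1, 2, 3, 1, 2, 1]
slices = slice_iterable(data, lambda n, cur, last: last is not None and cur < last)
assert [[1, 2, 3], [1, 2], [1]] == [list(s) for s in slices]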
- triad.utils.iter.to_kv_iterable(data, none_as_empty=True)[source]
Convert data to iterable of key value pairs
- Parameters:
data (typing.Any) – input object; it can be a dict, an Iterable[Tuple[Any, Any]] or an Iterable[List[Any]]
none_as_empty (bool) – whether to treat None as an empty iterable
- Raises:
ValueError – if input is None and none_as_empty==False
ValueError – if input is a set
TypeError or ValueError – if input data type is not acceptable
- Yield:
iterable of key value pair as tuples
- Return type:
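A minimal sketch of the accepted input shapes:

from triad.utils.iter import to_kv_iterable

assert [("a", 1), ("b", 2)] == list(to_kv_iterable(dict(a=1, b=2)))
assert [("a", 1), ("b", 2)] == list(to_kv_iterable([("a", 1), ("b", 2)]))
assert [] == list(to_kv_iterable(None))  # none_as_empty defaults to True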
triad.utils.json
- triad.utils.json.check_for_duplicate_keys(ordered_pairs)[source]
Raise KeyError if a duplicate key exists in the provided ordered list of pairs, otherwise return a dict.
- Return type:
typing.Dict[typing.Any, typing.Any]
Example

>>> json.loads('{"x": 1, "x": 2}', object_pairs_hook=check_for_duplicate_keys)

- Raises:
KeyError – if there is a duplicated key
triad.utils.pandas_like
- class triad.utils.pandas_like.PandasLikeUtils[source]
Bases: Generic[T, ColT]
A collection of utils for general pandas-like dataframes
- as_array_iterable(df, schema=None, columns=None, type_safe=False)[source]
Convert a pandas-like dataframe to an iterable of rows, each in the format of a list.
- Parameters:
df (typing.TypeVar(T, bound=typing.Any)) – pandas-like dataframe
schema (typing.Optional[pyarrow.lib.Schema]) – schema of the input; with None, it will infer the schema, but it can't reliably infer the schema for nested types, so try to be explicit
columns (typing.Optional[typing.List[str]]) – columns to output, None for all columns
type_safe (bool) – whether to enforce the types in the schema; if False, it will return the original values from the dataframe
- Return type:
- Returns:
iterable of rows, each row is a list
- as_arrow(df, schema=None)[source]
Convert a pandas-like dataframe to a pyarrow table
- Parameters:
df (typing.TypeVar(T, bound=typing.Any)) – pandas-like dataframe
schema (typing.Optional[pyarrow.lib.Schema]) – if specified, it will be used to construct the pyarrow table, defaults to None
- Return type:
- Returns:
pyarrow table
- cast_df(df, schema, use_extension_types=True, use_arrow_dtype=False, **kwargs)[source]
Cast a pandas-like dataframe to comply with schema.
- Parameters:
df (typing.TypeVar(T, bound=typing.Any)) – pandas-like dataframe
schema (pyarrow.lib.Schema) – pyarrow schema to cast to
use_extension_types (bool) – whether to use ExtensionDType, defaults to True
use_arrow_dtype (bool) – whether to use ArrowDtype, defaults to False
kwargs (typing.Any) – other arguments passed to pa.Table.from_pandas
- Return type:
typing.TypeVar(T, bound=typing.Any)
- Returns:
the converted dataframe
- concat_dfs(*dfs)[source]
Concatenate dataframes
- Parameters:
dfs (typing.TypeVar(T, bound=typing.Any)) – the dataframes to concatenate
- Return type:
typing.TypeVar(T, bound=typing.Any)
- Returns:
the concatenated dataframe
- drop_duplicates(df)[source]
Remove duplicated rows
- Parameters:
df (typing.TypeVar(T, bound=typing.Any)) – the dataframe
- Return type:
typing.TypeVar(T, bound=typing.Any)
- Returns:
the dataframe without duplicated rows
- empty(df)[source]
Check if the dataframe is empty
- Parameters:
df (typing.TypeVar(T, bound=typing.Any)) – pandas-like dataframe
- Return type:
- Returns:
whether it is empty
- ensure_compatible(df)[source]
Check whether the dataframe is compatible with the operations inside this utils collection; if not, it will raise a ValueError
- Parameters:
df (typing.TypeVar(T, bound=typing.Any)) – pandas-like dataframe
- Raises:
ValueError – if not compatible
- Return type:
- except_df(df1, df2, unique, anti_indicator_col='__anti_indicator__')[source]
Remove df2 from df1
- Parameters:
df1 (typing.TypeVar(T, bound=typing.Any)) – dataframe 1
df2 (typing.TypeVar(T, bound=typing.Any)) – dataframe 2
unique (bool) – whether to remove duplicated rows in the result
- Return type:
typing.TypeVar(T, bound=typing.Any)
- Returns:
the dataframe with df2 removed
- fillna_default(col)[source]
Fill a column with default values according to the dtype of the column.
- Parameters:
col (typing.Any) – series of a pandas-like dataframe
- Return type:
- Returns:
the filled series
- intersect(df1, df2, unique)[source]
Intersect two dataframes
- Parameters:
df1 – dataframe 1
df2 – dataframe 2
unique (bool) – whether to remove duplicated rows
- Return type:
typing.TypeVar(T, bound=typing.Any)
- Returns:
the intersected dataframe
- is_compatile_index(df)[source]
Check whether the dataframe's index is compatible with the operations inside this utils collection
- Parameters:
df (typing.TypeVar(T, bound=typing.Any)) – pandas-like dataframe
- Return type:
- Returns:
whether it is compatible
- join(ndf1, ndf2, join_type, on, anti_indicator_col='__anti_indicator__', cross_indicator_col='__corss_indicator__')[source]
Join two dataframes
- Parameters:
ndf1 (typing.TypeVar(T, bound=typing.Any)) – dataframe 1
ndf2 (typing.TypeVar(T, bound=typing.Any)) – dataframe 2
join_type (str) – join type; can be inner, left_semi, left_anti, left_outer, right_outer, full_outer, cross
on (typing.List[str]) – join keys
- Return type:
typing.TypeVar(T, bound=typing.Any)
- Returns:
the joined dataframe
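A minimal usage sketch (data invented for illustration), using the concrete PandasUtils subclass defined later in this module:

import pandas as pd
from triad.utils.pandas_like import PandasUtils

utils = PandasUtils()
df1 = pd.DataFrame({"a": [1, 2], "b": [3, 4]})
df2 = pd.DataFrame({"a": [1], "c": [5]})
res = utils.join(df1, df2, join_type="inner", on=["a"])
# expected: a single row with a=1, b=3, c=5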
- safe_groupby_apply(df, cols, func, key_col_name='__safe_groupby_key__', **kwargs)[source]
Safe groupby-apply operation on pandas-like dataframes. In a pandas-like groupby-apply, if any key is null, the whole group is dropped. This method makes sure those groups are included.
- Parameters:
df (typing.TypeVar(T, bound=typing.Any)) – pandas-like dataframe
cols (typing.List[str]) – columns to group on, can be empty
func (typing.Callable[[typing.TypeVar(T, bound=typing.Any)], typing.TypeVar(T, bound=typing.Any)]) – the apply function, dataframe in, dataframe out
key_col_name – temp key used as the index for the groups, defaults to "__safe_groupby_key__"
- Return type:
typing.TypeVar(T, bound=typing.Any)
- Returns:
the output dataframe
- Notice:
The dataframe must be either empty, or with an index of type pd.RangeIndex, pd.Int64Index or pd.UInt64Index and without a name; otherwise, ValueError will be raised.
- to_parquet_friendly(df, partition_cols=None)[source]
Parquet doesn't like pd.ArrowDtype(<nested types>); this function converts all nested types to object types
- Parameters:
df (typing.TypeVar(T, bound=typing.Any)) – the input dataframe
partition_cols (typing.Optional[typing.List[str]]) – the partition columns, if any, defaults to None
- Return type:
typing.TypeVar(T, bound=typing.Any)
- Returns:
the converted dataframe
- to_schema(df)[source]
Extract a pandas dataframe schema as a pyarrow schema. This is a replacement of pyarrow.Schema.from_pandas, and it can correctly handle string types and empty dataframes
- Parameters:
df (typing.TypeVar(T, bound=typing.Any)) – pandas dataframe
- Raises:
ValueError – if the pandas dataframe does not have a named schema
- Return type:
- Returns:
pyarrow.Schema
- Notice:
The dataframe must be either empty, or with an index of type pd.RangeIndex, pd.Int64Index or pd.UInt64Index and without a name; otherwise, ValueError will be raised.
- union(ndf1, ndf2, unique)[source]
Union two dataframes
- Parameters:
ndf1 (typing.TypeVar(T, bound=typing.Any)) – dataframe 1
ndf2 (typing.TypeVar(T, bound=typing.Any)) – dataframe 2
unique (bool) – whether to remove duplicated rows
- Return type:
typing.TypeVar(T, bound=typing.Any)
- Returns:
the unioned dataframe
- class triad.utils.pandas_like.PandasUtils[source]
Bases: PandasLikeUtils[DataFrame, Series]
A collection of pandas utils
triad.utils.pyarrow
- class triad.utils.pyarrow.SchemaedDataPartitioner(schema, key_positions, sizer=None, row_limit=0, size_limit=None)[source]
Bases: object
Partitioner for a stream of array-like data with a given schema. It uses triad.utils.iter.Slicer to partition the stream.
- Parameters:
schema (pyarrow.lib.Schema) – the schema of the data stream to process
key_positions (typing.List[int]) – positions of partition keys in the schema
sizer (typing.Optional[typing.Callable[[typing.Any], int]]) – the function to get the size of an item
row_limit (int) – max rows for each slice, defaults to 0
size_limit (typing.Optional[typing.Any]) – max byte size for each slice, defaults to None
- partition(data)[source]
Partition the given data stream
- Parameters:
data (typing.Iterable[typing.Any]) – iterable of array-like objects
- Yield:
iterable of <partition_no, slice_no, slice iterable> tuples
- Return type:
typing.Iterable[typing.Tuple[int, int, triad.utils.iter.EmptyAwareIterable[typing.Any]]]
- triad.utils.pyarrow.apply_schema(schema, data, copy=True, deep=False, str_as_json=True)[source]
Use pa.Schema to convert a row (list) to the correspondent types.
Notice this function converts from python native types to python native types. It is used to normalize data input, which could be generated by different logics, into the correct data types.
Notice this function assumes each item of data has the same length as the schema, and it will not do any extra validation on that.
- Parameters:
schema (pyarrow.lib.Schema) – pyarrow schema
data (typing.Iterable[typing.List[typing.Any]]) – an iterable of rows, represented by lists or tuples
copy (bool) – whether to create new instances (copy=True) or to apply in place (copy=False)
deep (bool) – whether to do deep conversion on nested (struct, list) types
str_as_json (bool) – whether to treat string data as json for nested types
- Raises:
ValueError – if any value can't be converted to the datatype
NotImplementedError – if any field type is not supported by Triad
- Yield:
converted rows
- Return type:
- triad.utils.pyarrow.cast_pa_table(df, schema)[source]
Convert a pyarrow table to another pyarrow table with the given schema
- Parameters:
df (pyarrow.lib.Table) – the pyarrow table
schema (pyarrow.lib.Schema) – the pyarrow schema
- Return type:
- Returns:
the converted pyarrow table
- triad.utils.pyarrow.expression_to_schema(expr)[source]
Convert a schema expression to pyarrow.Schema.
Format: col_name:col_type[,col_name:col_type]+
- If col_type is a list type, the syntax should be [element_type]
- If col_type is a struct type, the syntax should be {col_name:col_type[,col_name:col_type]+}
- If col_type is a map type, the syntax should be <key_type,value_type>
Whitespace will be removed. The format of the expression is json without any double quotes.
Examples
expression_to_schema("a:int,b:int") expression_to_schema("a:[int],b:{x:<int,int>,y:{z:[str],w:byte}}")
- Parameters:
expr (str) – schema expression
- Raises:
SyntaxError – if there is a syntax issue or unknown types
- Return type:
- Returns:
pyarrow.Schema
- triad.utils.pyarrow.get_alter_func(from_schema, to_schema, safe)[source]
Generate the alteration function based on from_schema and to_schema. The returned function can be applied to arrow tables with from_schema; the output will be in to_schema's order and types.
- Parameters:
from_schema (pyarrow.lib.Schema) – the source schema
to_schema (pyarrow.lib.Schema) – the destination schema
safe (bool) – whether to check for conversion errors such as overflow
- Return type:
- Returns:
a function that can be applied to arrow tables with from_schema; the output will be in to_schema's order and types
- triad.utils.pyarrow.get_eq_func(data_type)[source]
Generate an equality function for a given datatype
- Parameters:
data_type (pyarrow.lib.DataType) – pyarrow data type supported by Triad
- Return type:
- Returns:
the function
- triad.utils.pyarrow.is_supported(data_type, throw=False)[source]
Whether data_type is currently supported by Triad
- Parameters:
data_type (pyarrow.lib.DataType) – instance of pa.DataType
throw (bool) – whether to raise an exception if not supported
- Return type:
- Returns:
whether it is supported
- triad.utils.pyarrow.pa_batch_to_dicts(batch)[source]
Convert a pyarrow record batch to a list of dicts
- Parameters:
batch (pyarrow.lib.RecordBatch) – the pyarrow record batch
- Return type:
- Returns:
the list of dicts
- triad.utils.pyarrow.pa_batch_to_pandas(batch, use_extension_types=False, use_arrow_dtype=False, **kwargs)[source]
Convert a pyarrow record batch to a pandas dataframe
- Parameters:
batch (pyarrow.lib.RecordBatch) – the pyarrow record batch
use_extension_types (bool) – whether to use pandas extension data types, defaults to False
use_arrow_dtype (bool) – if True and when pandas supports ArrowDType, use pyarrow types, defaults to False
kwargs (typing.Any) – other arguments for pa.Table.to_pandas
- Return type:
pandas.core.frame.DataFrame
- Returns:
the pandas dataframe
- triad.utils.pyarrow.pa_datatypes_equal(t1, t2, ignore_list_item_name=True, equal_groups=None)[source]
Check if two pyarrow data types are equal
- Parameters:
t1 (pyarrow.lib.DataType) – the first pyarrow data type
t2 (pyarrow.lib.DataType) – the second pyarrow data type
ignore_list_item_name (bool) – whether to ignore list item names, defaults to True
equal_groups (typing.Optional[typing.List[typing.List[typing.Callable[[pyarrow.lib.DataType], bool]]]]) – a list of groups of functions to check equality, defaults to None
- Return type:
- Returns:
whether the two data types are equal
Note
In the latest version of pyarrow, the default comparison logic does not compare list field names.
Examples
assert not pa_datatypes_equal(pa.int32(), pa.int64())
assert pa_datatypes_equal(
    pa.int32(),
    pa.int64(),
    equal_groups=[[pa.types.is_integer]],
)
- triad.utils.pyarrow.pa_schemas_equal(s1, s2, ignore_list_item_name=True, equal_groups=None)[source]
Check if two pyarrow schemas are equal
- Parameters:
s1 (pyarrow.lib.Schema) – the first pyarrow schema
s2 (pyarrow.lib.Schema) – the second pyarrow schema
ignore_list_item_name (bool) – whether to ignore list item names, defaults to True
equal_groups (typing.Optional[typing.List[typing.List[typing.Callable[[pyarrow.lib.DataType], bool]]]]) – a list of groups of functions to check equality, defaults to None
- Return type:
- Returns:
whether the two schemas are equal
Note
In the latest version of pyarrow, the default comparison logic does not compare list field names.
Examples
s1 = pa.schema([("a", pa.int32()), ("b", pa.string())])
s2 = pa.schema([("a", pa.int64()), ("b", pa.string())])
assert not pa_schemas_equal(s1, s2)
assert pa_schemas_equal(
    s1,
    s2,
    equal_groups=[[pa.types.is_integer]],
)
- triad.utils.pyarrow.pa_table_to_pandas(df, use_extension_types=False, use_arrow_dtype=False, **kwargs)[source]
Convert a pyarrow table to a pandas dataframe
- Parameters:
df (pyarrow.lib.Table) – the pyarrow table
use_extension_types (bool) – whether to use pandas extension data types, defaults to False
use_arrow_dtype (bool) – if True and when pandas supports ArrowDType, use pyarrow types, defaults to False
kwargs (typing.Any) – other arguments for pa.Table.to_pandas
- Return type:
pandas.core.frame.DataFrame
- Returns:
the pandas dataframe
- triad.utils.pyarrow.parse_json_columns(df, columns)[source]
Parse json string columns in a table and replace them with pyarrow types.
- Parameters:
df (pyarrow.lib.Table) – the table
columns (typing.Union[typing.List[str], pyarrow.lib.Schema]) – the columns to convert; it can be a list of column names or a schema. If it is a list of names, then their types will be inferred from the data.
- Return type:
- Returns:
the new table
- triad.utils.pyarrow.replace_type(current_type, is_type, convert_type, recursive=True)[source]
Replace current_type, or if it is nested, replace within the nested types
- Parameters:
current_type (pyarrow.lib.DataType) – the current type
is_type (typing.Callable[[pyarrow.lib.DataType], bool]) – the function to check whether a type is the type to replace
convert_type (typing.Callable[[pyarrow.lib.DataType], pyarrow.lib.DataType]) – the function to convert the type
recursive (bool) – whether to do recursive replacement in nested types
- Return type:
- Returns:
the new type
- triad.utils.pyarrow.replace_types_in_schema(schema, pairs, recursive=True)[source]
Replace types in a schema
- Parameters:
schema (pyarrow.lib.Schema) – the schema
pairs (typing.List[typing.Tuple[typing.Union[typing.Callable[[pyarrow.lib.DataType], bool], pyarrow.lib.DataType], typing.Union[typing.Callable[[pyarrow.lib.DataType], pyarrow.lib.DataType], pyarrow.lib.DataType]]]) – a list of (is_type, convert_type) pairs
recursive (bool) – whether to do recursive replacement in nested types
- Return type:
- Returns:
the new schema
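A minimal sketch (per the pairs parameter above, each side of a pair can be either a checker function or a concrete type): replace all string fields with large_string.

import pyarrow as pa
from triad.utils.pyarrow import replace_types_in_schema

schema = pa.schema([("a", pa.string()), ("b", pa.int64())])
# each pair is (is_type, convert_type); here: string -> large_string
new_schema = replace_types_in_schema(schema, [(pa.types.is_string, pa.large_string())])
# expected: field "a" becomes large_string, "b" is unchanged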
- triad.utils.pyarrow.replace_types_in_table(df, pairs, recursive=True, safe=True)[source]
Replace (cast) types in a table
- Parameters:
df (pyarrow.lib.Table) – the table
pairs (typing.List[typing.Tuple[typing.Union[typing.Callable[[pyarrow.lib.DataType], bool], pyarrow.lib.DataType], typing.Union[typing.Callable[[pyarrow.lib.DataType], pyarrow.lib.DataType], pyarrow.lib.DataType]]]) – a list of (is_type, convert_type) pairs
recursive (bool) – whether to do recursive replacement in nested types
safe (bool) – whether to check for conversion errors such as overflow
- Return type:
- Returns:
the new table
- triad.utils.pyarrow.schema_to_expression(schema)[source]
Convert pyarrow.Schema to a Triad schema expression, see expression_to_schema()
- Parameters:
schema (pyarrow.lib.Schema) – pyarrow schema
- Raises:
NotImplementedError – if some type is not supported by Triad
- Return type:
- Returns:
schema string expression
- triad.utils.pyarrow.schemas_equal(a, b, check_order=True, check_metadata=True, ignore=None)[source]
Check if two schemas are equal
- Parameters:
a (pyarrow.lib.Schema) – first pyarrow schema
b (pyarrow.lib.Schema) – second pyarrow schema
check_order – whether to compare the order of the fields
check_metadata – whether to compare the metadata
ignore (typing.Optional[typing.List[typing.Tuple[typing.Union[typing.Callable[[pyarrow.lib.DataType], bool], pyarrow.lib.DataType], typing.Union[typing.Callable[[pyarrow.lib.DataType], pyarrow.lib.DataType], pyarrow.lib.DataType]]]]) – a list of (is_type, convert_type) pairs to ignore differences on, defaults to None
- Return type:
- Returns:
whether the two schemas are equal
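A minimal sketch of the order check (assuming check_order compares field order as described above):

import pyarrow as pa
from triad.utils.pyarrow import schemas_equal

s1 = pa.schema([("a", pa.int32()), ("b", pa.string())])
s2 = pa.schema([("b", pa.string()), ("a", pa.int32())])
assert not schemas_equal(s1, s2)  # same fields, different order
assert schemas_equal(s1, s2, check_order=False)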
- triad.utils.pyarrow.to_pa_datatype(obj)[source]
Convert an object to a pyarrow DataType
- Parameters:
obj (typing.Any) – any object
- Raises:
TypeError – if unable to convert
- Return type:
- Returns:
an instance of pa.DataType
- triad.utils.pyarrow.to_pandas_dtype(schema, use_extension_types=False, use_arrow_dtype=False)[source]
Convert a pyarrow schema to a dtype dict for pandas dataframes. Currently, struct type is not supported
- Parameters:
schema (pyarrow.lib.Schema) – the pyarrow schema
use_extension_types (bool) – whether to use pandas extension data types, defaults to False
use_arrow_dtype (bool) – if True and when pandas supports ArrowDType, use pyarrow types, defaults to False
- Return type:
typing.Dict[str, numpy.dtype]
- Returns:
the pandas data type dictionary
Note
- If use_extension_types is False and use_arrow_dtype is True, it converts all types to ArrowDType
- If both are true, it converts types to the numpy-backed nullable dtypes if possible; otherwise, it converts to ArrowDType
- triad.utils.pyarrow.to_pandas_types_mapper(pa_type, use_extension_types=False, use_arrow_dtype=False)[source]
The types_mapper for pa.Table.to_pandas
- Parameters:
pa_type (pyarrow.lib.DataType) – the pyarrow data type
use_extension_types (bool) – whether to use pandas extension data types, defaults to False
use_arrow_dtype (bool) – if True and when pandas supports ArrowDType, use pyarrow types, defaults to False
- Return type:
typing.Optional[pandas.core.dtypes.base.ExtensionDtype]
- Returns:
the pandas ExtensionDtype if available, otherwise None
Note
- If use_extension_types is False and use_arrow_dtype is True, it converts the type to ArrowDType
- If both are true, it converts the type to the numpy-backed nullable dtype if possible; otherwise, it converts to ArrowDType
- triad.utils.pyarrow.to_single_pandas_dtype(pa_type, use_extension_types=False, use_arrow_dtype=False)[source]
Convert a pyarrow data type to a pandas datatype. Currently, struct type is not supported
- Parameters:
pa_type (pyarrow.lib.DataType) – the pyarrow data type
use_extension_types (bool) – whether to use pandas extension data types, defaults to False
use_arrow_dtype (bool) – if True and when pandas supports ArrowDType, use pyarrow types, defaults to False
- Return type:
numpy.dtype
- Returns:
the pandas data type
Note
- If use_extension_types is False and use_arrow_dtype is True, it converts the type to ArrowDType
- If both are true, it converts the type to the numpy-backed nullable dtype if possible; otherwise, it converts to ArrowDType
triad.utils.rename
- triad.utils.rename.normalize_names(names)[source]
Normalize dataframe column names to follow the Fugue column naming rules. It only operates on names that are not valid to Fugue.
It tries to minimize changes to the original names. Special characters will be converted to _, but if this does not produce a valid and unique column name, more transformations will be done.
Note
This is a temporary solution before Schema can take arbitrary names
Examples

[0, 1] => {0: "_0", 1: "_1"}
["1a", "2b"] => {"1a": "_1a", "2b": "_2b"}
["*a", "-a"] => {"*a": "_a", "-a": "_a_1"}

- Parameters:
names (typing.List[typing.Any]) – the column names of a dataframe
- Return type:
- Returns:
the rename operations as a dict; the key is the original column name, and the value is the new valid name.
triad.utils.schema
- triad.utils.schema.move_to_unquoted(expr, p, quote='`')[source]
When p is on a quote, find the position next to the end of the quoted part
- Parameters:
- Raises:
SyntaxError – if there is an open quote detected
- Return type:
- Returns:
the position next to the end of the quoted part
- triad.utils.schema.quote_name(name, quote='`')[source]
Add the quote ` around strings that are not valid triad var names.
- triad.utils.schema.safe_replace_out_of_quote(s, find, replace, quote='`')[source]
Replace strings outside the quoted parts
- triad.utils.schema.safe_search_out_of_quote(s, chars, quote='`')[source]
Search for chars outside the quoted parts
- Parameters:
- Yield:
the tuples in the format of (position, char)
- Return type:
- triad.utils.schema.safe_split_and_unquote(s, sep_char=',', quote='`', on_unquoted_empty='keep')[source]
Split the string and unquote every part
Examples

" a , ` b ` , c " => ["a", " b ", "c"]

- Parameters:
- Raises:
ValueError – if there are empty but unquoted parts and on_unquoted_empty is throw
- Return type:
- Returns:
the unquoted parts.
- triad.utils.schema.safe_split_out_of_quote(s, sep_chars, max_split=-1, quote='`')[source]
- Return type:
triad.utils.string
- triad.utils.string.assert_triad_var_name(expr)[source]
Check if expr is a valid Triad variable name based on the Triad standard: it has to be a valid python identifier and it can't be purely _
- Parameters:
expr (str) – column name expression
- Raises:
AssertionError – if the expression is invalid
- Return type:
- Returns:
the expression string
triad.utils.threading
- class triad.utils.threading.RunOnce(func, key_func=None, lock_type=<function RLock>)[source]
Bases: object
Run func once; the uniqueness is defined by key_func. This implementation is serialization safe and thread safe.
Note
Please use the decorator run_once() instead of directly using this class
- Parameters:
func (typing.Callable) – the function to run only once with this wrapper instance
key_func (typing.Optional[typing.Callable]) – the unique key determined by the arguments of func; if not set, it will use the same hashing logic as functools.lru_cache()
lock_type (typing.Type) – lock class type for thread safety
- class triad.utils.threading.SerializableRLock[source]
Bases: object
A serialization-safe wrapper of threading.RLock
- triad.utils.threading.run_once(func=None, key_func=None, lock_type=<function RLock>)[source]
The decorator to run func once; the uniqueness is defined by key_func. This implementation is serialization safe and thread safe.
- Parameters:
func (typing.Optional[typing.Callable]) – the function to run only once with this wrapper instance
key_func (typing.Optional[typing.Callable]) – the unique key determined by the arguments of func; if not set, it will use the same hashing logic as functools.lru_cache()
lock_type (typing.Type) – lock class type for thread safety; it doesn't need to be serialization safe
- Return type:
Examples
@run_once
def r(a):
    return max(a)

a1 = [0, 1]
a2 = [0, 2]
assert 1 == r(a1)  # will trigger r
assert 1 == r(a1)  # will get the result from the cache
assert 2 == r(a2)  # will trigger r again because of different arguments

# the following example ignores arguments
@run_once(key_func=lambda *args, **kwargs: True)
def r2(a):
    return max(a)

assert 1 == r2(a1)  # will trigger r2
assert 1 == r2(a2)  # will get the result from the cache
Note
- Hash collision is the concern of the user, not this class; your key_func should avoid any potential collision
- func can have no return
- For concurrent calls of this wrapper, only one will trigger func; the other calls will be blocked until the first call returns a result
- This class is cloudpicklable, but an unpickled instance does NOT share the same context with the original one
- This is not a replacement of functools.lru_cache(); it is not supposed to cache a lot of items