triad.utils

triad.utils.assertion

triad.utils.assertion.assert_arg_not_none(obj, arg_name='', msg='')[source]

Assert an argument is not None, otherwise raise exception

Parameters:
  • obj (typing.Any) – argument value

  • arg_name (str) – argument name, if None or empty, it will use msg

  • msg (str) – only when arg_name is None or empty, this value is used

Raises:

NoneArgumentError – with arg_name or msg

Return type:

None
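
Examples

A minimal usage sketch (assumed; the exact error message is up to the implementation):

from triad.utils.assertion import assert_arg_not_none

assert_arg_not_none(1, "a")  # no effect
assert_arg_not_none(None, "a")  # raise NoneArgumentError mentioning "a"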

triad.utils.assertion.assert_or_throw(bool_exp, exception=None)[source]

Assert on expression and throw custom exception

Parameters:
  • bool_exp (bool) – boolean expression to assert on

  • exception (typing.Optional[typing.Any]) – a custom Exception instance, or any other object that will be stringified and used to instantiate an AssertionError, or a function that can generate the supported data types

Return type:

None

Examples

assert_or_throw(True, "assertion error")
assert_or_throw(False)  # raise AssertionError
assert_or_throw(False, "assertion error")  # raise AssertionError
assert_or_throw(False, TypeError("assertion error"))  # raise TypeError

# Lazy evaluation is useful when constructing the error
# itself is expensive or error-prone. With lazy evaluation, the happy
# path will be fast and error free.
def fail():  # a function that is slow and wrong
    sleep(10)
    raise TypeError

assert_or_throw(True, fail())  # (unexpectedly) raise TypeError
assert_or_throw(True, fail)  # no exception
assert_or_throw(True, lambda: "a" + fail())  # no exception
assert_or_throw(False, lambda: "a" + fail())  # raise TypeError

triad.utils.batch_reslicers

class triad.utils.batch_reslicers.ArrowTableBatchReslicer(row_limit=None, size_limit=None)[source]

Bases: BatchReslicer[Table]

concat(batches)[source]

Concatenate a list of batches into one batch

Parameters:

batches (typing.List[pyarrow.lib.Table]) – the list of batches

Return type:

pyarrow.lib.Table

Returns:

the concatenated batch

get_rows_and_size(batch)[source]

Get the number of rows and byte size of a batch

Parameters:

batch (pyarrow.lib.Table) – the batch object

Return type:

typing.Tuple[int, int]

Returns:

the number of rows and byte size of the batch

take(batch, start, length)[source]

Take a slice of the batch

Parameters:
  • batch (pyarrow.lib.Table) – the batch object

  • start (int) – the start row index

  • length (int) – the number of rows to take

Return type:

pyarrow.lib.Table

Returns:

a slice of the batch

class triad.utils.batch_reslicers.ArrowTableSortedBatchReslicer(keys)[source]

Bases: SortedBatchReslicer[Table]

concat(batches)[source]

Concatenate a list of batches into one batch

Parameters:

batches (typing.List[pyarrow.lib.Table]) – the list of batches

Return type:

pyarrow.lib.Table

Returns:

the concatenated batch

get_batch_length(batch)[source]

Get the number of rows in the batch

Parameters:

batch (pyarrow.lib.Table) – the batch object

Return type:

int

Returns:

the number of rows in the batch

get_keys_ndarray(batch, keys)[source]

Get the keys as a numpy array

Parameters:
  • batch (pyarrow.lib.Table) – the batch object

  • keys (typing.List[str]) – the keys to get

Return type:

numpy.ndarray

Returns:

the keys as a numpy array

take(batch, start, length)[source]

Take a slice of the batch

Parameters:
  • batch (pyarrow.lib.Table) – the batch object

  • start (int) – the start row index

  • length (int) – the number of rows to take

Return type:

pyarrow.lib.Table

Returns:

a slice of the batch

class triad.utils.batch_reslicers.BatchReslicer(row_limit=None, size_limit=None)[source]

Bases: Generic[T]

Reslice batch streams with row and/or size limit

Parameters:
  • row_limit (typing.Optional[int]) – max number of rows for each output batch, defaults to None (no row limit)

  • size_limit (typing.Optional[typing.Any]) – max byte size for each output batch, defaults to None (no size limit)

Raises:

AssertionError – if size_limit is not None but sizer is None

concat(batches)[source]

Concatenate a list of batches into one batch

Parameters:

batches (typing.List[typing.TypeVar(T)]) – the list of batches

Return type:

typing.TypeVar(T)

Returns:

the concatenated batch

get_rows_and_size(batch)[source]

Get the number of rows and byte size of a batch

Parameters:

batch (typing.TypeVar(T)) – the batch object

Return type:

typing.Tuple[int, int]

Returns:

the number of rows and byte size of the batch

reslice(batches)[source]

Reslice the batch stream into new batches constrained by the row or size limit

Parameters:

batches (typing.Iterable[typing.TypeVar(T)]) – the batch stream

Yield:

an iterable of batches of the same type with the constraints

Return type:

typing.Iterable[typing.TypeVar(T)]
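
Examples

A minimal sketch (assumed usage, based on the documented contract) using the pandas implementation:

import pandas as pd
from triad.utils.batch_reslicers import PandasBatchReslicer

batches = [pd.DataFrame({"a": range(3)}), pd.DataFrame({"a": range(3, 8)})]
for part in PandasBatchReslicer(row_limit=2).reslice(batches):
    assert len(part) <= 2  # every output batch respects the row limit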

take(batch, start, length)[source]

Take a slice of the batch

Parameters:
  • batch (typing.TypeVar(T)) – the batch object

  • start (int) – the start row index

  • length (int) – the number of rows to take

Return type:

typing.TypeVar(T)

Returns:

a slice of the batch

class triad.utils.batch_reslicers.NumpyArrayBatchReslicer(row_limit=None, size_limit=None)[source]

Bases: BatchReslicer[ndarray]

concat(batches)[source]

Concatenate a list of batches into one batch

Parameters:

batches (typing.List[numpy.ndarray]) – the list of batches

Return type:

numpy.ndarray

Returns:

the concatenated batch

get_rows_and_size(batch)[source]

Get the number of rows and byte size of a batch

Parameters:

batch (numpy.ndarray) – the batch object

Return type:

typing.Tuple[int, int]

Returns:

the number of rows and byte size of the batch

take(batch, start, length)[source]

Take a slice of the batch

Parameters:
  • batch (numpy.ndarray) – the batch object

  • start (int) – the start row index

  • length (int) – the number of rows to take

Return type:

numpy.ndarray

Returns:

a slice of the batch

class triad.utils.batch_reslicers.PandasBatchReslicer(row_limit=None, size_limit=None)[source]

Bases: BatchReslicer[DataFrame]

concat(batches)[source]

Concatenate a list of batches into one batch

Parameters:

batches (typing.List[pandas.core.frame.DataFrame]) – the list of batches

Return type:

pandas.core.frame.DataFrame

Returns:

the concatenated batch

get_rows_and_size(batch)[source]

Get the number of rows and byte size of a batch

Parameters:

batch (pandas.core.frame.DataFrame) – the batch object

Return type:

typing.Tuple[int, int]

Returns:

the number of rows and byte size of the batch

take(batch, start, length)[source]

Take a slice of the batch

Parameters:
  • batch (pandas.core.frame.DataFrame) – the batch object

  • start (int) – the start row index

  • length (int) – the number of rows to take

Return type:

pandas.core.frame.DataFrame

Returns:

a slice of the batch

class triad.utils.batch_reslicers.PandasSortedBatchReslicer(keys)[source]

Bases: SortedBatchReslicer[DataFrame]

concat(batches)[source]

Concatenate a list of batches into one batch

Parameters:

batches (typing.List[pandas.core.frame.DataFrame]) – the list of batches

Return type:

pandas.core.frame.DataFrame

Returns:

the concatenated batch

get_batch_length(batch)[source]

Get the number of rows in the batch

Parameters:

batch (pandas.core.frame.DataFrame) – the batch object

Return type:

int

Returns:

the number of rows in the batch

get_keys_ndarray(batch, keys)[source]

Get the keys as a numpy array

Parameters:
  • batch (pandas.core.frame.DataFrame) – the batch object

  • keys (typing.List[str]) – the keys to get

Return type:

numpy.ndarray

Returns:

the keys as a numpy array

take(batch, start, length)[source]

Take a slice of the batch

Parameters:
  • batch (pandas.core.frame.DataFrame) – the batch object

  • start (int) – the start row index

  • length (int) – the number of rows to take

Return type:

pandas.core.frame.DataFrame

Returns:

a slice of the batch

class triad.utils.batch_reslicers.SortedBatchReslicer(keys)[source]

Bases: Generic[T]

Reslice batch streams (that are already sorted by keys) by keys.

Parameters:

keys (typing.List[str]) – group keys to reslice by

concat(batches)[source]

Concatenate a list of batches into one batch

Parameters:

batches (typing.List[typing.TypeVar(T)]) – the list of batches

Return type:

typing.TypeVar(T)

Returns:

the concatenated batch

get_batch_length(batch)[source]

Get the number of rows in the batch

Parameters:

batch (typing.TypeVar(T)) – the batch object

Return type:

int

Returns:

the number of rows in the batch

get_keys_ndarray(batch, keys)[source]

Get the keys as a numpy array

Parameters:
  • batch (typing.TypeVar(T)) – the batch object

  • keys (typing.List[str]) – the keys to get

Return type:

numpy.ndarray

Returns:

the keys as a numpy array

reslice(batches)[source]

Reslice the batch stream into a stream of iterable of batches of the same keys

Parameters:

batches (typing.Iterable[typing.TypeVar(T)]) – the batch stream

Yield:

an iterable of iterable of batches containing same keys

Return type:

typing.Iterable[typing.Iterable[typing.TypeVar(T)]]

reslice_and_merge(batches)[source]

Reslice the batch stream into new batches, each containing the same keys

Parameters:

batches (typing.Iterable[typing.TypeVar(T)]) – the batch stream

Yield:

an iterable of batches, each containing the same keys

Return type:

typing.Iterable[typing.TypeVar(T)]
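
Examples

A minimal sketch (assumed behavior) using the pandas implementation; the key group a=2 spans two input batches and is expected to be merged into one output batch:

import pandas as pd
from triad.utils.batch_reslicers import PandasSortedBatchReslicer

b1 = pd.DataFrame({"a": [1, 1, 2]})
b2 = pd.DataFrame({"a": [2, 3]})
parts = list(PandasSortedBatchReslicer(keys=["a"]).reslice_and_merge([b1, b2]))
assert 3 == len(parts)  # one output batch per key group: a=1, a=2, a=3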

take(batch, start, length)[source]

Take a slice of the batch

Parameters:
  • batch (typing.TypeVar(T)) – the batch object

  • start (int) – the start row index

  • length (int) – the number of rows to take

Return type:

typing.TypeVar(T)

Returns:

a slice of the batch

triad.utils.class_extension

triad.utils.class_extension.extensible_class(class_type)[source]

The decorator making classes extensible by external methods

Parameters:

class_type (typing.Type) – the class under the decorator

Return type:

typing.Type

Returns:

the class_type

Examples

@extensible_class
class A:

    # It's recommended to implement __getattr__ so that
    # PyLint will not complain about the dynamically added methods
    def __getattr__(self, name):
        raise NotImplementedError

@extension_method
def method(obj:A):
    return 1

assert 1 == A().method()

Note

If the method name is already in the original class, a ValueError will be thrown. You can’t modify any built-in attribute.

triad.utils.class_extension.extension_method(func=None, class_type=None, name=None, on_dup='error')[source]

The decorator to add functions as members of the correspondent classes.

Parameters:
  • func (typing.Optional[typing.Callable]) – the function under the decorator

  • class_type (typing.Optional[typing.Type]) – the parent class type, defaults to None

  • name (typing.Optional[str]) – the specified class method name, defaults to None. If None then func.__name__ will be used as the method name

  • on_dup (str) – action on name duplication, defaults to error. error will throw a ValueError; ignore will take no action; overwrite will use the current method to overwrite.

Return type:

typing.Callable

Returns:

the underlying function

Examples

@extensible_class
class A:

    # It's recommended to implement __getattr__ so that
    # PyLint will not complain about the dynamically added methods
    def __getattr__(self, name):
        raise NotImplementedError

# The simplest way to use this decorator, the first argument of
# the method must be annotated, and the annotated type is the
# class type to add this method to.
@extension_method
def method1(obj:A):
    return 1

assert 1 == A().method1()

# Or you can be explicit of the class type and the name of the
# method in the class. In this case, you don't have to annotate
# the first argument.
@extension_method(class_type=A, name="m3")
def method2(obj, b):
    return 2 + b

assert 5 == A().m3(3)

Note

If the method name is already in the original class, a ValueError will be thrown. You can’t modify any built-in attribute.

triad.utils.convert

triad.utils.convert.as_type(obj, target)[source]

Convert obj into target type

Parameters:
  • obj (typing.Any) – the input object

  • target (type) – the target type

Return type:

typing.Any

Returns:

object in the target type

triad.utils.convert.get_caller_global_local_vars(global_vars=None, local_vars=None, start=-1, end=-1)[source]

Get the caller level global and local variables.

Parameters:
  • global_vars (typing.Optional[typing.Dict[str, typing.Any]]) – overriding global variables, if not None, will return this instead of the caller’s globals(), defaults to None

  • local_vars (typing.Optional[typing.Dict[str, typing.Any]]) – overriding local variables, if not None, will return this instead of the caller’s locals(), defaults to None

  • start (int) – start stack level (from 0 to any negative number), defaults to -1 which is one level above where this function is invoked

  • end (int) – end stack level (from start to any smaller negative number), defaults to -1 which is one level above where this function is invoked

Return type:

typing.Tuple[typing.Dict[str, typing.Any], typing.Dict[str, typing.Any]]

Returns:

tuple of global_vars and local_vars

Examples

def caller():
    x=1
    assert 1 == get_value("x")

def get_value(var_name):
    _, l = get_caller_global_local_vars()
    assert var_name in l
    assert var_name not in locals()
    return l[var_name]

Notice:

This is for internal use; users normally should not call this directly.

If merging multiple levels, the variables on the closer levels (to where it is invoked) will overwrite the values from the further levels if there is overlap.

Examples

def f1():
    x=1

    def f2():
        x=2

        def f3():
            _, l = get_caller_global_local_vars(start=-1,end=-2)
            assert 2 == l["x"]

            _, l = get_caller_global_local_vars(start=-2,end=-2)
            assert 1 == l["x"]

    f2()
f1()

triad.utils.convert.get_full_type_path(obj)[source]

Get the full module path of the type (if obj is class or function) or type of the instance (if obj is an object instance)

Parameters:

obj (typing.Any) – a class/function type or an object instance

Raises:

TypeError – if obj is None, a lambda, or neither a class nor a function

Return type:

str

Returns:

full path string

triad.utils.convert.str_to_instance(s, expected_base_type=None, args=[], kwargs={}, global_vars=None, local_vars=None)[source]

Use str_to_type() to find a matching type and instantiate

Return type:

typing.Any

Returns:

the instantiated object

triad.utils.convert.str_to_object(expr, global_vars=None, local_vars=None)[source]

Convert string expression to object. The string expression must express a type with relative or full path, or express a local or global instance without brackets or operators.

Return type:

typing.Any

Returns:

the object

Raises:

ValueError – unable to find a matching object

Examples

class _Mock:
    def __init__(self, x=1):
        self.x = x

m = _Mock()
assert 1 == str_to_object("m.x")
assert 1 == str_to_object("m2.x", local_vars={"m2": m})
assert RuntimeError == str_to_object("RuntimeError")
assert _Mock == str_to_object("_Mock")

Note

This function is to dynamically load an object from string expression. If you write that string expression as python code at the same location, it should generate the same result.

triad.utils.convert.str_to_type(s, expected_base_type=None, global_vars=None, local_vars=None)[source]

Given a string expression, find the matching type from all imported libraries. If the expression contains ., it’s supposed to be a relative or full path of the type including modules.

Raises:

TypeError – unable to find a matching type

Return type:

type

Returns:

found type
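
Examples

A minimal sketch (assumed usage):

from triad.utils.convert import str_to_type

assert str_to_type("RuntimeError") is RuntimeError
assert str_to_type("RuntimeError", expected_base_type=Exception) is RuntimeError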

triad.utils.convert.to_bool(obj)[source]

Convert an object to python bool value. It can handle values like True, true, yes, 1, etc

Parameters:

obj (typing.Any) – object

Raises:

TypeError – if failed to convert

Return type:

bool

Returns:

bool value
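
Examples

A minimal sketch (assumed usage, based on the description above):

from triad.utils.convert import to_bool

assert to_bool(True) and to_bool("true") and to_bool("yes") and to_bool(1)
assert not to_bool("false")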

triad.utils.convert.to_datetime(obj)[source]

Convert an object to python datetime. If the object is a string and ciso8601 is installed, it will use ciso8601.parse_datetime to parse; otherwise it will use pandas.to_datetime, which can be a lot slower.

Parameters:

obj (typing.Any) – object

Raises:

TypeError – if failed to convert

Return type:

datetime.datetime

Returns:

datetime value
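
Examples

A minimal sketch (assumed usage):

from datetime import datetime
from triad.utils.convert import to_datetime

assert datetime(2024, 1, 2) == to_datetime("2024-01-02")
assert datetime(2024, 1, 2) == to_datetime(datetime(2024, 1, 2))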

triad.utils.convert.to_function(func, global_vars=None, local_vars=None)[source]

For an expression, it tries to find the matching function.

Params s:

a string expression or a callable

Parameters:
Raises:

AttributeError – if unable to find such a function

Return type:

typing.Any

Returns:

the matching function

triad.utils.convert.to_instance(s, expected_base_type=None, args=[], kwargs={}, global_vars=None, local_vars=None)[source]

If s is str or type, then use to_type() to find matching type and instantiate. Otherwise return s if it matches constraints

Raises:
  • ValueError – if s is an instance but not a (sub)type of expected_base_type

  • TypeError – if s is an instance but args or kwargs are not empty

Return type:

typing.Any

Returns:

the instantiated object

triad.utils.convert.to_size(exp)[source]

Convert an input value or expression to a size in bytes. An expression string must be in the format of <value> or <value><unit>. The value must be 0 or positive; the default unit is byte if not provided. The unit can be b, byte, k, kb, m, mb, g, gb, t, tb.

Parameters:

exp (typing.Any) – expression string or numerical value

Raises:

ValueError – for an invalid expression or a negative value

Return type:

int

Returns:

size in bytes
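
Examples

A minimal sketch (assumed usage; the 1024-based units are an assumption):

from triad.utils.convert import to_size

assert 10 == to_size(10)  # numbers are interpreted as bytes
assert 10 * 1024 == to_size("10k")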

triad.utils.convert.to_timedelta(obj)[source]

Convert an object to python timedelta.

If the object is a string: min or -inf will return timedelta.min, and max or inf will return timedelta.max. If the object is a number, the number will be used as the seconds argument. Otherwise, it will use pandas.to_timedelta to parse the object.

Parameters:

obj (typing.Any) – object

Raises:

TypeError – if failed to convert

Return type:

datetime.timedelta

Returns:

timedelta value
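
Examples

A minimal sketch (assumed usage, based on the description above):

from datetime import timedelta
from triad.utils.convert import to_timedelta

assert timedelta(seconds=10) == to_timedelta(10)
assert timedelta.min == to_timedelta("min")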

triad.utils.convert.to_type(s, expected_base_type=None, global_vars=None, local_vars=None)[source]

Convert an object s to a type:
  • if s is str: see str_to_type()

  • if s is type: check expected_base_type and return itself

  • else: check expected_base_type and return the type of s

Raises:

TypeError – if no matching type found

Return type:

type

Returns:

the matching type

triad.utils.dispatcher

class triad.utils.dispatcher.ConditionalDispatcher(default_func, is_broadcast, entry_point=None)[source]

Bases: object

A conditional function dispatcher based on custom matching functions. This is a more general solution compared to functools.singledispatch. You can write arbitrary matching functions according to all the inputs of the function.

Note

Please use the decorators conditional_dispatcher() and conditional_broadcaster() instead of directly using this class.

Parameters:
  • default_func (typing.Callable[..., typing.Any]) – the parent function that will dispatch the execution based on matching functions

  • entry_point (typing.Optional[str]) – the entry point to preload children functions, defaults to None

candidate(matcher, priority=1.0)[source]

A decorator to register a child function with matcher and priority.

Note

The order to be matched is determined by both the priority and the order of registration.

  • The default priority is 1.0

  • Children with higher priority values will be matched earlier

  • When priority>0 then later registrations will be matched earlier

  • When priority<=0 then earlier registrations will be matched earlier

So if you want to ‘overwrite’ the existing matches, set a priority greater than 1.0. If you want the current one to be ‘ignored’ when there are other matches, set priority to 0.0.

See also

Please see examples in conditional_dispatcher() and conditional_broadcaster().

Parameters:
  • matcher (typing.Callable[..., bool]) – a function determines whether it is a match based on the same input as the parent function

  • priority (float) – it determines the order to be matched, higher value means higher priority, defaults to 1.0

Return type:

typing.Callable

register(func, matcher, priority=1.0)[source]

Register a child function with matcher and priority.

Note

The order to be matched is determined by both the priority and the order of registration.

  • The default priority is 1.0

  • Children with higher priority values will be matched earlier

  • When priority>0 then later registrations will be matched earlier

  • When priority<=0 then earlier registrations will be matched earlier

So if you want to ‘overwrite’ the existing matches, set a priority greater than 1.0. If you want the current one to be ‘ignored’ when there are other matches, set priority to 0.0.

Parameters:
  • func (typing.Callable[..., typing.Any]) – a child function to be used when matching

  • matcher (typing.Callable[..., bool]) – a function determines whether it is a match based on the same input as the parent function

  • priority (float) – it determines the order to be matched, higher value means higher priority, defaults to 1.0

Return type:

None

run(*args, **kwargs)[source]

Execute all matching child functions as a generator.

Return type:

typing.Iterable[typing.Any]

Note

Only when there are no matching child functions will the default implementation be invoked.

run_top(*args, **kwargs)[source]

Execute the first matching child function

Return type:

typing.Any

Returns:

the return of the child function

triad.utils.dispatcher.conditional_broadcaster(default_func=None, entry_point=None)[source]

The decorator to create a conditional broadcaster that will run all matching registered functions in other modules/packages.

Examples

Assume in pkg1.module1, you have:

from triad import conditional_broadcaster

@conditional_broadcaster(entry_point="my.plugins")
def myprint(obj):
    raise NotImplementedError

@conditional_broadcaster(entry_point="my.plugins")
def myprint2(obj):
    raise NotImplementedError

In another package pkg2, in setup.py, you define an entry point as:

setup(
    ...,
    entry_points={
        "my.plugins": [
            "my = pkg2.module2"
        ]
    },
)

And in pkg2.module2:

from pkg1.module1 import myprint

@myprint.candidate(lambda obj: isinstance(obj, str))
def myprinta(obj:str) -> None:
    print(obj, "a")

@myprint.candidate(lambda obj: isinstance(obj, str) and obj == "x")
def myprintb(obj:str) -> None:
    print(obj, "b")

Now, both functions will be automatically registered when pkg2 is installed in the environment. In another package pkg3:

from pkg1.module1 import myprint, myprint2

myprint("x")  # calling both myprinta and myprinta
myprint("y")  # calling myprinta only
myprint2("x")  # raise NotImplementedError due to no matching candidates

Note

Only when no matching candidate is found will the implementation of the original function be used. If you don’t want an error to be thrown, use pass in the body of the original function instead.

See also

Please read candidate() for details about the matching function and priority settings.

Return type:

typing.Callable

triad.utils.dispatcher.conditional_dispatcher(default_func=None, entry_point=None)[source]

The decorator to create a conditional dispatcher that will run the first matching registered function in other modules/packages. This is a more general solution compared to functools.singledispatch. You can write arbitrary matching functions according to all the inputs of the function.

Examples

Assume in pkg1.module1, you have:

from triad import conditional_dispatcher

@conditional_dispatcher(entry_point="my.plugins")
def get_len(obj):
    raise NotImplementedError

In another package pkg2, in setup.py, you define an entry point as:

setup(
    ...,
    entry_points={
        "my.plugins": [
            "my = pkg2.module2"
        ]
    },
)

And in pkg2.module2:

from pkg1.module1 import get_len

@get_len.candidate(lambda obj: isinstance(obj, str))
def get_str_len(obj:str) -> int:
    return len(obj)

@get_len.candidate(lambda obj: isinstance(obj, int) and obj == 10)
def get_int_len(obj:int) -> int:
    return obj

Now, both functions will be automatically registered when pkg2 is installed in the environment. In another package pkg3:

from pkg1.module1 import get_len

assert get_len("abc") == 3  # calling get_str_len
assert get_len(10) == 10  # calling get_int_len
get_len(20)  # raise NotImplementedError due to no matching candidates

See also

Please read candidate() for details about the matching function and priority settings.

Return type:

typing.Callable

triad.utils.dispatcher.run_at_def(run_at_def_func=None, **kwargs)[source]

Decorator to run the function at declaration. This is useful when we want an import to trigger a function run (which guarantees it runs only once).

Examples

Assume the following python file is a module in your package, then when you import package.module, the two functions will run.

from triad import run_at_def

@run_at_def
def register_something():
    print("registered")

@run_at_def(a=1)
def register_something2(a):
    print("registered", a)
Parameters:
Return type:

typing.Callable

triad.utils.entry_points

triad.utils.hash

triad.utils.hash.to_uuid(*args)[source]

Determine the uuid by input arguments. It will search the input recursively. If an object has a __uuid__ method, it will call that method to get the uuid for that object.

Examples

to_uuid([1,2,3])
to_uuid(1,2,3)
to_uuid(dict(a=1,b="z"))

Parameters:

args (typing.Any) – arbitrary input

Return type:

str

Returns:

uuid string

triad.utils.io

triad.utils.io.abs_path(path)[source]

Get the absolute path of a path

Parameters:

path (str) – the path to check

Return type:

str

Returns:

the absolute path

triad.utils.io.chdir(path)[source]

Change the current working directory to the given path

Parameters:

path (str) – the path to change to

Return type:

typing.Iterator[None]

Examples

from triad.utils.io import chdir

with chdir("/tmp"):
    # do something
triad.utils.io.exists(path)[source]

Check if a file or a directory exists

Parameters:

path (str) – the path to check

Return type:

bool

Returns:

whether the path (resource) exists

triad.utils.io.glob(path)[source]

Glob files

Parameters:

path (str) – the path to glob

Return type:

typing.List[str]

Returns:

the matched files (absolute paths)

triad.utils.io.isdir(path)[source]

Check if a path is a directory

Parameters:

path (str) – the path to check

Return type:

bool

Returns:

whether the path is a directory

triad.utils.io.isfile(path)[source]

Check if a path is a file

Parameters:

path (str) – the path to check

Return type:

bool

Returns:

whether the path is a file

triad.utils.io.join(base_path, *paths)[source]

Join paths with the base path

Parameters:
  • base_path (str) – the base path

  • paths (str) – the paths to join to the base path

Return type:

str

Returns:

the joined path

triad.utils.io.makedirs(path, exist_ok=False)[source]

Create a directory

Parameters:
  • path (str) – the directory path

  • exist_ok (bool) – if True, do not raise error if the directory exists, defaults to False

Return type:

str

Returns:

the absolute directory path

triad.utils.io.read_bytes(path)[source]

Read bytes from a file

Parameters:

path (str) – the file path

Return type:

bytes

Returns:

the bytes

triad.utils.io.read_text(path)[source]

Read text from a file

Parameters:

path (str) – the file path

Return type:

str

Returns:

the text

triad.utils.io.rm(path, recursive=False)[source]

Remove a file or a directory

Parameters:
  • path (str) – the path to remove

  • recursive (bool) – if True and the path is directory, remove the directory recursively, defaults to False

Return type:

None

triad.utils.io.touch(path, auto_mkdir=False)[source]

Create an empty file or update the timestamp of the file

Parameters:
  • path (str) – the file path

  • auto_mkdir (bool) – if True, create the parent directory if it does not exist, defaults to False

Return type:

None

triad.utils.io.unzip_to_temp(fobj)[source]

Unzip a file object into a temporary directory.

Parameters:

fobj (typing.Any) – the file object

Return type:

typing.Iterator[str]

Examples

from triad.utils.io import unzip_to_temp, zip_temp
from io import BytesIO

bio = BytesIO()
with zip_temp(bio) as tmpdir:
    ...  # create files in the tmpdir (string)

with unzip_to_temp(BytesIO(bio.getvalue())) as tmpdir:
    ...  # read files from the tmpdir (string)

triad.utils.io.url_to_fs(path, **kwargs)[source]

A wrapper of fsspec.core.url_to_fs

Parameters:
  • path (str) – the path to be used

  • kwargs (typing.Any) – additional arguments to fsspec.core.url_to_fs

Return type:

typing.Tuple[fsspec.spec.AbstractFileSystem, str]

Returns:

the file system and the path

triad.utils.io.write_bytes(path, contents, create_dir=True)[source]

Write bytes to a file. If the directory of the file does not exist, it will create the directory first

Parameters:
  • path (str) – the file path

  • contents (bytes) – the bytes to write

  • create_dir (bool) – if True, create the directory if not exists, defaults to True

Return type:

None

triad.utils.io.write_text(path, contents)[source]

Write text to a file. If the directory of the file does not exist, it will create the directory first

Parameters:
  • path (str) – the file path

  • contents (str) – the text to write

Return type:

None
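
Examples

A minimal sketch (assumed usage; the path is hypothetical):

from triad.utils.io import exists, read_text, write_text

write_text("/tmp/triad_demo/a.txt", "hello")  # the parent directory is created
assert exists("/tmp/triad_demo/a.txt")
assert "hello" == read_text("/tmp/triad_demo/a.txt")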

triad.utils.io.zip_temp(fobj)[source]

Zip a temporary directory to a file object.

Parameters:

fobj (typing.Any) – the file path or file object

Return type:

typing.Iterator[str]

Examples

from triad.utils.io import zip_temp
from io import BytesIO

bio = BytesIO()
with zip_temp(bio) as tmpdir:
    ...  # do something with tmpdir (string)

triad.utils.iter

class triad.utils.iter.EmptyAwareIterable(it)[source]

Bases: Iterable[T]

A wrapper of an iterable that can tell whether the underlying iterable is empty. It can also peek at a non-empty iterable.

Parameters:

it (typing.Union[typing.Iterable[typing.TypeVar(T)], typing.Iterator[typing.TypeVar(T)]]) – the underlying iterable

Raises:

StopIteration – raised by the underlying iterable

property empty: bool

Check whether the underlying iterable has no more items

Returns:

whether it is empty

peek()[source]

Return the next item of the iterable without advancing it

Raises:

StopIteration – if it’s empty

Return type:

typing.TypeVar(T)

Returns:

the next item

class triad.utils.iter.Slicer(sizer=None, row_limit=None, size_limit=None, slicer=None)[source]

Bases: object

A better version of slice_iterable()

Raises:

AssertionError – if size_limit is not None but sizer is None

slice(orig_it)[source]

Slice the original iterable into slices by the combined slicing logic

Parameters:

orig_it (typing.Iterable[typing.TypeVar(T)]) – the original iterable

Yield:

an iterable of EmptyAwareIterable

Return type:

typing.Iterable[triad.utils.iter.EmptyAwareIterable[typing.TypeVar(T)]]

triad.utils.iter.make_empty_aware(it)[source]

Make an iterable empty aware, or return itself if already empty aware

Parameters:

it (typing.Union[typing.Iterable[typing.TypeVar(T)], typing.Iterator[typing.TypeVar(T)]]) – underlying iterable

Return type:

triad.utils.iter.EmptyAwareIterable[typing.TypeVar(T)]

Returns:

EmptyAwareIterable[T]
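
Examples

A minimal sketch (assumed usage):

from triad.utils.iter import make_empty_aware

it = make_empty_aware(iter([1, 2]))
assert not it.empty
assert 1 == it.peek()  # peek does not advance the iterable
assert [1, 2] == list(it)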

triad.utils.iter.slice_iterable(it, slicer)[source]

Slice the original iterable into slices by slicer

Yield:

an iterable of iterables (_SliceIterable[T])

Return type:

typing.Iterable[triad.utils.iter._SliceIterable[typing.TypeVar(T)]]

triad.utils.iter.to_kv_iterable(data, none_as_empty=True)[source]

Convert data to iterable of key value pairs

Parameters:
  • data (typing.Any) – input object, it can be a dict or Iterable[Tuple[Any, Any]] or Iterable[List[Any]]

  • none_as_empty (bool) – whether to treat None as an empty iterable

Yield:

iterable of key value pair as tuples

Return type:

typing.Iterable[typing.Tuple[typing.Any, typing.Any]]
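
Examples

A minimal sketch (assumed usage):

from triad.utils.iter import to_kv_iterable

assert [("a", 1)] == list(to_kv_iterable({"a": 1}))
assert [("a", 1)] == list(to_kv_iterable([("a", 1)]))
assert [] == list(to_kv_iterable(None))  # none_as_empty defaults to True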

triad.utils.json

triad.utils.json.check_for_duplicate_keys(ordered_pairs)[source]

Raise KeyError if a duplicate key exists in the provided ordered list of pairs, otherwise return a dict.

Return type:

typing.Dict[typing.Any, typing.Any]

Raises:

KeyError – if there is a duplicated key

Examples

>>> json.loads('{"x": 1, "x": 2}', object_pairs_hook=check_for_duplicate_keys)

triad.utils.json.loads_no_dup(json_str)[source]

Load json string, and raise KeyError if there are duplicated keys

Parameters:

json_str (str) – json string

Raises:

KeyError – if there are duplicated keys

Return type:

typing.Any

Returns:

the parsed object
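
Examples

A minimal sketch (assumed usage):

from triad.utils.json import loads_no_dup

assert {"x": 1} == loads_no_dup('{"x": 1}')
loads_no_dup('{"x": 1, "x": 2}')  # raise KeyError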

triad.utils.pandas_like

class triad.utils.pandas_like.PandasLikeUtils[source]

Bases: Generic[T, ColT]

A collection of utils for general pandas like dataframes

as_array_iterable(df, schema=None, columns=None, type_safe=False)[source]

Convert pandas like dataframe to iterable of rows in the format of list.

Parameters:
  • df (typing.TypeVar(T, bound= typing.Any)) – pandas like dataframe

  • schema (typing.Optional[pyarrow.lib.Schema]) – schema of the input. With None, it will infer the schema; the inference can be wrong for nested types, so try to be explicit

  • columns (typing.Optional[typing.List[str]]) – columns to output, None for all columns

  • type_safe (bool) – whether to enforce the types in schema, if False, it will return the original values from the dataframe

Return type:

typing.Iterable[typing.List[typing.Any]]

Returns:

iterable of rows, each row is a list

as_arrow(df, schema=None)[source]

Convert pandas like dataframe to pyarrow table

Return type:

pyarrow.lib.Table

Returns:

pyarrow table

cast_df(df, schema, use_extension_types=True, use_arrow_dtype=False, **kwargs)[source]

Cast pandas like dataframe to comply with schema.

Parameters:
  • df (typing.TypeVar(T, bound= typing.Any)) – pandas like dataframe

  • schema (pyarrow.lib.Schema) – pyarrow schema to cast to

  • use_extension_types (bool) – whether to use ExtensionDType, default True

  • use_arrow_dtype (bool) – whether to use ArrowDtype, default False

  • kwargs (typing.Any) – other arguments passed to pa.Table.from_pandas

Return type:

typing.TypeVar(T, bound= typing.Any)

Returns:

converted dataframe

concat_dfs(*dfs)[source]

Concatenate dataframes

Parameters:

dfs (typing.TypeVar(T, bound= typing.Any)) – the dataframes to concatenate

Return type:

typing.TypeVar(T, bound= typing.Any)

Returns:

the concatenated dataframe

drop_duplicates(df)[source]

Remove duplicated rows

Parameters:

df (typing.TypeVar(T, bound= typing.Any)) – the dataframe

Return type:

typing.TypeVar(T, bound= typing.Any)

Returns:

the dataframe without duplicated rows

empty(df)[source]

Check if the dataframe is empty

Parameters:

df (typing.TypeVar(T, bound= typing.Any)) – pandas like dataframe

Return type:

bool

Returns:

if it is empty

ensure_compatible(df)[source]

Check whether the dataframe is compatible with the operations inside this utils collection; if not, it will raise ValueError

Parameters:

df (typing.TypeVar(T, bound= typing.Any)) – pandas like dataframe

Raises:

ValueError – if not compatible

Return type:

None

except_df(df1, df2, unique, anti_indicator_col='__anti_indicator__')[source]

Remove df2 from df1

Return type:

typing.TypeVar(T, bound= typing.Any)

Returns:

the dataframe with df2 removed

fillna_default(col)[source]

Fill column with default values according to the dtype of the column.

Parameters:

col (typing.Any) – series of a pandas like dataframe

Return type:

typing.Any

Returns:

filled series

intersect(df1, df2, unique)[source]

Intersect two dataframes

Parameters:
  • df1 – dataframe 1

  • df2 – dataframe 2

  • unique (bool) – whether to remove duplicated rows

Return type:

typing.TypeVar(T, bound= typing.Any)

Returns:

the intersected dataframe

is_compatile_index(df)[source]

Check whether the dataframe is compatible with the operations inside this utils collection

Parameters:

df (typing.TypeVar(T, bound= typing.Any)) – pandas like dataframe

Return type:

bool

Returns:

if it is compatible

join(ndf1, ndf2, join_type, on, anti_indicator_col='__anti_indicator__', cross_indicator_col='__corss_indicator__')[source]

Join two dataframes

Return type:

typing.TypeVar(T, bound= typing.Any)

Returns:

the joined dataframe

parse_join_type(join_type)[source]

Parse join type string to standard join type string

Parameters:

join_type (str) – the join type string

Return type:

str

Returns:

the standard join type string

safe_groupby_apply(df, cols, func, key_col_name='__safe_groupby_key__', **kwargs)[source]

Safe groupby apply operation on pandas like dataframes. In pandas like groupby apply, if any key is null, the whole group is dropped. This method makes sure those groups are included.

Return type:

typing.TypeVar(T, bound= typing.Any)

Returns:

output dataframe

Notice:

The dataframe must be either empty, or have an index of type pd.RangeIndex, pd.Int64Index or pd.UInt64Index without a name; otherwise, a ValueError will be raised.

to_parquet_friendly(df, partition_cols=None)[source]

Parquet doesn’t like pd.ArrowDtype(<nested types>); this function converts all nested types to object types

Return type:

typing.TypeVar(T, bound= typing.Any)

Returns:

the converted dataframe

to_schema(df)[source]

Extract pandas dataframe schema as pyarrow schema. This is a replacement of pyarrow.Schema.from_pandas, and it can correctly handle string type and empty dataframes

Parameters:

df (typing.TypeVar(T, bound= typing.Any)) – pandas dataframe

Raises:

ValueError – if pandas dataframe does not have named schema

Return type:

pyarrow.lib.Schema

Returns:

pyarrow.Schema

Notice:

The dataframe must be either empty, or have an index of type pd.RangeIndex, pd.Int64Index or pd.UInt64Index without a name; otherwise, a ValueError will be raised.

union(ndf1, ndf2, unique)[source]

Union two dataframes

Parameters:
Return type:

typing.TypeVar(T, bound= typing.Any)

Returns:

the unioned dataframe

class triad.utils.pandas_like.PandasUtils[source]

Bases: PandasLikeUtils[DataFrame, Series]

A collection of pandas utils

concat_dfs(*dfs)[source]

Concatenate dataframes

Parameters:

dfs (pandas.core.frame.DataFrame) – the dataframes to concatenate

Return type:

pandas.core.frame.DataFrame

Returns:

the concatenated dataframe

triad.utils.pyarrow

class triad.utils.pyarrow.SchemaedDataPartitioner(schema, key_positions, sizer=None, row_limit=0, size_limit=None)[source]

Bases: object

Partitioner for a stream of array-like data with a given schema. It uses triad.utils.iter.Slicer to partition the stream

partition(data)[source]

Partition the given data stream

Parameters:

data (typing.Iterable[typing.Any]) – iterable of array like objects

Yield:

iterable of <partition_no, slice_no, slice iterable> tuple

Return type:

typing.Iterable[typing.Tuple[int, int, triad.utils.iter.EmptyAwareIterable[typing.Any]]]

triad.utils.pyarrow.apply_schema(schema, data, copy=True, deep=False, str_as_json=True)[source]

Use pa.Schema to convert a row(list) to the correspondent types.

Notice this function is to convert from python native type to python native type. It is used to normalize data input, which could be generated by different logics, into the correct data types.

Notice this function assumes each item of data has the same length with schema and will not do any extra validation on that.

Parameters:
  • schema (pyarrow.lib.Schema) – pyarrow schema

  • data (typing.Iterable[typing.List[typing.Any]]) – an iterable of rows, represented by list or tuple

  • copy (bool) – whether to apply inplace (copy=False), or create new instances

  • deep (bool) – whether to do deep conversion on nested (struct, list) types

  • str_as_json (bool) – whether to treat string data as json for nested types

Yield:

converted rows

Return type:

typing.Iterable[typing.List[typing.Any]]
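
Examples

A minimal sketch (assumed usage; the exact normalized values shown are an assumption):

import pyarrow as pa
from triad.utils.pyarrow import apply_schema

schema = pa.schema([("a", pa.int64()), ("b", pa.string())])
out = list(apply_schema(schema, [["1", 2]]))
# values are normalized to the schema's native python types, e.g. [[1, "2"]]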

triad.utils.pyarrow.cast_pa_array(col, new_type)[source]
Return type:

pyarrow.lib.Array

triad.utils.pyarrow.cast_pa_table(df, schema)[source]

Convert a pyarrow table to another pyarrow table with given schema

Return type:

pyarrow.lib.Table

Returns:

the converted pyarrow table

triad.utils.pyarrow.expression_to_schema(expr)[source]

Convert schema expression to pyarrow.Schema.

Format: col_name:col_type[,col_name:col_type]+

If col_type is a list type, the syntax should be [element_type]

If col_type is a struct type, the syntax should be {col_name:col_type[,col_name:col_type]+}

If col_type is a map type, the syntax should be <key_type,value_type>

Whitespaces will be removed. The format of the expression is json without any double quotes

Examples

expression_to_schema("a:int,b:int")
expression_to_schema("a:[int],b:{x:<int,int>,y:{z:[str],w:byte}}")
Parameters:

expr (str) – schema expression

Raises:

SyntaxError – if there is syntax issue or unknown types

Return type:

pyarrow.lib.Schema

Returns:

pyarrow.Schema

triad.utils.pyarrow.get_alter_func(from_schema, to_schema, safe)[source]

Generate the alteration function based on from_schema and to_schema. This function can be applied to arrow tables with from_schema, and the output will be in to_schema’s order and types

Return type:

typing.Callable[[pyarrow.lib.Table], pyarrow.lib.Table]

Returns:

a function that can be applied to arrow tables with from_schema; the output will be in to_schema’s order and types

triad.utils.pyarrow.get_eq_func(data_type)[source]

Generate the equality function for a given datatype

Parameters:

data_type (pyarrow.lib.DataType) – pyarrow data type supported by Triad

Return type:

typing.Callable[[typing.Any, typing.Any], bool]

Returns:

the function

triad.utils.pyarrow.is_supported(data_type, throw=False)[source]

Whether data_type is currently supported by Triad

Parameters:
  • data_type (pyarrow.lib.DataType) – instance of pa.DataType

  • throw (bool) – whether to raise exception if not supported

Return type:

bool

Returns:

if it is supported

triad.utils.pyarrow.pa_batch_to_dicts(batch)[source]

Convert a pyarrow record batch to list of dict

Parameters:

batch (pyarrow.lib.RecordBatch) – the pyarrow record batch

Return type:

typing.List[typing.Dict[str, typing.Any]]

Returns:

the list of dict

triad.utils.pyarrow.pa_batch_to_pandas(batch, use_extension_types=False, use_arrow_dtype=False, **kwargs)[source]

Convert a pyarrow record batch to pandas dataframe

Parameters:
  • batch (pyarrow.lib.RecordBatch) – the pyarrow record batch

  • use_extension_types (bool) – whether to use pandas extension data types, default to False

  • use_arrow_dtype (bool) – if True and when pandas supports ArrowDType, use pyarrow types, default False

  • kwargs (typing.Any) – other arguments for pa.Table.to_pandas

Return type:

pandas.core.frame.DataFrame

Returns:

the pandas dataframe

triad.utils.pyarrow.pa_datatypes_equal(t1, t2, ignore_list_item_name=True, equal_groups=None)[source]

Check if two pyarrow data types are equal

Return type:

bool

Returns:

if the two data types are equal

Note

In the latest version of pyarrow, in the default comparison logic, list field names are not compared.

Examples

assert not pa_datatypes_equal(pa.int32(), pa.int64())
assert pa_datatypes_equal(
    pa.int32(),
    pa.int64(),
    equal_groups=[[pa.types.is_integer]],
)

triad.utils.pyarrow.pa_schemas_equal(s1, s2, ignore_list_item_name=True, equal_groups=None)[source]

Check if two pyarrow schemas are equal

Return type:

bool

Returns:

if the two schemas are equal

Note

In the latest version of pyarrow, in the default comparison logic, list field names are not compared.

Examples

s1 = pa.schema([("a", pa.int32()), ("b", pa.string())])
s2 = pa.schema([("a", pa.int64()), ("b", pa.string())])
assert not pa_schemas_equal(s1, s2)
assert pa_schemas_equal(
    s1,
    s2,
    equal_groups=[[pa.types.is_integer]],
)

triad.utils.pyarrow.pa_table_to_pandas(df, use_extension_types=False, use_arrow_dtype=False, **kwargs)[source]

Convert a pyarrow table to pandas dataframe

Parameters:
  • df (pyarrow.lib.Table) – the pyarrow table

  • use_extension_types (bool) – whether to use pandas extension data types, default to False

  • use_arrow_dtype (bool) – if True and when pandas supports ArrowDType, use pyarrow types, default False

  • kwargs (typing.Any) – other arguments for pa.Table.to_pandas

Return type:

pandas.core.frame.DataFrame

Returns:

the pandas dataframe

triad.utils.pyarrow.parse_json_columns(df, columns)[source]

Parse json string columns in a table and replace them with pyarrow types.

Return type:

pyarrow.lib.Table

Returns:

the new table

triad.utils.pyarrow.replace_type(current_type, is_type, convert_type, recursive=True)[source]

Replace current_type or if it is nested, replace in the nested types

Return type:

pyarrow.lib.DataType

Returns:

the new type

triad.utils.pyarrow.replace_types_in_schema(schema, pairs, recursive=True)[source]

Replace types in a schema

Return type:

pyarrow.lib.Schema

Returns:

the new schema

triad.utils.pyarrow.replace_types_in_table(df, pairs, recursive=True, safe=True)[source]

Replace(cast) types in a table

Return type:

pyarrow.lib.Table

Returns:

the new table

triad.utils.pyarrow.schema_to_expression(schema)[source]

Convert pyarrow.Schema to Triad schema expression; see expression_to_schema()

Parameters:

schema (pyarrow.lib.Schema) – pyarrow schema

Raises:

NotImplementedError – if there some type is not supported by Triad

Return type:

str

Returns:

schema string expression
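
Examples

A minimal sketch (the round-trip behavior shown is an assumption):

import pyarrow as pa
from triad.utils.pyarrow import expression_to_schema, schema_to_expression

schema = expression_to_schema("a:int,b:str")
assert "a:int,b:str" == schema_to_expression(schema)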

triad.utils.pyarrow.schemas_equal(a, b, check_order=True, check_metadata=True, ignore=None)[source]

Check if two schemas are equal

Return type:

bool

Returns:

whether the two schemas are equal

triad.utils.pyarrow.to_pa_datatype(obj)[source]

Convert an object to pyarrow DataType

Parameters:

obj (typing.Any) – any object

Raises:

TypeError – if unable to convert

Return type:

pyarrow.lib.DataType

Returns:

an instance of pa.DataType

triad.utils.pyarrow.to_pandas_dtype(schema, use_extension_types=False, use_arrow_dtype=False)[source]

Convert a pyarrow schema to a dtype dict for pandas dataframes. Currently, struct type is not supported

Parameters:
  • schema (pyarrow.lib.Schema) – the pyarrow schema

  • use_extension_types (bool) – whether to use pandas extension data types, default to False

  • use_arrow_dtype (bool) – if True and when pandas supports ArrowDType, use pyarrow types, default False

Return type:

typing.Dict[str, numpy.dtype]

Returns:

the pandas data type dictionary

Note

  • If use_extension_types is False and use_arrow_dtype is True, it converts all types to ArrowDtype

  • If both are true, it converts types to the numpy-backed nullable dtypes if possible, otherwise it converts to ArrowDtype

triad.utils.pyarrow.to_pandas_types_mapper(pa_type, use_extension_types=False, use_arrow_dtype=False)[source]

The types_mapper for pa.Table.to_pandas

Parameters:
  • pa_type (pyarrow.lib.DataType) – the pyarrow data type

  • use_extension_types (bool) – whether to use pandas extension data types, default to False

  • use_arrow_dtype (bool) – if True and when pandas supports ArrowDType, use pyarrow types, default False

Return type:

typing.Optional[pandas.core.dtypes.base.ExtensionDtype]

Returns:

the pandas ExtensionDtype if available, otherwise None

Note

  • If use_extension_types is False and use_arrow_dtype is True, it converts the type to ArrowDtype

  • If both are true, it converts the type to the numpy-backed nullable dtype if possible, otherwise it converts to ArrowDtype

triad.utils.pyarrow.to_single_pandas_dtype(pa_type, use_extension_types=False, use_arrow_dtype=False)[source]

Convert a pyarrow data type to a pandas data type. Currently, struct type is not supported

Parameters:
  • pa_type (pyarrow.lib.DataType) – the pyarrow data type

  • use_extension_types (bool) – whether to use pandas extension data types, default to False

  • use_arrow_dtype (bool) – if True and when pandas supports ArrowDType, use pyarrow types, default False

Return type:

numpy.dtype

Returns:

the pandas data type

Note

  • If use_extension_types is False and use_arrow_dtype is True, it converts the type to ArrowDtype

  • If both are true, it converts the type to the numpy-backed nullable dtype if possible, otherwise it converts to ArrowDtype

triad.utils.rename

triad.utils.rename.normalize_names(names)[source]

Normalize dataframe column names to follow Fugue column naming rules. It only operates on names that are not valid to Fugue.

It tries to minimize the changes to the original name. Special characters will be converted to _, but if this does not provide a valid and unique column name, more transformation will be done.

Note

This is a temporary solution before Schema can take arbitrary names

Examples

  • [0,1] => {0:"_0", 1:"_1"}

  • ["1a","2b"] => {"1a":"_1a", "2b":"_2b"}

  • ["*a","-a"] => {"*a":"_a", "-a":"_a_1"}

Parameters:

names (typing.List[typing.Any]) – the columns names of a dataframe

Return type:

typing.Dict[typing.Any, str]

Returns:

the rename operations as a dict, key is the original column name, value is the new valid name.
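
A runnable version of the mappings above (assumed usage):

from triad.utils.rename import normalize_names

assert {"1a": "_1a", "2b": "_2b"} == normalize_names(["1a", "2b"])
assert {0: "_0", 1: "_1"} == normalize_names([0, 1])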

triad.utils.schema

triad.utils.schema.move_to_unquoted(expr, p, quote='`')[source]

When p is on a quote, find the position next to the end of the quoted part

Parameters:
  • expr (str) – the original string

  • p (int) – the current position of expr, and it should be a quote

  • quote – the quote character

Raises:

SyntaxError – if there is an open quote detected

Return type:

int

Returns:

the position next to the end of the quoted part

triad.utils.schema.quote_name(name, quote='`')[source]

Add quote ` for strings that are not a valid triad var name.

Parameters:
  • name (str) – the name string

  • quote (str) – the quote char, defaults to `

Return type:

str

Returns:

the quoted(if necessary) string

triad.utils.schema.safe_replace_out_of_quote(s, find, replace, quote='`')[source]

Replace strings out of the quoted part

Parameters:
  • s (str) – the original string

  • find (str) – the string to find

  • replace (str) – the string used to replace

  • quote – the quote character

Return type:

str

Returns:

the string with the replacements

triad.utils.schema.safe_search_out_of_quote(s, chars, quote='`')[source]

Search for chars out of the quoted parts

Parameters:
  • s (str) – the original string

  • chars (str) – the characters to find

  • quote – the quote character

Yield:

the tuple in format of position, char

Return type:

typing.Iterable[typing.Tuple[int, str]]

triad.utils.schema.safe_split_and_unquote(s, sep_char=',', quote='`', on_unquoted_empty='keep')[source]

Split the string and unquote every part

Examples

" a , ` b ` , c " => ["a", " b ","c"]

Parameters:
  • s (str) – the original string

  • sep_char (str) – the split character, defaults to “,”

  • quote (str) – the quote character

  • on_unquoted_empty (str) – can be keep, ignore or throw, defaults to “keep”

Raises:

ValueError – if there are empty but unquoted parts and on_unquoted_empty is throw

Return type:

typing.List[str]

Returns:

the unquoted parts.
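
A runnable version of the example above (assumed usage):

from triad.utils.schema import safe_split_and_unquote

assert ["a", " b ", "c"] == safe_split_and_unquote(" a , ` b ` , c ")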

triad.utils.schema.safe_split_out_of_quote(s, sep_chars, max_split=-1, quote='`')[source]
Return type:

typing.List[str]

triad.utils.schema.split_quoted_string(s, quote='`')[source]

Split s to a sequence of quoted and unquoted parts.

Parameters:
  • s (str) – the original string

  • quote – the quote character

Yield:

the tuple in the format of is_quoted, start, end

Return type:

typing.Iterable[typing.Tuple[bool, int, int]]

triad.utils.schema.unquote_name(name, quote='`')[source]

If the input is quoted, then get the inner string, otherwise do nothing.

Parameters:
  • name (str) – the name string

  • quote (str) – the quote char, defaults to `

Return type:

str

Returns:

the value without `
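
Examples

A minimal sketch (assumed usage) covering both quote_name() and unquote_name():

from triad.utils.schema import quote_name, unquote_name

assert "ab" == quote_name("ab")  # already a valid triad var name
assert "`a b`" == quote_name("a b")
assert "a b" == unquote_name("`a b`")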

triad.utils.string

triad.utils.string.assert_triad_var_name(expr)[source]

Check if expr is a valid Triad variable name based on Triad standard: it has to be a valid python identifier and it can’t be purely _

Parameters:

expr (str) – column name expression

Raises:

AssertionError – if the expression is invalid

Return type:

str

Returns:

the expression string

triad.utils.string.validate_triad_var_name(expr)[source]

Check if expr is a valid Triad variable name based on Triad standard: it has to be a valid python identifier and it can’t be purely _

Note

Any valid triad var name can be used as a column name without the quote `

Parameters:

expr (str) – column name expression

Return type:

bool

Returns:

whether it is valid
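
Examples

A minimal sketch (assumed usage, based on the rule above):

from triad.utils.string import validate_triad_var_name

assert validate_triad_var_name("ab_c")
assert not validate_triad_var_name("_")  # purely underscore is invalid
assert not validate_triad_var_name("1ab")  # not a valid python identifier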

triad.utils.threading

class triad.utils.threading.RunOnce(func, key_func=None, lock_type=<function RLock>)[source]

Bases: object

Run func once; the uniqueness is defined by key_func. This implementation is serialization safe and thread safe.

Note

Please use the decorator run_once() instead of directly using this class

class triad.utils.threading.SerializableRLock[source]

Bases: object

A serialization safe wrapper of threading.RLock

triad.utils.threading.run_once(func=None, key_func=None, lock_type=<function RLock>)[source]

The decorator to run func once; the uniqueness is defined by key_func. This implementation is serialization safe and thread safe.

Return type:

typing.Callable

Examples

@run_once
def r(a):
    return max(a)

a1 = [0, 1]
a2 = [0, 2]
assert 1 == r(a1) # will trigger r
assert 1 == r(a1) # will get the result from cache
assert 2 == r(a2) # will trigger r again because of different arguments

# the following example ignores arguments
@run_once(key_func=lambda *args, **kwargs: True)
def r2(a):
    return max(a)

assert 1 == r2(a1) # will trigger r2
assert 1 == r2(a2) # will get the result from cache because the arguments are ignored

Note

  • Hash collision is the concern of the user, not this class, your key_func should avoid any potential collision

  • func can have no return

  • For concurrent calls of this wrapper, only one will trigger func; other calls will be blocked until the first call returns a result

  • This class is cloudpicklable, but an unpickled instance does NOT share the same context with the original one

  • This is not a replacement of functools.lru_cache(); it is not supposed to cache a lot of items