triad.collections#

triad.collections.dict#

class triad.collections.dict.IndexedOrderedDict(*args, **kwds)[source]#

Bases: collections.OrderedDict, Dict[triad.collections.dict.KT, triad.collections.dict.VT]

Subclass of OrderedDict that can get and set with index

clear()[source]#
Return type

None

copy()[source]#
Return type

triad.collections.dict.IndexedOrderedDict

equals(other, with_order)[source]#

Compare with another object

Parameters
Returns

whether they equal

get_item_by_index(index)[source]#

Get key value pair by index

Parameters

index (int) – index of the item

Return type

typing.Tuple[typing.TypeVar(KT), typing.TypeVar(VT)]

Returns

key value tuple at the index

get_key_by_index(index)[source]#

Get key by index

Parameters

index (int) – index of the key

Return type

typing.TypeVar(KT)

Returns

key value at the index

get_value_by_index(index)[source]#

Get value by index

Parameters

index (int) – index of the item

Return type

typing.TypeVar(VT)

Returns

value at the index

index_of_key(key)[source]#

Get index of key

Parameters

key (typing.Any) – key value

Return type

int

Returns

index of the key value

move_to_end(*args, **kwds)[source]#

Move an existing element to the end (or beginning if last is false).

Raise KeyError if the element does not exist.

Return type

None

pop(*args, **kwds)[source]#

Remove the specified key and return the corresponding value. If key is not found, d is returned if given, otherwise KeyError is raised.

Return type

typing.TypeVar(VT)

pop_by_index(index)[source]#

Pop item at index

Parameters

index (int) – index of the item

Return type

typing.Tuple[typing.TypeVar(KT), typing.TypeVar(VT)]

Returns

key value tuple at the index

popitem(*args, **kwds)[source]#

Remove and return a (key, value) pair from the dictionary.

Pairs are returned in LIFO order if last is true or FIFO order if false.

Return type

typing.Tuple[typing.TypeVar(KT), typing.TypeVar(VT)]

property readonly: bool#

Whether this dict is readonly

Return type

bool

set_readonly()[source]#

Make this dict readonly

Return type

None

set_value_by_index(index, value)[source]#

Set value by index

Parameters
Return type

None

class triad.collections.dict.ParamDict(data=None, deep=True)[source]#

Bases: triad.collections.dict.IndexedOrderedDict[str, Any]

Parameter dictionary, a subclass of IndexedOrderedDict whose keys must be strings

Parameters
IGNORE = 2#
OVERWRITE = 0#
THROW = 1#
get(key, default)[source]#

Get value by key, and the value must be a subtype of the type of default (which can't be None). If the key is not found, return default.

Parameters

key (typing.Union[int, str]) – the key to search

Raises
Return type

typing.Any

Returns

the value by key, and the value must be a subtype of the type of default. If key is not found, return default

get_or_none(key, expected_type)[source]#

Get value by key, and the value must be a subtype of expected_type

Parameters
Raises

TypeError – if the value can’t be converted to expected_type

Return type

typing.Any

Returns

if key is not found, None. Otherwise if the value can be converted to expected_type, return the converted value, otherwise raise exception

get_or_throw(key, expected_type)[source]#

Get value by key, and the value must be a subtype of expected_type. If key is not found or value can’t be converted to expected_type, raise exception

Parameters
Raises
  • KeyError – if key is not found

  • TypeError – if the value can’t be converted to expected_type

Return type

typing.Any

Returns

only when key is found and can be converted to expected_type, return the converted value

to_json(indent=False)[source]#

Generate json expression string for the dictionary

Parameters

indent (bool) – whether to have indent

Return type

str

Returns

json string

update(other, on_dup=0, deep=True)[source]#

Update dictionary with another object (for possible types, see to_kv_iterable())

Parameters
  • other (typing.Any) – for possible types, see to_kv_iterable()

  • on_dup (int) – one of ParamDict.OVERWRITE, ParamDict.THROW and ParamDict.IGNORE

Raises
  • KeyError – if using ParamDict.THROW and other contains existing keys

  • ValueError – if on_dup is invalid

Return type

triad.collections.dict.ParamDict

Returns

itself

triad.collections.fs#

class triad.collections.fs.FileSystem(auto_close=True)[source]#

Bases: fs.mountfs.MountFS

A unified filesystem based on PyFileSystem2. The special requirement for this class is that all paths must be absolute paths with a scheme. To customize different file systems, you should override create_fs to provide your own configured file systems.

Examples

fs = FileSystem()
fs.writetext("mem://from/a.txt", "hello")
fs.copy("mem://from/a.txt", "mem://to/a.txt")

Note

If a path is not a local path, it must include the scheme and netloc (the first element after ://).

Parameters

auto_close (bool) – If True (the default), the child filesystems will be closed when MountFS is closed.

create_fs(root)[source]#

Create a PyFileSystem instance from root. root is in the format of / if local path, else <scheme>://<netloc>. You should override this method to provide custom instances, for example, if you want to create an S3FS with certain parameters.

Parameters

root (str) – / if local path, else <scheme>://<netloc>

Return type

fs.base.FS

property glob#

A globber object

makedirs(path, permissions=None, recreate=False)[source]#

Make a directory, and any missing intermediate directories.

Note

This overrides the base makedirs

Parameters
recreate (bool) – if False (the default), attempting to create an existing directory will raise an error. Set to True to ignore existing directories.

Return type

fs.subfs.SubFS

Returns

a sub-directory filesystem.

Raises

triad.collections.function_wrapper#

class triad.collections.function_wrapper.AnnotatedParam(param)[source]#

Bases: object

An abstraction of annotated parameter

class triad.collections.function_wrapper.FunctionWrapper(func, params_re='.*', return_re='.*')[source]#

Bases: object

Create a function wrapper that can recognize and validate all input types.

Parameters
  • func (typing.Callable) – the function to be wrapped

  • params_re (str) – parameter types regex expression

  • return_re (str) – return types regex expression

Examples

Here is a simple example to show how to use FunctionWrapper. Assume we want to validate functions that take 2 pandas dataframes as the first two inputs, followed by arbitrary other inputs, and that return 1 pandas dataframe

import pandas as pd

@function_wrapper(None)  # all param definitions are here, no entrypoint
class MyFuncWrapper(FunctionWrapper):
    def __init__(self, func):
        super().__init__(
            func,
            params_re="^dd.*",  # starts with two dataframe parameters
            return_re="^d$",  # returns a dataframe
        )

@MyFuncWrapper.annotated_param(pd.DataFrame, code="d")
class MyDataFrameParam(AnnotatedParam):
    pass

def f1(a:pd.DataFrame, b:pd.DataFrame, c) -> pd.DataFrame:
    return a

def f2(a, b:pd.DataFrame, c):
    return a

# f1 is valid
MyFuncWrapper(f1)

# f2 is invalid because of the first parameter
# TypeError will be thrown
MyFuncWrapper(f2)
classmethod annotated_param(annotation, code=None, matcher=None, child_can_reuse_code=False)[source]#

The decorator to register a type annotation for this function wrapper

Parameters
  • annotation (typing.Any) – the type annotation

  • code (typing.Optional[str]) – the single char code to represent this type annotation, defaults to None, meaning it will try to use its parent class’ code; this is allowed only if child_can_reuse_code is set to True on the parent class.

  • matcher (typing.Optional[typing.Callable[[typing.Any], bool]]) – a function that takes in a type annotation and decides whether it is acceptable by the AnnotatedParam, defaults to None, meaning it will just do a simple == check.

  • child_can_reuse_code (bool) – whether the derived types of the current AnnotatedParam can reuse the code (if not specifying a new code), defaults to False

property input_code: str#

The input parameters code expression

Return type

str

property output_code: str#

The output code expression

Return type

str

classmethod parse_annotation(annotation, param=None, none_as_other=True)[source]#
Return type

triad.collections.function_wrapper.AnnotatedParam

class triad.collections.function_wrapper.KeywordParam(param)[source]#

Bases: triad.collections.function_wrapper.AnnotatedParam

For keyword parameters

class triad.collections.function_wrapper.NoneParam(param)[source]#

Bases: triad.collections.function_wrapper.AnnotatedParam

The case where there is no annotation for a parameter

class triad.collections.function_wrapper.OtherParam(param)[source]#

Bases: triad.collections.function_wrapper.AnnotatedParam

Any annotation that is not recognized

class triad.collections.function_wrapper.PositionalParam(param)[source]#

Bases: triad.collections.function_wrapper.AnnotatedParam

For positional parameters

class triad.collections.function_wrapper.SelfParam(param)[source]#

Bases: triad.collections.function_wrapper.AnnotatedParam

For the self parameters in member functions

triad.collections.function_wrapper.function_wrapper(entrypoint)[source]#

The decorator to register a new FunctionWrapper type.

Parameters

entrypoint (typing.Optional[str]) – the entrypoint to load in setup.py in order to find the registered AnnotatedParam under this FunctionWrapper

triad.collections.schema#

class triad.collections.schema.Schema(*args, **kwargs)[source]#

Bases: triad.collections.dict.IndexedOrderedDict[str, pyarrow.lib.Field]

A Schema wrapper on top of pyarrow.Fields. This has more features than pyarrow.Schema, and they can convert to each other.

This class can be initialized from schema like objects. Here is a list of schema like objects:

  • pyarrow.Schema or Schema objects

  • pyarrow.Field: single field will be treated as a single column schema

  • schema expressions: expression_to_schema()

  • Dict[str,Any]: key will be the columns, and value will be type like objects

  • Tuple[str,Any]: first item will be the only column name of the schema, and the second has to be a type like object

  • List[Any]: a list of Schema like objects

  • pandas.DataFrame: it will extract the dataframe’s schema

Here is a list of data type like objects:

  • pyarrow.DataType

  • pyarrow.Field: will only use the type attribute of the field

  • type expression or other objects: for to_pa_datatype()

Examples

Schema("a:int,b:int")
Schema("a:int","b:int")
Schema(a=int,b=str) # == Schema("a:long,b:str")
Schema(dict(a=int,b=str)) # == Schema("a:long,b:str")
Schema([("a",int),("b",str)]) # == Schema("a:long,b:str")
Schema(("a",int),("b",str)) # == Schema("a:long,b:str")
Schema("a:[int],b:{x:int,y:{z:[str],w:byte}},c:[{x:str}]")

Note

  • For supported pyarrow.DataTypes see is_supported()

  • If you use a python type as the data type (e.g. Schema(a=int,b=str)), be aware that the resulting data type may differ (e.g. python int type -> pyarrow long/int64 type)

  • When not readonly, only append is allowed, update or remove are disallowed

  • When readonly, no modification on the existing schema is allowed

  • append, update and remove are always allowed when creating a new object

  • InvalidOperationError will be raised for disallowed operations

  • At most one of *args and **kwargs can be set

Parameters
  • args (typing.Any) – one or multiple schema like objects, which will be combined in order

  • kwargs (typing.Any) – key value pairs for the schema

append(obj)[source]#

Append schema like object to the current schema. Only new columns are allowed.

Raises

SchemaError – if a column exists or is invalid or obj is not convertible

Return type

triad.collections.schema.Schema

Returns

the Schema object itself

assert_not_empty()[source]#
Return type

triad.collections.schema.Schema

copy()[source]#

Clone Schema object

Return type

triad.collections.schema.Schema

Returns

cloned object

exclude(other, require_type_match=True, ignore_type_mismatch=False)[source]#
Return type

triad.collections.schema.Schema

extract(obj, ignore_key_mismatch=False, require_type_match=True, ignore_type_mismatch=False)[source]#
Return type

triad.collections.schema.Schema

property fields: List[pyarrow.lib.Field]#

List of pyarrow.Fields

Return type

typing.List[pyarrow.lib.Field]

intersect(other, require_type_match=True, ignore_type_mismatch=True, use_other_order=False)[source]#
Return type

triad.collections.schema.Schema

property names: List[str]#

List of column names

Return type

typing.List[str]

property pa_schema: pyarrow.lib.Schema#

convert as pyarrow.Schema

Return type

pyarrow.lib.Schema

property pandas_dtype: Dict[str, numpy.dtype]#

convert as dtype dict for pandas dataframes. Currently, struct type is not supported

Return type

typing.Dict[str, numpy.dtype]

property pd_dtype: Dict[str, numpy.dtype]#

convert as dtype dict for pandas dataframes. Currently, struct type is not supported

Return type

typing.Dict[str, numpy.dtype]

property pyarrow_schema: pyarrow.lib.Schema#

convert as pyarrow.Schema

Return type

pyarrow.lib.Schema

remove(obj, ignore_key_mismatch=False, require_type_match=True, ignore_type_mismatch=False)[source]#
Return type

triad.collections.schema.Schema

rename(columns, ignore_missing=False)[source]#

Rename the current schema and generate a new one

Parameters

columns (typing.Dict[str, str]) – dictionary to map from old to new column names

Return type

triad.collections.schema.Schema

Returns

renamed schema object

transform(*args, **kwargs)[source]#

Transform the current schema to a new schema

Raises

SchemaError – if there is any exception

Return type

triad.collections.schema.Schema

Returns

transformed schema

Examples

s=Schema("a:int,b:int,c:str")
s.transform("x:str") # x:str
# add
s.transform("*,x:str") # a:int,b:int,c:str,x:str
s.transform("*","x:str") # a:int,b:int,c:str,x:str
s.transform("*",x=str) # a:int,b:int,c:str,x:str
# subtract
s.transform("*-c,a") # b:int
s.transform("*-c-a") # b:int
s.transform("*~c,a,x") # b:int  # ~ means exclude if exists
s.transform("*~c~a~x") # b:int  # ~ means exclude if exists
# + means overwrite existing and append new
s.transform("*+e:str,b:str,d:str") # a:int,b:str,c:str,e:str,d:str
# you can have multiple operations
s.transform("*+b:str-a") # b:str,c:str
# callable
s.transform(lambda s:s.fields[0]) # a:int
s.transform(lambda s:s.fields[0], lambda s:s.fields[2]) # a:int,c:str
property types: List[pyarrow.lib.DataType]#

List of pyarrow.DataTypes

Return type

typing.List[pyarrow.lib.DataType]

union(other, require_type_match=False)[source]#
Return type

triad.collections.schema.Schema

union_with(other, require_type_match=False)[source]#
Return type

triad.collections.schema.Schema

exception triad.collections.schema.SchemaError(message)[source]#

Bases: Exception

Exceptions related to constructing and modifying schemas