triad.collections

triad.collections.dict

class triad.collections.dict.IndexedOrderedDict(*args, **kwds)[source]

Bases: OrderedDict, Dict[KT, VT]

Subclass of OrderedDict that supports getting and setting items by index

clear()[source]
Return type:

None

copy()[source]
Return type:

triad.collections.dict.IndexedOrderedDict

equals(other, with_order)[source]

Compare with another object

Parameters:
  • other – the object to compare with

  • with_order – whether the order of the items must also match

Returns:

whether the two objects are equal

get_item_by_index(index)[source]

Get key value pair by index

Parameters:

index (int) – index of the item

Return type:

typing.Tuple[typing.TypeVar(KT), typing.TypeVar(VT)]

Returns:

key value tuple at the index

get_key_by_index(index)[source]

Get key by index

Parameters:

index (int) – index of the key

Return type:

typing.TypeVar(KT)

Returns:

key at the index

get_value_by_index(index)[source]

Get value by index

Parameters:

index (int) – index of the item

Return type:

typing.TypeVar(VT)

Returns:

value at the index

index_of_key(key)[source]

Get index of key

Parameters:

key (typing.Any) – key value

Return type:

int

Returns:

index of the key value
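
The index-based accessors above can be pictured with a minimal sketch on a plain collections.OrderedDict. The helper names item_at and index_of are hypothetical, and the linear scan is an assumption made for illustration; it does not show how IndexedOrderedDict is actually implemented.

```python
from collections import OrderedDict
from itertools import islice


def item_at(d, index):
    """Return the (key, value) pair at a position (hypothetical helper)."""
    if not 0 <= index < len(d):
        raise IndexError(index)
    # Skip `index` items, then take the next one
    return next(islice(d.items(), index, None))


def index_of(d, key):
    """Return the position of a key (hypothetical helper)."""
    for i, k in enumerate(d):
        if k == key:
            return i
    raise KeyError(key)


d = OrderedDict([("a", 1), ("b", 2), ("c", 3)])
print(item_at(d, 1))     # ('b', 2)
print(index_of(d, "c"))  # 2
```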

move_to_end(*args, **kwds)[source]

Move an existing element to the end (or beginning if last is false).

Raise KeyError if the element does not exist.

Return type:

None
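
As a short illustration of the last flag, here is the behaviour on the standard collections.OrderedDict base class. It is assumed, not verified here, that IndexedOrderedDict keeps the same semantics when it is not readonly.

```python
from collections import OrderedDict

d = OrderedDict([("a", 1), ("b", 2), ("c", 3)])

d.move_to_end("a")              # last defaults to True: move to the end
print(list(d))                  # ['b', 'c', 'a']

d.move_to_end("c", last=False)  # move to the beginning
print(list(d))                  # ['c', 'b', 'a']
```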

pop(*args, **kwds)[source]

Remove the specified key and return the corresponding value. If the key is not found, return the default if given; otherwise, raise a KeyError.

Return type:

typing.TypeVar(VT)

pop_by_index(index)[source]

Pop item at index

Parameters:

index (int) – index of the item

Return type:

typing.Tuple[typing.TypeVar(KT), typing.TypeVar(VT)]

Returns:

key value tuple at the index

popitem(*args, **kwds)[source]

Remove and return a (key, value) pair from the dictionary.

Pairs are returned in LIFO order if last is true or FIFO order if false.

Return type:

typing.Tuple[typing.TypeVar(KT), typing.TypeVar(VT)]
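
The LIFO and FIFO orders can be seen on the standard collections.OrderedDict base class. This is only a sketch of the inherited semantics; how a readonly IndexedOrderedDict reacts to popitem is not shown.

```python
from collections import OrderedDict

d = OrderedDict([("a", 1), ("b", 2), ("c", 3)])

newest = d.popitem()            # last=True (default): LIFO
oldest = d.popitem(last=False)  # last=False: FIFO

print(newest, oldest, list(d))  # ('c', 3) ('a', 1) ['b']
```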

property readonly: bool

Whether this dict is readonly

set_readonly()[source]

Make this dict readonly

Return type:

None
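
The readonly switch can be sketched as a flag that is checked before each mutation. FreezableDict and ReadOnlyError are hypothetical names used only here, and only item assignment and deletion are guarded; which operations and which exception type the real class uses are assumptions.

```python
from collections import OrderedDict


class ReadOnlyError(Exception):
    """Hypothetical error type used only in this sketch."""


class FreezableDict(OrderedDict):
    """Hypothetical ordered dict that can be switched to readonly."""

    def __init__(self, *args, **kwds):
        # Set the flag first: OrderedDict.__init__ may call __setitem__
        self._readonly = False
        super().__init__(*args, **kwds)

    @property
    def readonly(self):
        return self._readonly

    def set_readonly(self):
        self._readonly = True

    def __setitem__(self, key, value):
        if self._readonly:
            raise ReadOnlyError("dict is readonly")
        super().__setitem__(key, value)

    def __delitem__(self, key):
        if self._readonly:
            raise ReadOnlyError("dict is readonly")
        super().__delitem__(key)


d = FreezableDict(a=1)
d["b"] = 2          # allowed while not readonly
d.set_readonly()
try:
    d["c"] = 3      # rejected after set_readonly()
except ReadOnlyError as e:
    print("rejected:", e)
```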

set_value_by_index(index, value)[source]

Set value by index

Parameters:
  • index – index of the item

  • value – the new value

Return type:

None

class triad.collections.dict.ParamDict(data=None, deep=True)[source]

Bases: IndexedOrderedDict[str, Any]

Parameter dictionary, a subclass of IndexedOrderedDict whose keys must be strings

Parameters:
IGNORE = 2
OVERWRITE = 0
THROW = 1
get(key, default)[source]

Get value by key, and the value must be a subtype of the type of default (which can't be None). If the key is not found, return default.

Parameters:

key (typing.Union[int, str]) – the key to search

Raises:
Return type:

typing.Any

Returns:

the value by key, and the value must be a subtype of the type of default. If key is not found, return default

get_or_none(key, expected_type)[source]

Get value by key, and the value must be a subtype of expected_type

Parameters:
  • key – the key to search

  • expected_type – the type that the value must be convertible to

Raises:

TypeError – if the value can’t be converted to expected_type

Return type:

typing.Any

Returns:

if key is not found, None. Otherwise if the value can be converted to expected_type, return the converted value, otherwise raise exception

get_or_throw(key, expected_type)[source]

Get value by key, and the value must be a subtype of expected_type. If key is not found or value can’t be converted to expected_type, raise exception

Parameters:
  • key – the key to search

  • expected_type – the type that the value must be convertible to

Raises:
  • KeyError – if key is not found

  • TypeError – if the value can’t be converted to expected_type

Return type:

typing.Any

Returns:

only when key is found and can be converted to expected_type, return the converted value
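
The contract of get_or_throw and get_or_none can be sketched with two free functions over a plain dict. The conversion rule used here (call expected_type on the value) is an assumption for illustration; the conversion rules ParamDict really applies are not described in this reference.

```python
def get_or_throw(d, key, expected_type):
    """Hypothetical helper: return d[key] converted to expected_type."""
    if key not in d:
        raise KeyError(key)
    value = d[key]
    if isinstance(value, expected_type):
        return value
    try:
        return expected_type(value)  # assumed conversion rule
    except (TypeError, ValueError) as e:
        raise TypeError(
            f"{value!r} can't be converted to {expected_type.__name__}"
        ) from e


def get_or_none(d, key, expected_type):
    """Hypothetical helper: like get_or_throw, but None for a missing key."""
    return get_or_throw(d, key, expected_type) if key in d else None


params = {"n": "3", "name": "x"}
print(get_or_throw(params, "n", int))      # 3
print(get_or_none(params, "missing", int)) # None
```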

to_json(indent=False)[source]

Generate json expression string for the dictionary

Parameters:

indent (bool) – whether to have indent

Return type:

str

Returns:

json string
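
A minimal sketch of a boolean indent switch using the standard json module. The free function to_json below is hypothetical and the indent width of 4 is an assumption; the output format of ParamDict.to_json may differ.

```python
import json
from collections import OrderedDict


def to_json(d, indent=False):
    """Hypothetical helper: serialize a dict, optionally indented."""
    # indent=None produces the compact single-line form
    return json.dumps(d, indent=4 if indent else None)


d = OrderedDict([("a", 1), ("b", "x")])
print(to_json(d))               # {"a": 1, "b": "x"}
print(to_json(d, indent=True))  # same content spread over several lines
```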

update(other, on_dup=0, deep=True)[source]

Update dictionary with another object (for possible types, see to_kv_iterable())

Parameters:
  • other (typing.Any) – for possible types, see to_kv_iterable()

  • on_dup (int) – one of ParamDict.OVERWRITE, ParamDict.THROW and ParamDict.IGNORE

Raises:
  • KeyError – if using ParamDict.THROW and other contains existing keys

  • ValueError – if on_dup is invalid

Return type:

triad.collections.dict.ParamDict

Returns:

itself
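
The three on_dup modes can be sketched over plain dicts as follows. update_with_policy is a hypothetical function, and the deep argument is left out; only the constant values and the documented exceptions are taken from the reference above.

```python
OVERWRITE, THROW, IGNORE = 0, 1, 2  # values listed for ParamDict above


def update_with_policy(target, other, on_dup=OVERWRITE):
    """Hypothetical helper: merge other into target according to on_dup."""
    if on_dup not in (OVERWRITE, THROW, IGNORE):
        raise ValueError(f"invalid on_dup: {on_dup}")
    for key, value in other.items():
        if key in target:
            if on_dup == THROW:
                raise KeyError(f"{key} already exists")
            if on_dup == IGNORE:
                continue  # keep the existing value
        target[key] = value
    return target


print(update_with_policy({"a": 1}, {"a": 2, "b": 3}))          # {'a': 2, 'b': 3}
print(update_with_policy({"a": 1}, {"a": 2, "b": 3}, IGNORE))  # {'a': 1, 'b': 3}
```

In this sketch a THROW failure can leave target partially updated, because keys are checked one at a time; whether ParamDict.update behaves the same way is not stated here.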

triad.collections.fs

class triad.collections.fs.FileSystem(auto_close=True)[source]

Bases: MountFS

A unified filesystem based on PyFileSystem2. The special requirement for this class is that every path must be an absolute path with a scheme. To customize different file systems, override create_fs to provide your own configured file systems.

Parameters:

auto_close (bool) – If True (the default), the child filesystems will be closed when MountFS is closed.

Examples

from triad.collections.fs import FileSystem

fs = FileSystem()
fs.writetext("mem://from/a.txt", "hello")
fs.copy("mem://from/a.txt", "mem://to/a.txt")

Note

If a path is not a local path, it must include the scheme and netloc (the first element after ://)

create_fs(root)[source]

Create a PyFileSystem instance from root. root is / for a local path, otherwise <scheme>://<netloc>. Override this method to provide custom instances, for example to create an S3FS with certain parameters.

Parameters:

root (str) – / if local path, else <scheme>://<netloc>

Return type:

fs.base.FS
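
To make the root format concrete, here is a minimal sketch that derives a root from an absolute path with the standard urllib.parse module. root_of is a hypothetical helper, not part of FileSystem, and it only treats POSIX-style local paths; how the class itself splits paths is not shown in this reference.

```python
from urllib.parse import urlparse


def root_of(path):
    """Hypothetical helper: '/' for local paths, else '<scheme>://<netloc>'."""
    parts = urlparse(path)
    if parts.scheme == "":
        if not path.startswith("/"):
            raise ValueError(f"{path} is not an absolute path")
        return "/"
    if parts.netloc == "":
        raise ValueError(f"{path} has no netloc")
    return f"{parts.scheme}://{parts.netloc}"


print(root_of("mem://from/a.txt"))  # mem://from
print(root_of("/tmp/a.txt"))        # /
```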

property glob

A globber object

makedirs(path, permissions=None, recreate=False)[source]

Make a directory, and any missing intermediate directories.

Note

This overrides the base makedirs

Parameters:

recreate (bool) – if False (the default), attempting to create an existing directory will raise an error. Set to True to ignore existing directories.

Return type:

fs.subfs.SubFS

Returns:

a sub-directory filesystem.

Raises:

triad.collections.function_wrapper

class triad.collections.function_wrapper.AnnotatedParam(param)[source]

Bases: object

An abstraction of annotated parameter

class triad.collections.function_wrapper.FunctionWrapper(func, params_re='.*', return_re='.*')[source]

Bases: object

Create a function wrapper that can recognize and validate all input types.

Parameters:
  • func (typing.Callable) – the function to be wrapped

  • params_re (str) – regular expression for the parameter types

  • return_re (str) – regular expression for the return types

Examples

Here is a simple example showing how to use FunctionWrapper. Assume we want to validate functions that take two pandas dataframes as the first two inputs, followed by arbitrary other inputs, and return one pandas dataframe:

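import pandas as pd
from triad.collections.function_wrapper import (
    AnnotatedParam,
    FunctionWrapper,
    function_wrapper,
)

@function_wrapper(None)  # all param definitions are here, no entrypoint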
class MyFuncWrapper(FunctionWrapper):
    def __init__(self, func):
        super().__init__(
            func,
            params_re="^dd.*",  # starts with two dataframe parameters
            return_re="^d$",  # returns a dataframe
        )

@MyFuncWrapper.annotated_param(pd.DataFrame, code="d")
class MyDataFrameParam(AnnotatedParam):
    pass

def f1(a:pd.DataFrame, b:pd.DataFrame, c) -> pd.DataFrame:
    return a

def f2(a, b:pd.DataFrame, c):
    return a

# f1 is valid
MyFuncWrapper(f1)

# f2 is invalid because of the first parameter
# TypeError will be thrown
MyFuncWrapper(f2)

classmethod annotated_param(annotation, code=None, matcher=None, child_can_reuse_code=False)[source]

The decorator to register a type annotation for this function wrapper

Parameters:
  • annotation (typing.Any) – the type annotation

  • code (typing.Optional[str]) – the single char code to represent this type annotation, defaults to None, meaning it will try to use its parent class’ code; this is allowed only if child_can_reuse_code is set to True on the parent class.

  • matcher (typing.Optional[typing.Callable[[typing.Any], bool]]) – a function taking in a type annotation and deciding whether it is acceptable by the AnnotatedParam, defaults to None, meaning it will just do a simple == check.

  • child_can_reuse_code (bool) – whether the derived types of the current AnnotatedParam can reuse the code (if not specifying a new code), defaults to False

property input_code: str

The input parameters code expression

property output_code: str

The output code expression
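
The idea behind input_code, output_code and the two regular expressions can be sketched with the standard inspect and re modules: each annotation maps to a single-character code, the codes are concatenated, and the resulting string is matched against the pattern. Frame, CODES, OTHER and validate are hypothetical names, and the code letters are arbitrary choices for this sketch rather than the ones FunctionWrapper uses.

```python
import inspect
import re


class Frame:
    """Hypothetical stand-in for a dataframe type."""


CODES = {Frame: "d"}  # assumed registry: annotation -> single-char code
OTHER = "x"           # assumed code for anything unregistered


def input_code(func):
    params = inspect.signature(func).parameters.values()
    return "".join(CODES.get(p.annotation, OTHER) for p in params)


def output_code(func):
    return CODES.get(inspect.signature(func).return_annotation, OTHER)


def validate(func, params_re=".*", return_re=".*"):
    """Raise TypeError if the code strings don't match the patterns."""
    if not re.match(params_re, input_code(func)):
        raise TypeError(f"input {input_code(func)} doesn't match {params_re}")
    if not re.match(return_re, output_code(func)):
        raise TypeError(f"output {output_code(func)} doesn't match {return_re}")
    return func


def f1(a: Frame, b: Frame, c) -> Frame:
    return a


def f2(a, b: Frame, c):
    return a


print(input_code(f1), output_code(f1))  # ddx d
validate(f1, params_re="^dd.*", return_re="^d$")  # passes
try:
    validate(f2, params_re="^dd.*", return_re="^d$")
except TypeError as e:
    print("invalid:", e)
```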

classmethod parse_annotation(annotation, param=None, none_as_other=True)[source]
Return type:

triad.collections.function_wrapper.AnnotatedParam

class triad.collections.function_wrapper.KeywordParam(param)[source]

Bases: AnnotatedParam

For keyword parameters

class triad.collections.function_wrapper.NoneParam(param)[source]

Bases: AnnotatedParam

The case where there is no annotation for a parameter

class triad.collections.function_wrapper.OtherParam(param)[source]

Bases: AnnotatedParam

Any annotation that is not recognized

class triad.collections.function_wrapper.PositionalParam(param)[source]

Bases: AnnotatedParam

For positional parameters

class triad.collections.function_wrapper.SelfParam(param)[source]

Bases: AnnotatedParam

For the self parameters in member functions

triad.collections.function_wrapper.function_wrapper(entrypoint)[source]

The decorator to register a new FunctionWrapper type.

Parameters:

entrypoint (typing.Optional[str]) – the entrypoint to load in setup.py in order to find the registered AnnotatedParam under this FunctionWrapper

triad.collections.schema

class triad.collections.schema.Schema(*args, **kwargs)[source]

Bases: IndexedOrderedDict[str, Field]

A Schema wrapper on top of pyarrow.Fields. This has more features than pyarrow.Schema, and the two can be converted to each other.

This class can be initialized from schema like objects. Here is a list of schema like objects:

  • pyarrow.Schema or Schema objects

  • pyarrow.Field: single field will be treated as a single column schema

  • schema expressions: expression_to_schema()

  • Dict[str,Any]: key will be the columns, and value will be type like objects

  • Tuple[str,Any]: first item will be the only column name of the schema, and the second has to be a type like object

  • List[Any]: a list of Schema like objects

  • pandas.DataFrame: it will extract the dataframe’s schema

Here is a list of data type like objects:

  • pyarrow.DataType

  • pyarrow.Field: will only use the type attribute of the field

  • type expression or other objects: for to_pa_datatype()

Examples

from triad.collections.schema import Schema

Schema("a:int,b:int")
Schema("a:int","b:int")
Schema(a=int,b=str) # == Schema("a:long,b:str")
Schema(dict(a=int,b=str)) # == Schema("a:long,b:str")
Schema([("a",int),("b",str)]) # == Schema("a:long,b:str")
Schema(("a",int),("b",str)) # == Schema("a:long,b:str")
Schema("a:[int],b:{x:int,y:{z:[str],w:byte}},c:[{x:str}]")

Note

  • For supported pyarrow.DataTypes see is_supported()

  • If you use a python type as the data type (e.g. Schema(a=int,b=str)), be aware that the resulting data type is different (e.g. python int type -> pyarrow long/int64 type)

  • When not readonly, only append is allowed; update and remove are disallowed

  • When readonly, no modification on the existing schema is allowed

  • append, update and remove are always allowed when creating a new object

  • InvalidOperationError will be raised for disallowed operations

  • At most one of *args and **kwargs can be set

Parameters:
  • args (typing.Any) – one or multiple schema like objects, which will be combined in order

  • kwargs (typing.Any) – key value pairs for the schema
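
To give a feel for the flat form of a schema expression such as "a:int,b:str", here is a minimal parser sketch. parse_flat_expression is a hypothetical helper that only splits names from type names; it does not handle nested list or struct types and does not resolve type names to pyarrow types, which expression_to_schema() is documented to cover.

```python
from collections import OrderedDict


def parse_flat_expression(expr):
    """Parse 'name:type,name:type' into an ordered name -> type-name mapping."""
    result = OrderedDict()
    for part in expr.split(","):
        name, sep, type_name = part.partition(":")
        name, type_name = name.strip(), type_name.strip()
        if not sep or not name or not type_name:
            raise ValueError(f"invalid column expression: {part!r}")
        if name in result:
            raise ValueError(f"duplicate column: {name}")
        result[name] = type_name
    return result


print(list(parse_flat_expression("a:int,b:str").items()))
# [('a', 'int'), ('b', 'str')]
```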

alter(subschema)[source]

Alter the schema with a subschema

Parameters:

subschema (typing.Any) – a schema like object

Return type:

triad.collections.schema.Schema

Returns:

the altered schema

append(obj)[source]

Append schema like object to the current schema. Only new columns are allowed.

Raises:

SchemaError – if a column exists or is invalid or obj is not convertible

Return type:

triad.collections.schema.Schema

Returns:

the Schema object itself

assert_not_empty()[source]

Raise exception if schema is empty

Return type:

triad.collections.schema.Schema

copy()[source]

Clone Schema object

Return type:

triad.collections.schema.Schema

Returns:

cloned object

create_empty_arrow_table()[source]

Create an empty pyarrow table based on the schema

Return type:

pyarrow.lib.Table

create_empty_pandas_df(use_extension_types=False, use_arrow_dtype=False)[source]

Create an empty pandas dataframe based on the schema

Parameters:
  • use_extension_types (bool) – if True, use pandas extension types, default False

  • use_arrow_dtype (bool) – if True and when pandas supports ArrowDType, use pyarrow types, default False

Return type:

pandas.core.frame.DataFrame

Returns:

empty pandas dataframe

exclude(other, require_type_match=True, ignore_type_mismatch=False)[source]

Exclude columns from the current schema which are also in other. other can contain columns that are not in the current schema, they will be ignored.

Parameters:
  • other (typing.Any) – one column name, a list/set of column names or a schema like object

  • require_type_match (bool) – if True, a match requires the same key and same type (if other contains type), otherwise, only the key needs to match, default True

  • ignore_type_mismatch (bool) – if False, when keys match but types don’t (if other contains type), raise an exception SchemaError, default False

Return type:

triad.collections.schema.Schema

Returns:

a schema excluding the columns in other

extract(obj, ignore_key_mismatch=False, require_type_match=True, ignore_type_mismatch=False)[source]

Extract a sub schema from the schema based on the columns in obj

Parameters:
  • obj (typing.Any) – one column name, a list/set of column names or a schema like object

  • ignore_key_mismatch (bool) – if True, ignore the non-existing keys, default False

  • require_type_match (bool) – if True, a match requires the same key and same type (if obj contains type), otherwise, only the key needs to match, default True

  • ignore_type_mismatch (bool) – if False, when keys match but types don’t (if obj contains type), raise an exception SchemaError, default False

Return type:

triad.collections.schema.Schema

Returns:

a sub-schema containing the columns in obj

property fields: List[Field]

List of pyarrow.Fields

intersect(other, require_type_match=True, ignore_type_mismatch=True, use_other_order=False)[source]

Extract the sub-schema of the current schema whose columns are also in other. other can contain columns that are not in the current schema; they will be ignored.

Parameters:
  • other (typing.Any) – one column name, a list/set of column names or a schema like object

  • require_type_match (bool) – if True, a match requires the same key and same type (if other contains type), otherwise, only the key needs to match, default True

  • ignore_type_mismatch (bool) – if False, when keys match but types don’t (if other contains type), raise an exception SchemaError, default True

  • use_other_order (bool) – if True, the output schema will use the column order of other, default False

Return type:

triad.collections.schema.Schema

Returns:

the intersected schema
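
The effect of use_other_order can be sketched on plain dicts that map column names to type names. This intersect function is a hypothetical stand-in that matches on names only; the type-matching options of Schema.intersect are deliberately left out.

```python
def intersect(current, other, use_other_order=False):
    """Keep the columns of `current` that also appear in `other`."""
    other_names = list(other)
    if use_other_order:
        # Follow the order of `other`, skipping unknown columns
        names = [n for n in other_names if n in current]
    else:
        # Follow the order of `current`
        wanted = set(other_names)
        names = [n for n in current if n in wanted]
    return {n: current[n] for n in names}


schema = {"a": "int", "b": "str", "c": "double"}
print(list(intersect(schema, ["c", "x", "a"])))                        # ['a', 'c']
print(list(intersect(schema, ["c", "x", "a"], use_other_order=True)))  # ['c', 'a']
```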

is_like(other, equal_groups=None)[source]

Check if the two schemas are equal or similar

Parameters:
Return type:

bool

Returns:

True if the two schemas are equal or similar

Examples

import pyarrow as pa

s = Schema("a:int,b:str")
assert s.is_like("a:int,b:str")
assert not s.is_like("a:long,b:str")
assert s.is_like("a:long,b:str", equal_groups=[(pa.types.is_integer,)])

property names: List[str]

List of column names

property pa_schema: Schema

Convert to pyarrow.Schema

property pandas_dtype: Dict[str, dtype]

Convert to a dtype dict for pandas dataframes. Currently, struct type is not supported

property pd_dtype: Dict[str, dtype]

Convert to a dtype dict for pandas dataframes. Currently, struct type is not supported

property pyarrow_schema: Schema

Convert to pyarrow.Schema

remove(obj, ignore_key_mismatch=False, require_type_match=True, ignore_type_mismatch=False)[source]

Remove columns or schema from the schema

Parameters:
  • obj (typing.Any) – one column name, a list/set of column names or a schema like object

  • ignore_key_mismatch (bool) – if True, ignore the non-existing keys, default False

  • require_type_match (bool) – if True, a match requires the same key and same type (if obj contains type), otherwise, only the key needs to match, default True

  • ignore_type_mismatch (bool) – if False, when keys match but types don’t (if obj contains type), raise an exception SchemaError, default False

Return type:

triad.collections.schema.Schema

Returns:

a schema excluding the columns in obj

rename(columns, ignore_missing=False)[source]

Rename the current schema and generate a new one

Parameters:

columns (typing.Dict[str, str]) – dictionary to map from old to new column names

Return type:

triad.collections.schema.Schema

Returns:

renamed schema object

to_pandas_dtype(use_extension_types=False, use_arrow_dtype=False)[source]

Convert to a dtype dict for pandas dataframes.

Parameters:
  • use_extension_types (bool) – if True, use pandas extension types, default False

  • use_arrow_dtype (bool) – if True and when pandas supports ArrowDType, use pyarrow types, default False

Return type:

typing.Dict[str, numpy.dtype]

Note

  • If use_extension_types is False and use_arrow_dtype is True, it converts all types to ArrowDType

  • If both are true, it converts types to the numpy backend nullable dtypes if possible, otherwise, it converts to ArrowDType

transform(*args, **kwargs)[source]

Transform the current schema to a new schema

Raises:

SchemaError – if there is any exception

Return type:

triad.collections.schema.Schema

Returns:

transformed schema

Examples

s=Schema("a:int,b:int,c:str")
s.transform("x:str") # x:str
# add
s.transform("*,x:str") # a:int,b:int,c:str,x:str
s.transform("*","x:str") # a:int,b:int,c:str,x:str
s.transform("*",x=str) # a:int,b:int,c:str,x:str
# subtract
s.transform("*-c,a") # b:int
s.transform("*-c-a") # b:int
s.transform("*~c,a,x") # b:int  # ~ means exclude if exists
s.transform("*~c~a~x") # b:int  # ~ means exclude if exists
# + means overwrite existing and append new
s.transform("*+e:str,b:str,d:str") # a:int,b:str,c:str,e:str,d:str
# you can have multiple operations
s.transform("*+b:str-a") # b:str,c:str
# callable
s.transform(lambda s:s.fields[0]) # a:int
s.transform(lambda s:s.fields[0], lambda s:s.fields[2]) # a:int,c:str

property types: List[DataType]

List of pyarrow.DataTypes

union(other, require_type_match=False)[source]

Union the other schema

Parameters:
  • other (typing.Any) – a schema like object

  • require_type_match (bool) – if True, a match requires the same key and same type (if other contains type), otherwise, only the key needs to match, default False

Return type:

triad.collections.schema.Schema

Returns:

the new unioned schema

union_with(other, require_type_match=False)[source]

Union the other schema into the current schema

Parameters:
  • other (typing.Any) – a schema like object

  • require_type_match (bool) – if True, a match requires the same key and same type (if other contains type), otherwise, only the key needs to match, default False

Return type:

triad.collections.schema.Schema

Returns:

the current schema

exception triad.collections.schema.SchemaError(message)[source]

Bases: Exception

Exceptions related to constructing and modifying schemas