from typing import (
Any,
Callable,
Dict,
Iterable,
Iterator,
List,
Optional,
Set,
Tuple,
TypeVar,
Union,
)
from triad.utils.assertion import assert_or_throw
from triad.utils.convert import to_size
T = TypeVar("T")
[docs]def make_empty_aware(it: Union[Iterable[T], Iterator[T]]) -> "EmptyAwareIterable[T]":
"""Make an iterable empty aware, or return itself if already empty aware
:param it: underlying iterable
:return: EmptyAwareIterable[T]
"""
return it if isinstance(it, EmptyAwareIterable) else EmptyAwareIterable(it)
[docs]def slice_iterable(
it: Union[Iterable[T], Iterator[T]], slicer: Callable[[int, T, Optional[T]], bool]
) -> "Iterable[_SliceIterable[T]]":
"""Slice the original iterable into slices by slicer
:param it: underlying iterable
:param slicer: taking in current number, current value, last value,
it decides if it's a new slice
:yield: an iterable of iterables (_SliceIterable[T])
"""
si = _SliceIterable(it, slicer)
while si._state < 3:
yield si
si.recycle()
[docs]def to_kv_iterable( # noqa: C901
data: Any, none_as_empty: bool = True
) -> Iterable[Tuple[Any, Any]]:
"""Convert data to iterable of key value pairs
:param data: input object, it can be a dict or Iterable[Tuple[Any, Any]]
or Iterable[List[Any]]
:param none_as_empty: if to treat None as empty iterable
:raises ValueError: if input is None and `none_as_empty==False`
:raises ValueError: if input is a set
:raises TypeError or ValueError: if input data type is not acceptable
:yield: iterable of key value pair as tuples
"""
if data is None:
assert_or_throw(none_as_empty, ValueError("data can't be None"))
elif isinstance(data, Dict):
for k, v in data.items():
yield k, v
elif isinstance(data, Set):
raise ValueError(f"{data} is a set, did you mistakenly use `,` instead of `:`?")
elif isinstance(data, Iterable):
ei = make_empty_aware(data)
if not ei.empty:
first = ei.peek()
if isinstance(first, tuple):
for k, v in ei:
yield k, v
elif isinstance(first, List):
for arr in ei:
if len(arr) == 2:
yield arr[0], arr[1]
else:
raise TypeError(f"{arr} is not an acceptable item")
else:
raise TypeError(f"{first} is not an acceptable item")
else:
raise TypeError(f"{type(data)} is not supported")
[docs]class EmptyAwareIterable(Iterable[T]):
"""A wrapper of iterable that can tell if the underlying
iterable is empty, it can also peek a non-empty iterable.
:param it: the underlying iterable
:raises StopIteration: raised by the underlying iterable
"""
def __init__(self, it: Union[Iterable[T], Iterator[T]]):
self._last: Optional[T] = None
if not isinstance(it, Iterator):
self._iter = iter(it)
else:
self._iter = it
self._state = 0
self._fill_last()
@property
def empty(self) -> bool:
"""Check if the underlying iterable has more items
:return: whether it is empty
"""
return self._fill_last() >= 2
[docs] def peek(self) -> T:
"""Return the next of the iterable without moving
:raises StopIteration: if it's empty
:return: the `next` item
"""
if not self.empty:
return self._last # type: ignore
raise StopIteration("Can't peek empty iterable")
def __iter__(self) -> Any:
"""Wrapper of the underlying __iter__
:yield: next object
"""
while self._fill_last() < 2:
self._state = 0
yield self._last
def _fill_last(self) -> int:
try:
if self._state == 0: # last is not filled
self._last = next(self._iter)
self._state = 1 # last filed not used
except StopIteration:
self._state = 3 # end
return self._state
[docs]class Slicer:
"""A better version of :func:`~triad.iter.slice_iterable`
:param sizer: the function to get size of an item
:param row_limit: max row for each slice, defaults to None
:param size_limit: max byte size for each slice, defaults to None
:param slicer: taking in current number, current value, last value,
it decides if it's a new slice
:raises AssertionError: if `size_limit` is not None but `sizer` is None
"""
def __init__(
self,
sizer: Optional[Callable[[Any], int]] = None, # func for getsizeof(item)
row_limit: Optional[int] = None,
size_limit: Any = None,
slicer: Optional[Callable[[int, T, Optional[T]], bool]] = None,
) -> None:
self._sizer = sizer
self._slicer = slicer
if row_limit is None:
self._row_limit = 0
else:
self._row_limit = row_limit
if size_limit is None:
self._size_limit = 0
else:
self._size_limit = to_size(str(size_limit))
assert (
self._size_limit == 0 or self._sizer is not None
), "sizer must be set when size_limit>0"
self._current_row = 1
self._current_size = 0
[docs] def slice( # noqa C901
self, orig_it: Iterable[T]
) -> Iterable[EmptyAwareIterable[T]]:
"""Slice the original iterable into slices by the combined slicing logic
:param orig_it: ther original iterable
:yield: an iterable of EmptyAwareIterable
"""
it = make_empty_aware(orig_it)
if it.empty:
pass
elif self._row_limit <= 0 and self._size_limit <= 0:
if self._slicer is None:
yield it
else:
for _slice in slice_iterable(it, self._slicer):
yield _slice
elif self._row_limit > 0 and self._size_limit <= 0:
if self._slicer is None:
for _slice in slice_iterable(it, self._is_boundary_row_only):
yield _slice
else:
for _slice in slice_iterable(it, self._is_boundary_row_only_w_slicer):
yield _slice
else:
self._current_size = self._sizer(it.peek()) # type: ignore
self._current_row = 1
if self._row_limit <= 0 and self._size_limit > 0:
for _slice in slice_iterable(it, self._is_boundary_size_only):
yield _slice
else:
for _slice in slice_iterable(it, self._is_boundary):
yield _slice
def _is_boundary_row_only(self, no: int, current: Any, last: Any) -> bool:
return no % self._row_limit == 0
def _is_boundary_row_only_w_slicer(self, no: int, current: Any, last: Any) -> bool:
# self._slicer must be invoked even hitting row limit
is_boundary = self._slicer is not None and self._slicer(no, current, last)
if self._current_row >= self._row_limit or is_boundary:
self._current_row = 1
return True
self._current_row += 1
return False
def _is_boundary_size_only(self, no: int, current: Any, last: Any) -> bool:
obj_size = self._sizer(current) # type: ignore
next_size = self._current_size + obj_size
# self._slicer must be invoked even hitting size limit
is_boundary = self._slicer is not None and self._slicer(no, current, last)
if next_size > self._size_limit or is_boundary:
self._current_size = obj_size
return True
self._current_size = next_size
return False
def _is_boundary(self, no: int, current: Any, last: Any) -> bool:
obj_size = self._sizer(current) # type: ignore
next_size = self._current_size + obj_size
# self._slicer must be invoked even hitting row limit and size limit
is_boundary = self._slicer is not None and self._slicer(no, current, last)
if (
next_size > self._size_limit
or self._current_row >= self._row_limit # noqa: W503
or is_boundary # noqa: W503
):
self._current_size = obj_size
self._current_row = 1
return True
self._current_size = next_size
self._current_row += 1
return False
class _SliceIterable(EmptyAwareIterable[T]):
def __init__(self, it: Union[Iterable[T], Iterator[T]], slicer: Any):
self._n = 0
self._slicer = slicer
super().__init__(it)
def recycle(self) -> None:
if self._state < 2:
for _ in self:
pass
if self._state == 2:
self._state = 1
def _fill_last(self) -> int:
try:
if self._state == 0: # last is not filled
last = self._last
self._last = next(self._iter)
is_boundary = self._n > 0 and self._slicer(self._n, self._last, last)
self._n += 1
self._state = 2 if is_boundary else 1 # last filed not used
except StopIteration:
self._state = 3 # end
return self._state