Source code for triad.utils.threading

from functools import _make_key, wraps
from threading import RLock
from typing import Any, Callable, Dict, Optional, Tuple, Type


[docs] class SerializableRLock: """A serialization safe wrapper of :external+python:class:`threading.RLock`""" def __init__(self): self._lock = RLock() def __enter__(self) -> Any: return self._lock.__enter__() def __exit__( self, exception_type: Any, exception_value: Any, exception_traceback: Any ) -> Any: return self._lock.__exit__(exception_type, exception_value, exception_traceback) def __getstate__(self) -> Dict[str, Any]: return {} def __setstate__(self, data: Dict[str, Any]) -> None: self._lock = RLock()
[docs] def run_once( func: Optional[Callable] = None, key_func: Optional[Callable] = None, lock_type: Type = RLock, ) -> Callable: """The decorator to run `func` once, the uniqueness is defined by `key_func`. This implementation is serialization safe and thread safe. :param func: the function to run only once with this wrapper instance :param key_func: the unique key determined by arguments of `func`, if not set, it will use the same hasing logic as :external+python:func:`functools.lru_cache` :param lock_type: lock class type for thread safe, it doesn't need to be serialization safe .. admonition:: Examples .. code-block:: python @run_once def r(a): return max(a) a1 = [0, 1] a2 = [0, 2] assert 1 == r(a1) # will trigger r assert 1 == r(a1) # will get the result from cache assert 2 == r(a2) # will trigger r again because of different arguments # the following example ignores arguments @run_once(key_func=lambda *args, **kwargs: True) def r2(a): return max(a) assert 1 == r(a1) # will trigger r assert 1 == r(a2) # will get the result from cache .. note:: * Hash collision is the concern of the user, not this class, your `key_func` should avoid any potential collision * `func` can have no return * For concurrent calls of this wrapper, only one will trigger `func` other calls will be blocked until the first call returns an result * This class is cloudpicklable, but unpickled instance does NOT share the same context with the original one * This is not to replace :external+python:func:`functools.lru_cache`, it is not supposed to cache a lot of items """ def _run(func: Callable) -> "RunOnce": return wraps(func)(RunOnce(func, key_func=key_func, lock_type=lock_type)) return _run(func) if func is not None else wraps(func)(_run) # type: ignore
[docs] class RunOnce: """Run `func` once, the uniqueness is defined by `key_func`. This implementation is serialization safe and thread safe. .. note:: Please use the decorator :func:`~.run_once` instead of directly using this class :param func: the function to run only once with this wrapper instance :param key_func: the unique key determined by arguments of `func`, if not set, it will use the same hasing logic as :external+python:func:`functools.lru_cache` :param lock_type: lock class type for thread safe """ def __init__( self, func: Callable, key_func: Optional[Callable] = None, lock_type: Type = RLock, ): self._func = func if key_func is None: self._key_func: Callable = lambda *args, **kwargs: _make_key( args, kwargs, typed=True ) else: self._key_func = key_func self._lock_type = lock_type self._init_locks() def __getstate__(self) -> Dict[str, Any]: d = dict(self.__dict__) del d["_lock"] del d["_locks"] del d["_store"] return d def __setstate__(self, members: Any) -> None: self.__dict__.update(members) self._init_locks() def __call__(self, *args: Any, **kwargs: Any) -> Any: key = self._key_func(*args, **kwargs) lock = self._get_lock(key) with lock: found, res = self._try_get(key) if found: return res res = self._func(*args, **kwargs) self._update(key, res) return res def _get_lock(self, key) -> Any: with self._lock: if key not in self._locks: self._locks[key] = self._lock_type() return self._locks[key] def _try_get(self, key) -> Tuple[bool, Any]: with self._lock: if key in self._store: return True, self._store[key] return False, None def _update(self, key, value) -> None: with self._lock: self._store[key] = value def _init_locks(self): self._lock = self._lock_type() self._locks: Dict[int, Any] = {} self._store: Dict[int, Any] = {}