Source code for triad.collections.fs

import re
from threading import RLock
from typing import Any, Dict, Tuple
from urllib.parse import urlparse

from triad.utils.hash import to_uuid

import fs
from fs import memoryfs, open_fs, tempfs
from fs.base import FS as FSBase
from fs.glob import BoundGlobber, Globber
from fs.mountfs import MountFS
from fs.subfs import SubFS

# Matches a scheme-like prefix at the start of a string: one or more ASCII
# letters, digits, hyphens or underscores immediately followed by a colon
# (for example ``s3:`` or ``C:``).
_SCHEME_PREFIX = re.compile(r"^[a-zA-Z0-9\-_]+:")

class FileSystem(MountFS):
    """A unified filesystem based on PyFileSystem2. The special requirement
    for this class is that all paths must be absolute paths with a scheme.
    To customize different file systems, you should override `create_fs`
    to provide your own configured file systems.

    :param auto_close: If `True` (the default), the child filesystems
      will be closed when `MountFS` is closed.

    .. admonition:: Examples

        .. code-block:: python

            fs = FileSystem()
            fs.writetext("mem://from/a.txt", "hello")
            fs.copy("mem://from/a.txt", "mem://to/a.txt")

    .. note::

        If a path is not a local path, it must include the scheme and
        `netloc` (the first element after `://`)
    """

    def __init__(self, auto_close: bool = True):
        super().__init__(auto_close)
        # Filesystems already created by `create_fs`, keyed by their root.
        self._fs_store: Dict[str, FSBase] = {}
        # True while `_delegate` is resolving/creating/mounting a root; a
        # `_delegate` call made in that window is passed straight to MountFS.
        self._in_create = False
        # Guards `_fs_store` and `_in_create`.
        self._fs_lock = RLock()

    def create_fs(self, root: str) -> FSBase:
        """Create a PyFileSystem instance from `root`. `root` is in the
        format of `/` if local path, else `<scheme>://<netloc>`.
        You should override this method to provide custom instances, for
        example, if you want to create an S3FS with certain parameters.

        :param root: `/` if local path, else `<scheme>://<netloc>`
        :return: the filesystem object serving `root`
        """
        temp_prefix = "temp://"
        if root.startswith(temp_prefix):
            # Whatever follows ``temp://`` is passed to TempFS as its
            # first positional argument.
            return tempfs.TempFS(root[len(temp_prefix) :])
        if root.startswith("mem://"):
            return memoryfs.MemoryFS()
        return open_fs(root)

    @property
    def glob(self):
        """A globber object"""
        return _BoundGlobber(self)

    def _delegate(self, path) -> Tuple[FSBase, str]:
        """Resolve `path` to the mounted child filesystem and the path
        inside it, creating and mounting the child on first use.

        Each distinct root is mounted under ``to_uuid(root)``.

        :param path: absolute local path, or a path with scheme and netloc
        :return: whatever ``MountFS._delegate`` returns for the rewritten
          mount path
        :raises ValueError: if `path` is rejected by `_FSPath`
        """
        with self._fs_lock:
            if self._in_create:  # pragma: no cover
                return super()._delegate(path)
            self._in_create = True
            try:
                fp = _FSPath(path)
                if fp.root not in self._fs_store:
                    self._fs_store[fp.root] = self.create_fs(fp.root)
                    self.mount(to_uuid(fp.root), self._fs_store[fp.root])
            finally:
                # Always clear the flag: if it stayed True after a failure
                # (e.g. an invalid path), every later call would skip the
                # scheme routing above and go straight to MountFS.
                self._in_create = False
        m_path = to_uuid(fp.root) + "/" + fp.relative_path
        return super()._delegate(m_path)

    def makedirs(
        self, path: str, permissions: Any = None, recreate: bool = False
    ) -> SubFS:
        """Make a directory, and any missing intermediate directories.

        .. note::

            This overrides the base ``makedirs``

        :param path: path to directory from root.
        :param permissions: initial permissions, or `None` to use defaults.
        :param recreate: if `False` (the default), attempting to
          create an existing directory will raise an error. Set
          to `True` to ignore existing directories.
        :return: a sub-directory filesystem.

        :raises fs.errors.DirectoryExists: if the path is already
          a directory, and ``recreate`` is `False`.
        :raises fs.errors.DirectoryExpected: if one of the ancestors
          in the path is not a directory.
        """
        self.check()
        # Hand the call to the child filesystem that owns `path`.
        child_fs, child_path = self._delegate(path)
        return child_fs.makedirs(
            child_path, permissions=permissions, recreate=recreate
        )
class _BoundGlobber(BoundGlobber):
    """A ``BoundGlobber`` whose ``path`` argument is first parsed by
    `_FSPath`, so Windows drive paths are normalized before globbing."""

    def __call__(
        self,
        pattern: Any,
        path: str = "/",
        namespaces: Any = None,
        case_sensitive: bool = True,
        exclude_dirs: Any = None,
    ) -> Globber:
        """Create a globber for `pattern` under `path`.

        Windows drive paths are rebuilt from their normalized root and
        relative path; every other path is forwarded unchanged.
        """
        fp = _FSPath(path)
        if fp.is_windows:
            _path = fs.path.join(fp.root, fp.relative_path)
        else:
            _path = path
        return super().__call__(
            pattern,
            path=_path,
            namespaces=namespaces,
            case_sensitive=case_sensitive,
            exclude_dirs=exclude_dirs,
        )


class _FSPath:
    """An absolute path split into scheme, root and relative path.

    * Windows drive path: scheme ``""``, root ``<letter>:/``
    * Local absolute path: scheme ``""``, root ``/``
    * Anything else: root ``<scheme>://<netloc>``

    :param path: the path to parse
    :raises ValueError: if `path` is ``None``, is neither an absolute local
      path nor a path with a scheme, or has a scheme but no netloc
    """

    def __init__(self, path: str):
        if path is None:
            raise ValueError("path can't be None")
        path = _modify_path(path)
        self._is_windows = False
        if _is_windows(path):
            self._scheme = ""
            self._root = path[:3]
            self._path = path[3:]
            self._is_windows = True
        elif path.startswith("/"):
            self._scheme = ""
            self._root = "/"
            self._path = fs.path.abspath(path)
        else:
            uri = urlparse(path)
            # This branch is only reached when `path` does not start with
            # "/", so a missing scheme means the path is not absolute.
            if uri.scheme == "":
                raise ValueError(
                    f"invalid {path}, must be abs path either local or with scheme"
                )
            self._scheme = uri.scheme
            if uri.netloc == "":
                raise ValueError(f"invalid path {path}")
            self._root = uri.scheme + "://" + uri.netloc
            self._path = uri.path
        # An empty relative path (the root itself) is deliberately accepted.
        self._path = self._path.lstrip("/")

    @property
    def is_windows(self) -> bool:
        """Whether the path is a Windows drive path"""
        return self._is_windows

    @property
    def scheme(self) -> str:
        """The scheme, or an empty string for local paths"""
        return self._scheme

    @property
    def root(self) -> str:
        """``/``, ``<letter>:/`` or ``<scheme>://<netloc>``"""
        return self._root

    @property
    def relative_path(self) -> str:
        """The path under `root`, without leading slashes"""
        return self._path


def _modify_path(path: str) -> str:  # noqa: C901
    """Normalize a path before it is parsed by `_FSPath`,
    for example to fix things like /s3:/a/b.txt -> s3://a/b.txt

    :param path: the raw path
    :return: the normalized path
    :raises NotImplementedError: if `path` starts with two backslashes
    """
    if path.startswith("/"):
        s = _SCHEME_PREFIX.search(path[1:])
        if s is not None:
            # `s.end()` is an offset into path[1:], so in `path` itself it
            # is the index of the colon that ends the scheme.
            colon = s.end()
            scheme = path[1:colon]
            if colon + 1 == len(path):  # /C: or /s3:
                path = scheme + "://"
            elif path[colon + 1] == "/":  # /s3:/a/b.txt
                path = scheme + "://" + path[colon + 1 :].lstrip("/")
            elif path[colon + 1] == "\\":  # /c:\a\b.txt
                path = scheme + ":\\" + path[colon + 1 :].lstrip("\\")
    # NOTE(review): "file:///a/b.txt" becomes the relative "a/b.txt", which
    # _FSPath then rejects, while "file://a/b.txt" becomes "/a/b.txt".
    # Confirm this is intended before changing it; "file:///C:/a" relies on
    # the 8-character slice.
    if path.startswith("file:///"):
        path = path[8:]
    elif path.startswith("file://"):
        path = path[6:]
    if path.startswith("\\\\"):  # Windows UNC-style path (\\host\...)
        raise NotImplementedError(f"path {path} is not supported")
    if path != "" and path[0].isalpha():
        if len(path) == 2 and path[1] == ":":
            # C: => C:/
            return path[0] + ":/"
        if path[1:].startswith(":\\"):
            # C:\a\b\c => C:/a/b/c
            return path[0] + ":/" + path[3:].replace("\\", "/").lstrip("/")
        if path[1:].startswith(":/"):
            # C:/a/b/c => C:/a/b/c
            return path[0] + ":/" + path[3:].lstrip("/")
    return path


def _is_windows(path: str) -> bool:
    """Whether `path` starts with ``<letter>:/``, the normalized form of a
    Windows drive path."""
    if len(path) < 3:
        return False
    return path[0].isalpha() and path[1] == ":" and path[2] == "/"