Skip to content

Io

zenml.io special

The io module handles file operations for the ZenML package. It offers a standard interface for reading, writing and manipulating files and directories. It is heavily influenced and inspired by the io module of tfx.

fileio

append_file(file_path, file_contents)

Appends file_contents to file.

Parameters:

Name Type Description Default
file_path str

Local path in filesystem.

required
file_contents str

Contents of file.

required
Source code in zenml/io/fileio.py
def append_file(file_path: str, file_contents: str) -> None:
    """Appends file_contents to file.

    NOTE: This is currently a placeholder. It performs no I/O and always
    raises `NotImplementedError`.

    Args:
        file_path: Local path in filesystem.
        file_contents: Contents of file.

    Raises:
        NotImplementedError: Always, because appending is not implemented.
    """
    # Sketch of a possible implementation, kept for reference only:
    # with file_io.FileIO(file_path, mode='a') as f:
    #     f.write(file_contents)
    raise NotImplementedError

convert_to_str(path)

Converts a PathType to a str using UTF-8.

Source code in zenml/io/fileio.py
def convert_to_str(path: PathType) -> str:
    """Return `path` as a `str`, decoding it as UTF-8 when necessary.

    Args:
        path: A path given either as `str` or as UTF-8 encoded `bytes`.

    Returns:
        The path as a `str`. `str` inputs are returned unchanged.
    """
    return path if isinstance(path, str) else path.decode("utf-8")

copy(src, dst, overwrite=False)

Copy a file from the source to the destination.

Source code in zenml/io/fileio.py
def copy(src: PathType, dst: PathType, overwrite: bool = False) -> None:
    """Copy a file from the source to the destination.

    When both paths are served by the same filesystem plugin, the copy is
    delegated to that plugin. Otherwise the source is read completely into
    memory and written to the destination.

    Args:
        src: Path of the file to copy.
        dst: Path the file is copied to.
        overwrite: Whether an already existing destination may be replaced.

    Raises:
        FileExistsError: If the paths use different filesystems, the
            destination already exists and `overwrite` is false.
    """
    src_fs = _get_filesystem(src)
    dst_fs = _get_filesystem(dst)
    if src_fs is dst_fs:
        src_fs.copy(src, dst, overwrite=overwrite)
        return

    if not overwrite and file_exists(dst):
        raise FileExistsError(
            f"Destination file '{convert_to_str(dst)}' already exists "
            f"and `overwrite` is false."
        )
    # Context managers guarantee both handles are closed (and the
    # destination flushed) even if reading or writing fails.
    with open(src, mode="rb") as src_file:
        contents = src_file.read()
    with open(dst, mode="wb") as dst_file:
        dst_file.write(contents)

copy_dir(source_dir, destination_dir, overwrite=False)

Copies dir from source to destination.

Parameters:

Name Type Description Default
source_dir str

Path to copy from.

required
destination_dir str

Path to copy to.

required
overwrite bool

If False, an error is raised instead of overwriting existing files.

False
Source code in zenml/io/fileio.py
def copy_dir(
    source_dir: str, destination_dir: str, overwrite: bool = False
) -> None:
    """Copies dir from source to destination.

    Every entry returned by `list_dir(source_dir)` is copied; entries that
    are directories are copied recursively.

    Args:
        source_dir: Path to copy from.
        destination_dir: Path to copy to.
        overwrite: Boolean. If false, function throws an error before overwrite.
    """
    for source_file in list_dir(source_dir):
        # `Path` is only used to extract the leaf name. The full paths are
        # passed on as the original strings because converting them through
        # `Path` collapses the `//` of a scheme prefix such as `gs://`.
        destination_name = os.path.join(
            destination_dir, Path(source_file).name
        )
        if is_dir(source_file):
            copy_dir(source_file, destination_name, overwrite)
        else:
            # Fall back to "." so a bare file name keeps resolving to the
            # current directory, as `Path(...).parent` did.
            create_dir_recursive_if_not_exists(
                os.path.dirname(destination_name) or "."
            )
            copy(source_file, destination_name, overwrite)

create_dir_if_not_exists(dir_path)

Creates directory if it does not exist.

Parameters:

Name Type Description Default
dir_path str

Local path in filesystem.

required
Source code in zenml/io/fileio.py
def create_dir_if_not_exists(dir_path: str) -> None:
    """Create the directory at `dir_path` unless it is already a directory.

    The directory is created with `mkdir`, i.e. without creating parents.

    Args:
        dir_path: Local path in filesystem.
    """
    if is_dir(dir_path):
        return
    mkdir(dir_path)

create_dir_recursive_if_not_exists(dir_path)

Creates directory recursively if it does not exist.

Parameters:

Name Type Description Default
dir_path str

Local path in filesystem.

required
Source code in zenml/io/fileio.py
def create_dir_recursive_if_not_exists(dir_path: str) -> None:
    """Create `dir_path` and its parents unless it is already a directory.

    The directory is created with `make_dirs`.

    Args:
        dir_path: Local path in filesystem.
    """
    if is_dir(dir_path):
        return
    make_dirs(dir_path)

create_file_if_not_exists(file_path, file_contents='{}')

Creates file if it does not exist.

Parameters:

Name Type Description Default
file_path str

Local path in filesystem.

required
file_contents str

Contents of file.

'{}'
Source code in zenml/io/fileio.py
def create_file_if_not_exists(
    file_path: str, file_contents: str = "{}"
) -> None:
    """Creates file if it does not exist.

    An already existing file is left untouched. Otherwise the parent
    directory is created (recursively) when missing and the file is written
    with `file_contents`.

    Args:
        file_path: Local path in filesystem.
        file_contents: Contents of file.

    """
    # Honour the "if not exists" contract instead of silently overwriting.
    if file_exists(file_path):
        return
    # Work on the original string: converting it through `Path` would
    # collapse the `//` of a scheme prefix such as `gs://`. Fall back to "."
    # so a bare file name resolves to the current directory.
    create_dir_recursive_if_not_exists(os.path.dirname(file_path) or ".")
    with open(file_path, "w") as f:
        f.write(file_contents)

file_exists(path)

Returns True if the given path exists.

Source code in zenml/io/fileio.py
def file_exists(path: PathType) -> bool:
    """Report whether `path` exists on the filesystem responsible for it.

    Args:
        path: The path to check.

    Returns:
        The result of the `exists` call of the filesystem selected for
        `path`.
    """
    filesystem = _get_filesystem(path)
    return filesystem.exists(path)

find_files(dir_path, pattern)

Find files in a directory that match pattern.

Parameters:

Name Type Description Default
dir_path Union[bytes, str]

Path to directory.

required
pattern str

pattern like *.png.

required

Yields:

Type Description
Iterable[str]

The path of every file whose name matches the pattern; nothing is yielded when no file matches.

Source code in zenml/io/fileio.py
def find_files(dir_path: PathType, pattern: str) -> Iterable[str]:
    """Find files in a directory that match pattern.

    The directories produced by `walk(dir_path)` are visited, and the base
    name of every file in them is matched against `pattern` with
    `fnmatch.fnmatch`.

    Args:
        dir_path: Path to directory.
        pattern: pattern like *.png.

    Yields:
        The path (containing directory joined with the file name) of every
        matching file. Nothing is yielded when no file matches.
    """
    for root, _, files in walk(dir_path):
        for basename in files:
            # Convert once and reuse for both matching and joining.
            name = convert_to_str(basename)
            if fnmatch.fnmatch(name, pattern):
                yield os.path.join(convert_to_str(root), name)

get_grandparent(dir_path)

Get grandparent of dir.

Parameters:

Name Type Description Default
dir_path str

Path to directory.

required

Returns:

Type Description
str

The stem of the input path's grandparent directory (its parent's parent).

Source code in zenml/io/fileio.py
def get_grandparent(dir_path: str) -> str:
    """Return the stem of the directory two levels above `dir_path`.

    Args:
        dir_path: Path to directory.

    Returns:
        The stem (final component without its suffix) of the parent of the
        parent of `dir_path`.
    """
    parent_dir = Path(dir_path).parent
    grandparent_dir = parent_dir.parent
    return grandparent_dir.stem

get_parent(dir_path)

Get parent of dir.

Parameters:

Name Type Description Default
dir_path str

Path to directory.

required

Returns:

Type Description
str

Parent (stem) of the dir as a string.

Source code in zenml/io/fileio.py
def get_parent(dir_path: str) -> str:
    """Return the stem of the directory directly above `dir_path`.

    Args:
        dir_path: Path to directory.

    Returns:
        The stem (final component without its suffix) of the parent of
        `dir_path`.
    """
    parent_dir = Path(dir_path).parent
    return parent_dir.stem

glob(pattern)

Return the paths that match a glob pattern.

Source code in zenml/io/fileio.py
def glob(pattern: PathType) -> List[PathType]:
    """Return the paths matching `pattern`.

    Args:
        pattern: The glob pattern; it also selects the filesystem used.

    Returns:
        The result of the `glob` call of the filesystem selected for
        `pattern`.
    """
    filesystem = _get_filesystem(pattern)
    return filesystem.glob(pattern)

is_dir(path)

Returns whether the given path points to a directory.

Source code in zenml/io/fileio.py
def is_dir(path: PathType) -> bool:
    """Report whether `path` points to a directory.

    Args:
        path: The path to check.

    Returns:
        The result of the `isdir` call of the filesystem selected for
        `path`.
    """
    filesystem = _get_filesystem(path)
    return filesystem.isdir(path)

is_remote(path)

Returns True if the path starts with one of the known remote filesystem prefixes.

Parameters:

Name Type Description Default
path str

Any path as a string.

required

Returns:

Type Description
bool

True if remote path, else False.

Source code in zenml/io/fileio.py
def is_remote(path: str) -> bool:
    """Report whether `path` starts with a remote filesystem prefix.

    This is a pure string check against the entries of `REMOTE_FS_PREFIX`;
    the path itself is not accessed.

    Args:
        path: Any path as a string.

    Returns:
        True if remote path, else False.
    """
    for prefix in REMOTE_FS_PREFIX:
        if path.startswith(prefix):
            return True
    return False

is_root(path)

Returns true if path has no parent in local filesystem.

Parameters:

Name Type Description Default
path str

Local path in filesystem.

required

Returns:

Type Description
bool

True if root, else False.

Source code in zenml/io/fileio.py
def is_root(path: str) -> bool:
    """Report whether `path` is its own parent.

    Args:
        path: Local path in filesystem.

    Returns:
        True if the parent of `path` equals `path` itself, else False.
    """
    candidate = Path(path)
    return candidate.parent == candidate

list_dir(dir_path, only_file_names=False)

Returns a list of files under dir.

Parameters:

Name Type Description Default
dir_path str

Path in filesystem.

required
only_file_names bool

Returns only file names if True.

False

Returns:

Type Description
List[str]

List of full qualified paths.

Source code in zenml/io/fileio.py
def list_dir(dir_path: str, only_file_names: bool = False) -> List[str]:
    """List the entries of a directory.

    Args:
        dir_path: Path in filesystem.
        only_file_names: When True, return the bare entry names; otherwise
            each name is joined onto `dir_path`.

    Returns:
        The entries as strings. An empty list is returned (and a debug
        message logged) when listing raises an `IOError`.
    """
    try:
        entry_names = [
            convert_to_str(entry)
            for entry in _get_filesystem(dir_path).listdir(dir_path)
        ]
        if only_file_names:
            return entry_names
        return [os.path.join(dir_path, name) for name in entry_names]
    except IOError:
        logger.debug(f"Dir {dir_path} not found.")
        return []

make_dirs(path)

Make a directory at the given path, recursively creating parents.

Source code in zenml/io/fileio.py
def make_dirs(path: PathType) -> None:
    """Create a directory at `path`, including missing parents.

    Delegates to the `makedirs` call of the filesystem selected for `path`.

    Args:
        path: Path of the directory to create.
    """
    filesystem = _get_filesystem(path)
    filesystem.makedirs(path)

mkdir(path)

Make a directory at the given path; parent directory must exist.

Source code in zenml/io/fileio.py
def mkdir(path: PathType) -> None:
    """Create a single directory at `path`; its parent must already exist.

    Delegates to the `mkdir` call of the filesystem selected for `path`.

    Args:
        path: Path of the directory to create.
    """
    filesystem = _get_filesystem(path)
    filesystem.mkdir(path)

move(source, destination, overwrite=False)

Moves dir or file from source to destination. Can be used to rename.

Parameters:

Name Type Description Default
source str

Local path to move from.

required
destination str

Local path to move to.

required
overwrite bool

If False, an error is raised instead of overwriting an existing destination.

False
Source code in zenml/io/fileio.py
def move(source: str, destination: str, overwrite: bool = False) -> None:
    """Move a dir or file from source to destination (also usable to rename).

    This is a thin wrapper that forwards all arguments to `rename`.

    Args:
        source: Local path to move from.
        destination: Local path to move to.
        overwrite: Forwarded unchanged to `rename`.
    """
    rename(src=source, dst=destination, overwrite=overwrite)

open(path, mode='r')

Open a file at the given path.

Source code in zenml/io/fileio.py
def open(path: PathType, mode: str = "r") -> Any:  # noqa
    """Open the file at `path`.

    Args:
        path: Path of the file to open.
        mode: Mode string passed on to the filesystem's `open` call.

    Returns:
        Whatever the `open` call of the filesystem selected for `path`
        returns.
    """
    filesystem = _get_filesystem(path)
    return filesystem.open(path, mode=mode)

remove(path)

Remove the file at the given path. Dangerous operation.

Source code in zenml/io/fileio.py
def remove(path: PathType) -> None:
    """Remove the file at the given path. Dangerous operation.

    Args:
        path: Path of the file to remove.

    Raises:
        FileNotFoundError: If `path` does not exist.
    """
    if file_exists(path):
        _get_filesystem(path).remove(path)
        return
    raise FileNotFoundError(f"{convert_to_str(path)} does not exist!")

rename(src, dst, overwrite=False)

Rename source file to destination file.

Parameters:

Name Type Description Default
src Union[bytes, str]

The path of the file to rename.

required
dst Union[bytes, str]

The path to rename the source file to.

required
overwrite bool

If a file already exists at the destination, this method will overwrite it if overwrite=True and raise a FileExistsError otherwise.

False

Exceptions:

Type Description
FileExistsError

If a file already exists at the destination and overwrite is not set to True.

Source code in zenml/io/fileio.py
def rename(src: PathType, dst: PathType, overwrite: bool = False) -> None:
    """Rename source file to destination file.

    Both paths must be served by the same filesystem plugin, which then
    performs the rename.

    Args:
        src: The path of the file to rename.
        dst: The path to rename the source file to.
        overwrite: If a file already exists at the destination, this
            method will overwrite it if overwrite=`True` and
            raise a FileExistsError otherwise.

    Raises:
        FileExistsError: If a file already exists at the destination
            and overwrite is not set to `True`.
        NotImplementedError: If source and destination are served by
            different filesystem plugins.
    """
    source_fs = _get_filesystem(src)
    if source_fs is not _get_filesystem(dst):
        raise NotImplementedError(
            f"Renaming from {convert_to_str(src)} to {convert_to_str(dst)} "
            f"using different filesystems plugins is currently not supported."
        )
    source_fs.rename(src, dst, overwrite=overwrite)

resolve_relative_path(path)

Takes relative path and resolves it absolutely.

Parameters:

Name Type Description Default
path str

Local path in filesystem.

required

Returns:

Type Description
str

Resolved path.

Source code in zenml/io/fileio.py
def resolve_relative_path(path: str) -> str:
    """Resolve a local path to an absolute one; leave remote paths as-is.

    Args:
      path: Local path in filesystem.

    Returns:
        `path` unchanged when `is_remote(path)` is true, otherwise the
        string form of `Path(path).resolve()`.
    """
    return path if is_remote(path) else str(Path(path).resolve())

rm_dir(dir_path)

Deletes dir recursively. Dangerous operation.

Parameters:

Name Type Description Default
dir_path str

Dir to delete.

required

Exceptions:

Type Description
TypeError

If the path is not pointing to a directory.

Source code in zenml/io/fileio.py
def rm_dir(dir_path: str) -> None:
    """Delete a directory tree. Dangerous operation.

    Args:
        dir_path: Dir to delete.

    Raises:
        TypeError: If the path is not pointing to a directory.
    """
    if is_dir(dir_path):
        _get_filesystem(dir_path).rmtree(dir_path)
        return
    raise TypeError(f"Path '{dir_path}' is not a directory.")

stat(path)

Return the stat descriptor for a given file path.

Source code in zenml/io/fileio.py
def stat(path: PathType) -> Any:
    """Return the stat descriptor for `path`.

    Args:
        path: Path of the file to inspect.

    Returns:
        Whatever the `stat` call of the filesystem selected for `path`
        returns.
    """
    filesystem = _get_filesystem(path)
    return filesystem.stat(path)

walk(top, topdown=True, onerror=None)

Return an iterator that walks the contents of the given directory.

Parameters:

Name Type Description Default
top Union[bytes, str]

Path of directory to walk.

required
topdown bool

Whether to walk directories topdown or bottom-up.

True
onerror Optional[Callable[..., NoneType]]

Callable that gets called if an error occurs.

None

Returns:

Type Description
Iterable[Tuple[Union[bytes, str], List[Union[bytes, str]], List[Union[bytes, str]]]]

An Iterable of Tuples, each of which contain the path of the current directory path, a list of directories inside the current directory and a list of files inside the current directory.

Source code in zenml/io/fileio.py
def walk(
    top: PathType,
    topdown: bool = True,
    onerror: Optional[Callable[..., None]] = None,
) -> Iterable[Tuple[PathType, List[PathType], List[PathType]]]:
    """Walk the contents of the directory `top`.

    Delegates to the `walk` call of the filesystem selected for `top`.

    Args:
        top: Path of directory to walk.
        topdown: Whether to walk directories topdown or bottom-up.
        onerror: Callable that gets called if an error occurs.

    Returns:
        An Iterable of Tuples, each holding the path of the current
        directory, a list of the directories inside it and a list of the
        files inside it.
    """
    filesystem = _get_filesystem(top)
    return filesystem.walk(top, topdown=topdown, onerror=onerror)

fileio_registry

Filesystem registry managing filesystem plugins.

FileIORegistry

Registry of pluggable filesystem implementations used in TFX components.

Source code in zenml/io/fileio_registry.py
class FileIORegistry:
    """Registry mapping path schemes to pluggable filesystem classes."""

    def __init__(self) -> None:
        """Create an empty registry."""
        # Maps a scheme (the `<name>://` prefix of a path, or "" for paths
        # without such a prefix) to the filesystem class handling it.
        self._filesystems: Dict[PathType, Type[Filesystem]] = {}
        # Serializes concurrent calls to `register`.
        self._registration_lock = threading.Lock()

    def register(self, filesystem_cls: Type[Filesystem]) -> None:
        """Register a filesystem implementation.

        The class is stored for each of its `SUPPORTED_SCHEMES`. A class
        registered earlier for the same scheme is silently replaced.

        Args:
          filesystem_cls: Subclass of `tfx.dsl.io.filesystem.Filesystem`.
        """
        with self._registration_lock:
            for scheme in filesystem_cls.SUPPORTED_SCHEMES:
                # TODO: [LOW] Decide what to do when the scheme is already
                #   registered. Do we overwrite, give out a warning or do
                #   we fail? For now the latest registration wins.
                self._filesystems[scheme] = filesystem_cls

    def get_filesystem_for_scheme(self, scheme: PathType) -> Type[Filesystem]:
        """Get filesystem plugin for given scheme string.

        Args:
            scheme: The scheme as `str` or UTF-8 encoded `bytes`.

        Returns:
            The filesystem class registered for `scheme`.

        Raises:
            Exception: If no filesystem is registered for `scheme`.
        """
        if isinstance(scheme, bytes):
            scheme = scheme.decode("utf-8")
        if scheme not in self._filesystems:
            raise Exception(
                f"No filesystems were found for the scheme: "
                f"{scheme}. Please make sure that you are using "
                f"the right path and the all the necessary "
                f"integrations are properly installed."
            )
        return self._filesystems[scheme]

    def get_filesystem_for_path(self, path: PathType) -> Type[Filesystem]:
        """Get filesystem plugin for given path.

        Args:
            path: The path as `str` or `bytes`.

        Returns:
            The filesystem class registered for the scheme of `path`.

        Raises:
            ValueError: If `path` is neither `str` nor `bytes`.
        """
        # Assume local path by default, but extract filesystem prefix if available.
        if isinstance(path, str):
            path_bytes = path.encode("utf-8")
        elif isinstance(path, bytes):
            path_bytes = path
        else:
            # An f-string is used because `"%r" % path` misbehaves when
            # `path` is a tuple (it is unpacked as the format arguments).
            raise ValueError(f"Invalid path type: {path!r}.")
        result = re.match(b"^([a-z0-9]+://)", path_bytes)
        # Paths without a `<name>://` prefix map to the empty scheme.
        scheme = result.group(1).decode("utf-8") if result else ""
        return self.get_filesystem_for_scheme(scheme)
get_filesystem_for_path(self, path)

Get filesystem plugin for given path.

Source code in zenml/io/fileio_registry.py
def get_filesystem_for_path(self, path: PathType) -> Type[Filesystem]:
    """Get filesystem plugin for given path.

    Args:
        path: The path as `str` or `bytes`.

    Returns:
        The filesystem class registered for the scheme of `path`.

    Raises:
        ValueError: If `path` is neither `str` nor `bytes`.
    """
    # Assume local path by default, but extract filesystem prefix if available.
    if isinstance(path, str):
        path_bytes = path.encode("utf-8")
    elif isinstance(path, bytes):
        path_bytes = path
    else:
        # An f-string is used because `"%r" % path` misbehaves when `path`
        # is a tuple (it is unpacked as the format arguments).
        raise ValueError(f"Invalid path type: {path!r}.")
    result = re.match(b"^([a-z0-9]+://)", path_bytes)
    # Paths without a `<name>://` prefix map to the empty scheme.
    scheme = result.group(1).decode("utf-8") if result else ""
    return self.get_filesystem_for_scheme(scheme)
get_filesystem_for_scheme(self, scheme)

Get filesystem plugin for given scheme string.

Source code in zenml/io/fileio_registry.py
def get_filesystem_for_scheme(self, scheme: PathType) -> Type[Filesystem]:
    """Look up the filesystem plugin registered for a scheme string.

    Args:
        scheme: The scheme as `str` or UTF-8 encoded `bytes`.

    Returns:
        The filesystem class registered for `scheme`.

    Raises:
        Exception: If no filesystem is registered for `scheme`.
    """
    if isinstance(scheme, bytes):
        scheme = scheme.decode("utf-8")
    if scheme in self._filesystems:
        return self._filesystems[scheme]
    raise Exception(
        f"No filesystems were found for the scheme: "
        f"{scheme}. Please make sure that you are using "
        f"the right path and the all the necessary "
        f"integrations are properly installed."
    )
register(self, filesystem_cls)

Register a filesystem implementation.

Parameters:

Name Type Description Default
filesystem_cls Type[tfx.dsl.io.filesystem.Filesystem]

Subclass of tfx.dsl.io.filesystem.Filesystem.

required
Source code in zenml/io/fileio_registry.py
def register(self, filesystem_cls: Type[Filesystem]) -> None:
    """Store a filesystem implementation for each scheme it supports.

    A class registered earlier for the same scheme is silently replaced.

    Args:
      filesystem_cls: Subclass of `tfx.dsl.io.filesystem.Filesystem`.
    """
    supported_schemes = filesystem_cls.SUPPORTED_SCHEMES
    with self._registration_lock:
        for scheme in supported_schemes:
            # TODO: [LOW] Decide what to do when the scheme is already
            #   registered. Do we overwrite, give out a warning or do we
            #   fail? For now the latest registration wins.
            self._filesystems[scheme] = filesystem_cls

filesystem

FileSystemMeta (type)

Metaclass which is responsible for registering the defined filesystem in the default fileio registry.

Source code in zenml/io/filesystem.py
class FileSystemMeta(type):
    """Metaclass that registers each filesystem class it creates in the
    default fileio registry."""

    def __new__(
        mcs, name: str, bases: Tuple[Type[Any], ...], dct: Dict[str, Any]
    ) -> "FileSystemMeta":
        """Create the filesystem class and register it.

        The class named `Filesystem` is created but not registered. Every
        other class must define a non-empty `SUPPORTED_SCHEMES`.
        """
        new_cls = cast(
            Type["Filesystem"], super().__new__(mcs, name, bases, dct)
        )
        if name == "Filesystem":
            return new_cls

        assert new_cls.SUPPORTED_SCHEMES, (
            "You should specify a list of SUPPORTED_SCHEMES when creating "
            "a filesystem"
        )
        default_fileio_registry.register(new_cls)
        return new_cls
__new__(mcs, name, bases, dct) special staticmethod

Creates the filesystem class and registers it

Source code in zenml/io/filesystem.py
def __new__(
    mcs, name: str, bases: Tuple[Type[Any], ...], dct: Dict[str, Any]
) -> "FileSystemMeta":
    """Create the filesystem class and register it.

    The class named `Filesystem` is created but not registered. Every other
    class must define a non-empty `SUPPORTED_SCHEMES`.
    """
    new_cls = cast(Type["Filesystem"], super().__new__(mcs, name, bases, dct))
    if name == "Filesystem":
        return new_cls

    assert new_cls.SUPPORTED_SCHEMES, (
        "You should specify a list of SUPPORTED_SCHEMES when creating "
        "a filesystem"
    )
    default_fileio_registry.register(new_cls)
    return new_cls

Filesystem (Filesystem)

Abstract Filesystem class.

Source code in zenml/io/filesystem.py
class Filesystem(BaseFileSystem, metaclass=FileSystemMeta):
    """Abstract Filesystem class.

    Uses `FileSystemMeta` as its metaclass, so classes deriving from it are
    created through that metaclass as well.
    """

NotFoundError (OSError)

Auxiliary not found error

Source code in zenml/io/filesystem.py
class NotFoundError(IOError):
    """Auxiliary not found error.

    Derives from `IOError`, so handlers catching `IOError` also catch it.
    """

utils

create_tarfile(source_dir, output_filename='zipped.tar.gz', exclude_function=None)

Create a compressed representation of source_dir.

Parameters:

Name Type Description Default
source_dir str

Path to source dir.

required
output_filename str

Name of outputted gz.

'zipped.tar.gz'
exclude_function Optional[Callable[[tarfile.TarInfo], Union[tarfile.TarInfo, NoneType]]]

Function that determines whether to exclude file.

None
Source code in zenml/io/utils.py
def create_tarfile(
    source_dir: str,
    output_filename: str = "zipped.tar.gz",
    exclude_function: Optional[
        Callable[[tarfile.TarInfo], Optional[tarfile.TarInfo]]
    ] = None,
) -> None:
    """Write a gzip-compressed tar archive of `source_dir`.

    Args:
        source_dir: Path to source dir.
        output_filename: Name of outputted gz.
        exclude_function: Filter applied to every archive member. It returns
            the member to keep it or None to leave it out. When omitted,
            members whose name contains ".zenml/" or "venv/" are left out.
    """

    def _drop_zenml_and_venv(
        tarinfo: tarfile.TarInfo,
    ) -> Optional[tarfile.TarInfo]:
        """Default filter: skip members below `.zenml/` or `venv/`.

        Args:
          tarinfo: The archive member under consideration.

        Returns:
            None to exclude the member, otherwise the member itself.
        """
        member_name = tarinfo.name
        is_excluded = ".zenml/" in member_name or "venv/" in member_name
        return None if is_excluded else tarinfo

    tar_filter = (
        _drop_zenml_and_venv if exclude_function is None else exclude_function
    )
    with tarfile.open(output_filename, "w:gz") as archive:
        archive.add(source_dir, arcname="", filter=tar_filter)

extract_tarfile(source_tar, output_dir)

Extracts all files in a compressed tar file to output_dir.

Parameters:

Name Type Description Default
source_tar str

Path to a tar compressed file.

required
output_dir str

Directory where to extract.

required
Source code in zenml/io/utils.py
def extract_tarfile(source_tar: str, output_dir: str) -> None:
    """Extract every member of a gzip-compressed tar file into `output_dir`.

    Args:
        source_tar: Path to a tar compressed file.
        output_dir: Directory where to extract.

    Raises:
        NotImplementedError: If `source_tar` is a remote path.
    """
    if not is_remote(source_tar):
        # NOTE(review): `extractall` is applied to the archive as-is;
        # confirm that callers only pass trusted archives.
        with tarfile.open(source_tar, "r:gz") as archive:
            archive.extractall(output_dir)
        return
    raise NotImplementedError("Use local tars for now.")

get_global_config_directory()

Returns the global config directory for ZenML.

Source code in zenml/io/utils.py
def get_global_config_directory() -> str:
    """Return the global config directory for ZenML.

    Returns:
        The application directory that `click.get_app_dir` reports for
        `APP_NAME`.
    """
    config_directory = click.get_app_dir(APP_NAME)
    return config_directory

is_gcs_path(path)

Returns True if path is on Google Cloud Storage.

Parameters:

Name Type Description Default
path str

Any path as a string.

required

Returns:

Type Description
bool

True if gcs path, else False.

Source code in zenml/io/utils.py
def is_gcs_path(path: str) -> bool:
    """Report whether `path` is a Google Cloud Storage path.

    Args:
        path: Any path as a string.

    Returns:
        True if `path` starts with "gs://", else False.
    """
    gcs_scheme = "gs://"
    return path.startswith(gcs_scheme)

read_file_contents_as_string(file_path)

Reads contents of file.

Parameters:

Name Type Description Default
file_path str

Path to file.

required
Source code in zenml/io/utils.py
def read_file_contents_as_string(file_path: str) -> str:
    """Reads contents of file.

    Args:
        file_path: Path to file.

    Returns:
        Everything read from the file opened in the default mode.

    Raises:
        FileNotFoundError: If `file_path` does not exist.
    """
    if not file_exists(file_path):
        raise FileNotFoundError(f"{file_path} does not exist!")
    # Use a context manager so the handle is closed right after reading
    # instead of being left for the garbage collector.
    with open(file_path) as f:
        return f.read()  # type: ignore[no-any-return]

write_file_contents_as_string(file_path, content)

Writes contents of file.

Parameters:

Name Type Description Default
file_path str

Path to file.

required
content str

Contents of file.

required
Source code in zenml/io/utils.py
def write_file_contents_as_string(file_path: str, content: str) -> None:
    """Write `content` to the file at `file_path`.

    The file is opened in "w" mode, so previous contents are replaced.

    Args:
        file_path: Path to file.
        content: Contents of file.
    """
    with open(file_path, "w") as handle:
        handle.write(content)