Io
zenml.io
special
The `io` module handles file operations for the ZenML package. It offers a
standard interface for reading, writing and manipulating files and directories.
It is heavily influenced and inspired by the `io` module of `tfx`.
fileio
append_file(file_path, file_contents)
Appends file_contents to file.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
file_path |
str |
Local path in filesystem. |
required |
file_contents |
str |
Contents of file. |
required |
Source code in zenml/io/fileio.py
def append_file(file_path: str, file_contents: str) -> None:
    """Append `file_contents` to the file at `file_path` (not implemented).

    Args:
        file_path: Local path in filesystem.
        file_contents: Contents to append to the file.

    Raises:
        NotImplementedError: Always; appending is not supported yet.
    """
    # TODO: implement by opening `file_path` in append mode ('a') and
    #  writing `file_contents` to it.
    raise NotImplementedError()
convert_to_str(path)
Converts a PathType to a str using UTF-8.
Source code in zenml/io/fileio.py
def convert_to_str(path: PathType) -> str:
    """Return `path` as a `str`, decoding it as UTF-8 if necessary.

    Args:
        path: The path to convert. A `str` is returned unchanged; any other
            value is decoded via its `decode("utf-8")` method.

    Returns:
        The path as a string.
    """
    return path if isinstance(path, str) else path.decode("utf-8")
copy(src, dst, overwrite=False)
Copy a file from the source to the destination.
Source code in zenml/io/fileio.py
def copy(src: PathType, dst: PathType, overwrite: bool = False) -> None:
    """Copy a file from the source to the destination.

    When both paths resolve to the same filesystem plugin, the copy is
    delegated to that plugin. Otherwise the source is read fully into memory
    and written to the destination.

    Args:
        src: Path of the file to copy.
        dst: Path to copy the file to.
        overwrite: Whether an existing destination file may be replaced.

    Raises:
        FileExistsError: If the paths use different filesystems, the
            destination already exists and `overwrite` is false.
    """
    src_fs = _get_filesystem(src)
    dst_fs = _get_filesystem(dst)
    if src_fs is dst_fs:
        src_fs.copy(src, dst, overwrite=overwrite)
        return

    if not overwrite and file_exists(dst):
        raise FileExistsError(
            f"Destination file '{convert_to_str(dst)}' already exists "
            f"and `overwrite` is false."
        )
    # Context managers make sure both handles are closed (and the written
    # data flushed) even if reading or writing fails.
    with open(src, mode="rb") as src_file:
        contents = src_file.read()
    with open(dst, mode="wb") as dst_file:
        dst_file.write(contents)
copy_dir(source_dir, destination_dir, overwrite=False)
Copies dir from source to destination.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
source_dir |
str |
Path to copy from. |
required |
destination_dir |
str |
Path to copy to. |
required |
overwrite |
bool |
Boolean. If false, function throws an error before overwrite. |
False |
Source code in zenml/io/fileio.py
def copy_dir(
    source_dir: str, destination_dir: str, overwrite: bool = False
) -> None:
    """Copies dir from source to destination.

    Every entry returned by `list_dir(source_dir)` is copied; entries that
    are directories are copied recursively.

    Args:
        source_dir: Path to copy from.
        destination_dir: Path to copy to.
        overwrite: Boolean. If false, function throws an error before overwrite.
    """
    for source_file in list_dir(source_dir):
        destination_name = os.path.join(
            destination_dir, Path(source_file).name
        )
        if is_dir(source_file):
            copy_dir(source_file, destination_name, overwrite)
        else:
            # Paths are passed on as plain strings: a round trip through
            # `pathlib.Path` collapses the `//` of scheme prefixes such as
            # `gs://`, which would mangle remote paths.
            create_dir_recursive_if_not_exists(
                os.path.dirname(destination_name) or "."
            )
            copy(source_file, destination_name, overwrite)
create_dir_if_not_exists(dir_path)
Creates directory if it does not exist.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
dir_path |
str |
Local path in filesystem. |
required |
Source code in zenml/io/fileio.py
def create_dir_if_not_exists(dir_path: str) -> None:
    """Create the directory at `dir_path` unless it is already a directory.

    Args:
        dir_path: Local path in filesystem.
    """
    if is_dir(dir_path):
        return
    mkdir(dir_path)
create_dir_recursive_if_not_exists(dir_path)
Creates directory recursively if it does not exist.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
dir_path |
str |
Local path in filesystem. |
required |
Source code in zenml/io/fileio.py
def create_dir_recursive_if_not_exists(dir_path: str) -> None:
    """Create `dir_path` and any missing parents unless it already exists.

    Args:
        dir_path: Local path in filesystem.
    """
    if is_dir(dir_path):
        return
    make_dirs(dir_path)
create_file_if_not_exists(file_path, file_contents='{}')
Creates file if it does not exist.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
file_path |
str |
Local path in filesystem. |
required |
file_contents |
str |
Contents of file. |
'{}' |
Source code in zenml/io/fileio.py
def create_file_if_not_exists(
    file_path: str, file_contents: str = "{}"
) -> None:
    """Creates file if it does not exist.

    An already existing file is left untouched. Missing parent directories
    are created before the file is written.

    Args:
        file_path: Local path in filesystem.
        file_contents: Contents written to the newly created file.
    """
    # Without this guard an existing file would be silently overwritten,
    # contradicting the function's name and contract.
    if file_exists(file_path):
        return

    full_path = Path(file_path)
    create_dir_recursive_if_not_exists(str(full_path.parent))
    with open(str(full_path), "w") as f:
        f.write(file_contents)
file_exists(path)
Returns True
if the given path exists.
Source code in zenml/io/fileio.py
def file_exists(path: PathType) -> bool:
    """Check whether `path` exists on the filesystem responsible for it.

    Args:
        path: The path to check.

    Returns:
        `True` if the given path exists.
    """
    filesystem = _get_filesystem(path)
    return filesystem.exists(path)
find_files(dir_path, pattern)
Find files in a directory that match pattern.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
dir_path |
Union[bytes, str] |
Path to directory. |
required |
pattern |
str |
pattern like *.png. |
required |
Yields:
Type | Description |
---|---|
Iterable[str] |
All matching filenames; nothing is yielded if no file matches. |
Source code in zenml/io/fileio.py
def find_files(dir_path: PathType, pattern: str) -> Iterable[str]:
    """Find files in a directory that match pattern.

    Args:
        dir_path: Path to directory; it is traversed using `walk`.
        pattern: Filename pattern like `*.png`, matched with
            `fnmatch.fnmatch` against each file's basename.

    Yields:
        The path (directory joined with basename) of every matching file.
        Nothing is yielded if no file matches.
    """
    for root, _, files in walk(dir_path):
        for basename in files:
            name = convert_to_str(basename)
            if fnmatch.fnmatch(name, pattern):
                yield os.path.join(convert_to_str(root), name)
get_grandparent(dir_path)
Get grandparent of dir.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
dir_path |
str |
Path to directory. |
required |
Returns:
Type | Description |
---|---|
str |
The stem of the input path's grandparent directory. |
Source code in zenml/io/fileio.py
def get_grandparent(dir_path: str) -> str:
    """Get the stem of the grandparent of a path.

    Args:
        dir_path: Path to directory.

    Returns:
        The stem (final component without its suffix) of the parent of the
        input path's parent.
    """
    parent = Path(dir_path).parent
    grandparent = parent.parent
    return grandparent.stem
get_parent(dir_path)
Get parent of dir.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
dir_path |
str |
Path to directory. |
required |
Returns:
Type | Description |
---|---|
str |
Parent (stem) of the dir as a string. |
Source code in zenml/io/fileio.py
def get_parent(dir_path: str) -> str:
    """Get the stem of the parent of a path.

    Args:
        dir_path: Path to directory.

    Returns:
        The stem (final component without its suffix) of the input path's
        parent, as a string.
    """
    parent = Path(dir_path).parent
    return parent.stem
glob(pattern)
Return the paths that match a glob pattern.
Source code in zenml/io/fileio.py
def glob(pattern: PathType) -> List[PathType]:
    """Return the paths that match a glob pattern.

    Args:
        pattern: The glob pattern; it also selects the filesystem that
            performs the match.

    Returns:
        The matching paths as reported by that filesystem.
    """
    filesystem = _get_filesystem(pattern)
    return filesystem.glob(pattern)
is_dir(path)
Returns whether the given path points to a directory.
Source code in zenml/io/fileio.py
def is_dir(path: PathType) -> bool:
    """Check whether `path` points to a directory.

    Args:
        path: The path to check.

    Returns:
        Whether the given path points to a directory.
    """
    filesystem = _get_filesystem(path)
    return filesystem.isdir(path)
is_remote(path)
Returns True if path exists remotely.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path |
str |
Any path as a string. |
required |
Returns:
Type | Description |
---|---|
bool |
True if remote path, else False. |
Source code in zenml/io/fileio.py
def is_remote(path: str) -> bool:
    """Returns True if path starts with one of the remote filesystem prefixes.

    The check is purely textual against `REMOTE_FS_PREFIX`; nothing is
    looked up remotely.

    Args:
        path: Any path as a string.

    Returns:
        True if remote path, else False.
    """
    remote_prefixes = tuple(REMOTE_FS_PREFIX)
    return path.startswith(remote_prefixes)
is_root(path)
Returns true if path has no parent in local filesystem.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path |
str |
Local path in filesystem. |
required |
Returns:
Type | Description |
---|---|
bool |
True if root, else False. |
Source code in zenml/io/fileio.py
def is_root(path: str) -> bool:
    """Returns true if path has no parent in local filesystem.

    A path counts as root when its `pathlib` parent equals the path itself.

    Args:
        path: Local path in filesystem.

    Returns:
        True if root, else False.
    """
    candidate = Path(path)
    return candidate == candidate.parent
list_dir(dir_path, only_file_names=False)
Returns a list of files under dir.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
dir_path |
str |
Path in filesystem. |
required |
only_file_names |
bool |
Returns only file names if True. |
False |
Returns:
Type | Description |
---|---|
List[str] |
List of full qualified paths. |
Source code in zenml/io/fileio.py
def list_dir(dir_path: str, only_file_names: bool = False) -> List[str]:
    """Returns a list of files under dir.

    Args:
        dir_path: Path in filesystem.
        only_file_names: Returns only file names if True.

    Returns:
        The entries of `dir_path`, joined with `dir_path` unless
        `only_file_names` is set. An empty list if listing raises `IOError`.
    """
    try:
        entries: List[str] = []
        for raw_entry in _get_filesystem(dir_path).listdir(dir_path):
            if only_file_names:
                entries.append(convert_to_str(raw_entry))
            else:
                entries.append(
                    os.path.join(dir_path, convert_to_str(raw_entry))
                )
        return entries
    except IOError:
        # Best effort: a directory that cannot be listed is treated as empty.
        logger.debug(f"Dir {dir_path} not found.")
        return []
make_dirs(path)
Make a directory at the given path, recursively creating parents.
Source code in zenml/io/fileio.py
def make_dirs(path: PathType) -> None:
    """Make a directory at the given path, recursively creating parents.

    Args:
        path: Path of the directory to create.
    """
    filesystem = _get_filesystem(path)
    filesystem.makedirs(path)
mkdir(path)
Make a directory at the given path; parent directory must exist.
Source code in zenml/io/fileio.py
def mkdir(path: PathType) -> None:
    """Make a directory at the given path; parent directory must exist.

    Args:
        path: Path of the directory to create.
    """
    filesystem = _get_filesystem(path)
    filesystem.mkdir(path)
move(source, destination, overwrite=False)
Moves dir or file from source to destination. Can be used to rename.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
source |
str |
Local path to copy from. |
required |
destination |
str |
Local path to copy to. |
required |
overwrite |
bool |
boolean, if false, then throws an error before overwrite. |
False |
Source code in zenml/io/fileio.py
def move(source: str, destination: str, overwrite: bool = False) -> None:
    """Moves dir or file from source to destination. Can be used to rename.

    This is a thin wrapper that forwards all arguments to `rename`.

    Args:
        source: Local path to move from.
        destination: Local path to move to.
        overwrite: boolean, if false, then throws an error before overwrite.
    """
    rename(src=source, dst=destination, overwrite=overwrite)
open(path, mode='r')
Open a file at the given path.
Source code in zenml/io/fileio.py
def open(path: PathType, mode: str = "r") -> Any:  # noqa
    """Open a file at the given path.

    Args:
        path: Path of the file to open.
        mode: Mode passed through to the filesystem's `open`.

    Returns:
        Whatever the responsible filesystem's `open` returns.
    """
    filesystem = _get_filesystem(path)
    return filesystem.open(path, mode=mode)
remove(path)
Remove the file at the given path. Dangerous operation.
Source code in zenml/io/fileio.py
def remove(path: PathType) -> None:
    """Remove the file at the given path. Dangerous operation.

    Args:
        path: Path of the file to remove.

    Raises:
        FileNotFoundError: If the path does not exist.
    """
    if file_exists(path):
        _get_filesystem(path).remove(path)
        return
    raise FileNotFoundError(f"{convert_to_str(path)} does not exist!")
rename(src, dst, overwrite=False)
Rename source file to destination file.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
src |
Union[bytes, str] |
The path of the file to rename. |
required |
dst |
Union[bytes, str] |
The path to rename the source file to. |
required |
overwrite |
bool |
If a file already exists at the destination, this
method will overwrite it if overwrite=`True` and raise a FileExistsError otherwise. |
False |
Exceptions:
Type | Description |
---|---|
FileExistsError |
If a file already exists at the destination
and overwrite is not set to `True`. |
Source code in zenml/io/fileio.py
def rename(src: PathType, dst: PathType, overwrite: bool = False) -> None:
    """Rename source file to destination file.

    Args:
        src: The path of the file to rename.
        dst: The path to rename the source file to.
        overwrite: If a file already exists at the destination, this
            method will overwrite it if overwrite=`True` and
            raise a FileExistsError otherwise.

    Raises:
        FileExistsError: If a file already exists at the destination
            and overwrite is not set to `True`.
        NotImplementedError: If source and destination are served by
            different filesystem plugins.
    """
    source_fs = _get_filesystem(src)
    target_fs = _get_filesystem(dst)
    if source_fs is not target_fs:
        raise NotImplementedError(
            f"Renaming from {convert_to_str(src)} to {convert_to_str(dst)} "
            f"using different filesystems plugins is currently not supported."
        )
    source_fs.rename(src, dst, overwrite=overwrite)
resolve_relative_path(path)
Takes relative path and resolves it absolutely.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path |
str |
Local path in filesystem. |
required |
Returns:
Type | Description |
---|---|
str |
Resolved path. |
Source code in zenml/io/fileio.py
def resolve_relative_path(path: str) -> str:
    """Takes relative path and resolves it absolutely.

    Paths for which `is_remote` returns True are returned unchanged.

    Args:
        path: Local path in filesystem.

    Returns:
        Resolved path.
    """
    return path if is_remote(path) else str(Path(path).resolve())
rm_dir(dir_path)
Deletes dir recursively. Dangerous operation.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
dir_path |
str |
Dir to delete. |
required |
Exceptions:
Type | Description |
---|---|
TypeError |
If the path is not pointing to a directory. |
Source code in zenml/io/fileio.py
def rm_dir(dir_path: str) -> None:
    """Deletes dir recursively. Dangerous operation.

    Args:
        dir_path: Dir to delete.

    Raises:
        TypeError: If the path is not pointing to a directory.
    """
    if is_dir(dir_path):
        _get_filesystem(dir_path).rmtree(dir_path)
        return
    raise TypeError(f"Path '{dir_path}' is not a directory.")
stat(path)
Return the stat descriptor for a given file path.
Source code in zenml/io/fileio.py
def stat(path: PathType) -> Any:
    """Return the stat descriptor for a given file path.

    Args:
        path: Path of the file to inspect.

    Returns:
        Whatever the responsible filesystem's `stat` returns.
    """
    filesystem = _get_filesystem(path)
    return filesystem.stat(path)
walk(top, topdown=True, onerror=None)
Return an iterator that walks the contents of the given directory.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
top |
Union[bytes, str] |
Path of directory to walk. |
required |
topdown |
bool |
Whether to walk directories topdown or bottom-up. |
True |
onerror |
Optional[Callable[..., NoneType]] |
Callable that gets called if an error occurs. |
None |
Returns:
Type | Description |
---|---|
Iterable[Tuple[Union[bytes, str], List[Union[bytes, str]], List[Union[bytes, str]]]] |
An Iterable of Tuples, each of which contain the path of the current directory path, a list of directories inside the current directory and a list of files inside the current directory. |
Source code in zenml/io/fileio.py
def walk(
    top: PathType,
    topdown: bool = True,
    onerror: Optional[Callable[..., None]] = None,
) -> Iterable[Tuple[PathType, List[PathType], List[PathType]]]:
    """Return an iterator that walks the contents of the given directory.

    Args:
        top: Path of directory to walk.
        topdown: Whether to walk directories topdown or bottom-up.
        onerror: Callable that gets called if an error occurs.

    Returns:
        An Iterable of Tuples, each of which contain the path of the current
        directory path, a list of directories inside the current directory
        and a list of files inside the current directory.
    """
    filesystem = _get_filesystem(top)
    return filesystem.walk(top, topdown=topdown, onerror=onerror)
fileio_registry
Filesystem registry managing filesystem plugins.
FileIORegistry
Registry of pluggable filesystem implementations used in TFX components.
Source code in zenml/io/fileio_registry.py
class FileIORegistry:
    """Registry of pluggable filesystem implementations, keyed by scheme."""

    def __init__(self) -> None:
        """Create an empty registry."""
        # Maps a scheme prefix (the `xyz://` part of a path, or "" for paths
        # without one) to the filesystem class registered for it.
        self._filesystems: Dict[PathType, Type[Filesystem]] = {}
        # Held while `register` updates the mapping.
        self._registration_lock = threading.Lock()

    def register(self, filesystem_cls: Type[Filesystem]) -> None:
        """Register a filesystem implementation.

        The class is stored for each of its `SUPPORTED_SCHEMES`; an earlier
        registration for the same scheme is replaced.

        Args:
            filesystem_cls: Subclass of `tfx.dsl.io.filesystem.Filesystem`.
        """
        with self._registration_lock:
            for scheme in filesystem_cls.SUPPORTED_SCHEMES:
                # TODO: [LOW] Decide what to do when the scheme is already
                #  registered. Do we overwrite, give out a warning or do we
                #  fail? For now the latest registration silently wins.
                self._filesystems[scheme] = filesystem_cls

    def get_filesystem_for_scheme(self, scheme: PathType) -> Type[Filesystem]:
        """Get filesystem plugin for given scheme string.

        Args:
            scheme: Scheme prefix as `str` or UTF-8 encoded `bytes`.

        Returns:
            The filesystem class registered for the scheme.

        Raises:
            Exception: If no filesystem is registered for the scheme.
        """
        if isinstance(scheme, bytes):
            scheme = scheme.decode("utf-8")
        if scheme not in self._filesystems:
            raise Exception(
                f"No filesystems were found for the scheme: "
                f"{scheme}. Please make sure that you are using "
                f"the right path and the all the necessary "
                f"integrations are properly installed."
            )
        return self._filesystems[scheme]

    def get_filesystem_for_path(self, path: PathType) -> Type[Filesystem]:
        """Get filesystem plugin for given path.

        Args:
            path: Path as `str` or `bytes`.

        Returns:
            The filesystem class registered for the path's scheme.

        Raises:
            ValueError: If `path` is neither `str` nor `bytes`.
        """
        # Assume local path by default, but extract filesystem prefix if
        # available.
        if isinstance(path, str):
            path_bytes = path.encode("utf-8")
        elif isinstance(path, bytes):
            path_bytes = path
        else:
            # An f-string is used because `"%r" % path` breaks with a
            # TypeError when `path` happens to be a tuple.
            raise ValueError(f"Invalid path type: {path!r}.")
        result = re.match(b"^([a-z0-9]+://)", path_bytes)
        scheme = result.group(1).decode("utf-8") if result else ""
        return self.get_filesystem_for_scheme(scheme)
get_filesystem_for_path(self, path)
Get filesystem plugin for given path.
Source code in zenml/io/fileio_registry.py
def get_filesystem_for_path(self, path: PathType) -> Type[Filesystem]:
    """Get filesystem plugin for given path.

    Args:
        path: Path as `str` or `bytes`.

    Returns:
        The filesystem class registered for the path's scheme.

    Raises:
        ValueError: If `path` is neither `str` nor `bytes`.
    """
    # Assume local path by default, but extract filesystem prefix if
    # available.
    if isinstance(path, str):
        path_bytes = path.encode("utf-8")
    elif isinstance(path, bytes):
        path_bytes = path
    else:
        # An f-string is used because `"%r" % path` breaks with a TypeError
        # when `path` happens to be a tuple.
        raise ValueError(f"Invalid path type: {path!r}.")
    result = re.match(b"^([a-z0-9]+://)", path_bytes)
    scheme = result.group(1).decode("utf-8") if result else ""
    return self.get_filesystem_for_scheme(scheme)
get_filesystem_for_scheme(self, scheme)
Get filesystem plugin for given scheme string.
Source code in zenml/io/fileio_registry.py
def get_filesystem_for_scheme(self, scheme: PathType) -> Type[Filesystem]:
    """Get filesystem plugin for given scheme string.

    Args:
        scheme: Scheme prefix as `str` or UTF-8 encoded `bytes`.

    Returns:
        The filesystem class registered for the scheme.

    Raises:
        Exception: If no filesystem is registered for the scheme.
    """
    scheme = scheme.decode("utf-8") if isinstance(scheme, bytes) else scheme
    if scheme in self._filesystems:
        return self._filesystems[scheme]
    raise Exception(
        f"No filesystems were found for the scheme: "
        f"{scheme}. Please make sure that you are using "
        f"the right path and the all the necessary "
        f"integrations are properly installed."
    )
register(self, filesystem_cls)
Register a filesystem implementation.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
filesystem_cls |
Type[tfx.dsl.io.filesystem.Filesystem] |
Subclass of `tfx.dsl.io.filesystem.Filesystem`. |
required |
Source code in zenml/io/fileio_registry.py
def register(self, filesystem_cls: Type[Filesystem]) -> None:
    """Register a filesystem implementation.

    The class is stored for each of its `SUPPORTED_SCHEMES`; an earlier
    registration for the same scheme is replaced.

    Args:
        filesystem_cls: Subclass of `tfx.dsl.io.filesystem.Filesystem`.
    """
    registry = self._filesystems
    with self._registration_lock:
        for supported_scheme in filesystem_cls.SUPPORTED_SCHEMES:
            # TODO: [LOW] Decide what to do when the scheme is already
            #  registered. Do we overwrite, give out a warning or do we
            #  fail? For now the latest registration silently wins.
            registry[supported_scheme] = filesystem_cls
filesystem
FileSystemMeta (type)
Metaclass which is responsible for registering the defined filesystem in the default fileio registry.
Source code in zenml/io/filesystem.py
class FileSystemMeta(type):
    """Metaclass that registers each defined filesystem class in the
    default fileio registry."""

    def __new__(
        mcs, name: str, bases: Tuple[Type[Any], ...], dct: Dict[str, Any]
    ) -> "FileSystemMeta":
        """Create the filesystem class and register it.

        The class named "Filesystem" itself is created but not registered.
        """
        new_cls = cast(
            Type["Filesystem"], super().__new__(mcs, name, bases, dct)
        )
        if name == "Filesystem":
            return new_cls

        assert new_cls.SUPPORTED_SCHEMES, (
            "You should specify a list of SUPPORTED_SCHEMES when creating "
            "a filesystem"
        )
        default_fileio_registry.register(new_cls)
        return new_cls
__new__(mcs, name, bases, dct)
special
staticmethod
Creates the filesystem class and registers it
Source code in zenml/io/filesystem.py
def __new__(
    mcs, name: str, bases: Tuple[Type[Any], ...], dct: Dict[str, Any]
) -> "FileSystemMeta":
    """Create the filesystem class and register it.

    The class named "Filesystem" itself is created but not registered.
    """
    new_cls = cast(Type["Filesystem"], super().__new__(mcs, name, bases, dct))
    if name == "Filesystem":
        return new_cls

    assert new_cls.SUPPORTED_SCHEMES, (
        "You should specify a list of SUPPORTED_SCHEMES when creating "
        "a filesystem"
    )
    default_fileio_registry.register(new_cls)
    return new_cls
Filesystem (Filesystem)
Abstract Filesystem class.
Source code in zenml/io/filesystem.py
class Filesystem(BaseFileSystem, metaclass=FileSystemMeta):
    """Abstract Filesystem class.

    Extends `BaseFileSystem` and uses `FileSystemMeta` as its metaclass.
    """
NotFoundError (OSError)
Auxiliary not found error
Source code in zenml/io/filesystem.py
class NotFoundError(IOError):
    """Auxiliary not found error; a subclass of `IOError`."""
utils
create_tarfile(source_dir, output_filename='zipped.tar.gz', exclude_function=None)
Create a compressed representation of source_dir.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
source_dir |
str |
Path to source dir. |
required |
output_filename |
str |
Name of outputted gz. |
'zipped.tar.gz' |
exclude_function |
Optional[Callable[[tarfile.TarInfo], Union[tarfile.TarInfo, NoneType]]] |
Function that determines whether to exclude file. |
None |
Source code in zenml/io/utils.py
def create_tarfile(
    source_dir: str,
    output_filename: str = "zipped.tar.gz",
    exclude_function: Optional[
        Callable[[tarfile.TarInfo], Optional[tarfile.TarInfo]]
    ] = None,
) -> None:
    """Create a gzip-compressed tar archive of `source_dir`.

    Args:
        source_dir: Path to source dir.
        output_filename: Name of outputted gz.
        exclude_function: Filter passed to `TarFile.add`; it returns `None`
            for members to leave out and the `TarInfo` otherwise. If omitted,
            members whose name contains `.zenml/` or `venv/` are left out.
    """

    def _skip_zenml_and_venv(
        tarinfo: tarfile.TarInfo,
    ) -> Optional[tarfile.TarInfo]:
        """Default filter: drop members under `.zenml/` or `venv/`."""
        member_name = tarinfo.name
        is_excluded = ".zenml/" in member_name or "venv/" in member_name
        return None if is_excluded else tarinfo

    member_filter = (
        exclude_function
        if exclude_function is not None
        else _skip_zenml_and_venv
    )
    with tarfile.open(output_filename, "w:gz") as archive:
        # arcname="" stores members relative to `source_dir` itself.
        archive.add(source_dir, arcname="", filter=member_filter)
extract_tarfile(source_tar, output_dir)
Extracts all files in a compressed tar file to output_dir.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
source_tar |
str |
Path to a tar compressed file. |
required |
output_dir |
str |
Directory where to extract. |
required |
Source code in zenml/io/utils.py
def extract_tarfile(source_tar: str, output_dir: str) -> None:
    """Extracts all files in a compressed tar file to output_dir.

    Args:
        source_tar: Path to a tar compressed file.
        output_dir: Directory where to extract.

    Raises:
        NotImplementedError: If `source_tar` is a remote path.
    """
    if is_remote(source_tar):
        raise NotImplementedError("Use local tars for now.")

    # NOTE(review): `extractall` applies no member filtering here, so this
    # should only be used on archives from trusted sources.
    with tarfile.open(source_tar, "r:gz") as archive:
        archive.extractall(path=output_dir)
get_global_config_directory()
Returns the global config directory for ZenML.
Source code in zenml/io/utils.py
def get_global_config_directory() -> str:
    """Returns the global config directory for ZenML.

    Returns:
        The application directory that `click.get_app_dir` reports for
        `APP_NAME`.
    """
    config_directory: str = click.get_app_dir(APP_NAME)
    return config_directory
is_gcs_path(path)
Returns True if path is on Google Cloud Storage.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path |
str |
Any path as a string. |
required |
Returns:
Type | Description |
---|---|
bool |
True if gcs path, else False. |
Source code in zenml/io/utils.py
def is_gcs_path(path: str) -> bool:
    """Returns True if path is on Google Cloud Storage.

    The check is purely textual: the path must start with `gs://`.

    Args:
        path: Any path as a string.

    Returns:
        True if gcs path, else False.
    """
    gcs_prefix = "gs://"
    return path.startswith(gcs_prefix)
read_file_contents_as_string(file_path)
Reads contents of file.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
file_path |
str |
Path to file. |
required |
Source code in zenml/io/utils.py
def read_file_contents_as_string(file_path: str) -> str:
    """Reads contents of file.

    Args:
        file_path: Path to file.

    Returns:
        The complete contents of the file.

    Raises:
        FileNotFoundError: If the file does not exist.
    """
    if not file_exists(file_path):
        raise FileNotFoundError(f"{file_path} does not exist!")
    # The context manager closes the handle once the contents are read.
    with open(file_path) as f:
        return f.read()  # type: ignore[no-any-return]
write_file_contents_as_string(file_path, content)
Writes contents of file.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
file_path |
str |
Path to file. |
required |
content |
str |
Contents of file. |
required |
Source code in zenml/io/utils.py
def write_file_contents_as_string(file_path: str, content: str) -> None:
    """Write `content` to the file at `file_path`.

    The file is opened in "w" mode before `content` is written to it.

    Args:
        file_path: Path to file.
        content: Contents of file.
    """
    with open(file_path, "w") as output_file:
        output_file.write(content)