Artifact Stores
zenml.artifact_stores
special
Artifact Stores
In ZenML, the inputs and outputs that pass through any step are treated as
artifacts and, as its name suggests, an ArtifactStore
is a place where these
artifacts get stored.
Out of the box, ZenML comes with the BaseArtifactStore
and
LocalArtifactStore
implementations. While the BaseArtifactStore
establishes
an interface for people who want to extend it to their needs, the
LocalArtifactStore
is a simple implementation for a local setup.
Moreover, additional artifact stores can be found in specific integration
modules, such as the GCPArtifactStore
in the gcp
integration and the
AzureArtifactStore
in the azure
integration.
base_artifact_store
BaseArtifactStore (StackComponent, ABC)
pydantic-model
Base class for all ZenML artifact stores.
Attributes:
Name | Type | Description |
---|---|---|
path |
str |
The root path of the artifact store. |
Source code in zenml/artifact_stores/base_artifact_store.py
class BaseArtifactStore(StackComponent, ABC):
    """Base class for all ZenML artifact stores.

    Subclasses implement the filesystem operations declared here and set
    the `SUPPORTED_SCHEMES` class variable. Creating an instance also
    registers a filesystem for those schemes in the TFX filesystem
    registry (see `_register`).

    Attributes:
        path: The root path of the artifact store.
    """

    path: str

    # Class Configuration
    TYPE: ClassVar[StackComponentType] = StackComponentType.ARTIFACT_STORE
    FLAVOR: ClassVar[str]
    SUPPORTED_SCHEMES: ClassVar[Set[str]]

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        """Initialize the Pydantic object and register the corresponding
        filesystem.

        Args:
            *args: Positional arguments forwarded to the parent initializer.
            **kwargs: Keyword arguments forwarded to the parent initializer.
        """
        super(BaseArtifactStore, self).__init__(*args, **kwargs)
        self._register()

    def open(self, name: PathType, mode: str = "r") -> Any:
        """Open a file at the given path."""
        raise NotImplementedError()

    def copyfile(
        self, src: PathType, dst: PathType, overwrite: bool = False
    ) -> None:
        """Copy a file from the source to the destination."""
        raise NotImplementedError()

    def exists(self, path: PathType) -> bool:
        """Returns `True` if the given path exists."""
        raise NotImplementedError()

    def glob(self, pattern: PathType) -> List[PathType]:
        """Return the paths that match a glob pattern."""
        raise NotImplementedError()

    def isdir(self, path: PathType) -> bool:
        """Returns whether the given path points to a directory."""
        raise NotImplementedError()

    def listdir(self, path: PathType) -> List[PathType]:
        """Returns a list of files under a given directory in the filesystem."""
        raise NotImplementedError()

    def makedirs(self, path: PathType) -> None:
        """Make a directory at the given path, recursively creating parents."""
        raise NotImplementedError()

    def mkdir(self, path: PathType) -> None:
        """Make a directory at the given path; parent directory must exist."""
        raise NotImplementedError()

    def remove(self, path: PathType) -> None:
        """Remove the file at the given path. Dangerous operation."""
        raise NotImplementedError()

    def rename(
        self, src: PathType, dst: PathType, overwrite: bool = False
    ) -> None:
        """Rename source file to destination file."""
        raise NotImplementedError()

    def rmtree(self, path: PathType) -> None:
        """Deletes dir recursively. Dangerous operation."""
        raise NotImplementedError()

    def stat(self, path: PathType) -> Any:
        """Return the stat descriptor for a given file path."""
        raise NotImplementedError()

    def walk(
        self,
        top: PathType,
        topdown: bool = True,
        onerror: Optional[Callable[..., None]] = None,
    ) -> Iterable[Tuple[PathType, List[PathType], List[PathType]]]:
        """Return an iterator that walks the contents of the given directory."""
        raise NotImplementedError()

    # `skip_on_failure=True`: if `path` already failed field validation it is
    # absent from `values`; without this flag the lookup below would raise a
    # raw KeyError instead of letting pydantic report the field error.
    @root_validator(skip_on_failure=True)
    def _ensure_artifact_store(cls, values: Dict[str, Any]) -> Any:
        """Validate that the store declares schemes and that `path` uses one.

        Args:
            values: The field values that passed validation.

        Returns:
            The unchanged `values` dictionary.

        Raises:
            ArtifactStoreInterfaceError: If the class does not define
                `SUPPORTED_SCHEMES`, or if `path` does not start with any
                of the supported schemes.
        """
        # `SUPPORTED_SCHEMES` is only annotated on the base class, so the
        # attribute is missing unless a subclass assigns it.
        if not hasattr(cls, "SUPPORTED_SCHEMES"):
            raise ArtifactStoreInterfaceError(
                textwrap.dedent(
                    """
                    When you are working with any classes which subclass from
                    zenml.artifact_stores.BaseArtifactStore please make sure that your class
                    has a ClassVar named `SUPPORTED_SCHEMES` which should hold a set of
                    supported file schemes such as {"s3://"} or {"gcs://"}.

                    Example:

                    class S3ArtifactStore(BaseArtifactStore):
                        ...
                        # Class Variables
                        ...
                        SUPPORTED_SCHEMES: ClassVar[Set[str]] = {"s3://"}
                        ...
                    """
                )
            )

        path = values["path"]
        # str.startswith accepts a tuple of prefixes; an empty tuple never
        # matches, mirroring `any()` over an empty set.
        if not path.startswith(tuple(cls.SUPPORTED_SCHEMES)):
            raise ArtifactStoreInterfaceError(
                textwrap.dedent(
                    f"""
                    The path: "{path}" you defined for your artifact
                    store is not supported by the implementation of
                    {cls.schema()["title"]}, because it does not start with
                    one of its supported schemes: {cls.SUPPORTED_SCHEMES}.
                    """
                )
            )

        return values

    def _register(self, priority: int = 5) -> None:
        """Create and register a filesystem within the TFX registry.

        A new `Filesystem` subclass named after this store's class is built
        from this instance's bound methods and registered for
        `SUPPORTED_SCHEMES`.

        Args:
            priority: Priority passed to the registry's `register` call.
        """
        # Imported lazily, inside the method, as in the original code.
        from tfx.dsl.io.filesystem import Filesystem
        from tfx.dsl.io.filesystem_registry import DEFAULT_FILESYSTEM_REGISTRY

        filesystem_class = type(
            self.__class__.__name__,
            (Filesystem,),
            {
                "SUPPORTED_SCHEMES": self.SUPPORTED_SCHEMES,
                "open": staticmethod(_catch_not_found_error(self.open)),
                "copy": staticmethod(_catch_not_found_error(self.copyfile)),
                "exists": staticmethod(self.exists),
                "glob": staticmethod(self.glob),
                "isdir": staticmethod(self.isdir),
                "listdir": staticmethod(_catch_not_found_error(self.listdir)),
                "makedirs": staticmethod(self.makedirs),
                "mkdir": staticmethod(_catch_not_found_error(self.mkdir)),
                "remove": staticmethod(_catch_not_found_error(self.remove)),
                "rename": staticmethod(_catch_not_found_error(self.rename)),
                "rmtree": staticmethod(_catch_not_found_error(self.rmtree)),
                "stat": staticmethod(_catch_not_found_error(self.stat)),
                "walk": staticmethod(_catch_not_found_error(self.walk)),
            },
        )

        DEFAULT_FILESYSTEM_REGISTRY.register(
            filesystem_class, priority=priority
        )
__init__(self, *args, **kwargs)
special
Initiate the Pydantic object and register the corresponding filesystem.
Source code in zenml/artifact_stores/base_artifact_store.py
def __init__(self, *args: Any, **kwargs: Any) -> None:
    """Initialize the Pydantic object and register the corresponding
    filesystem.

    Args:
        *args: Positional arguments forwarded to the parent initializer.
        **kwargs: Keyword arguments forwarded to the parent initializer.
    """
    # Run the parent initializer first; `_register` is then called on the
    # initialized instance.
    super(BaseArtifactStore, self).__init__(*args, **kwargs)
    self._register()
copyfile(self, src, dst, overwrite=False)
Copy a file from the source to the destination.
Source code in zenml/artifact_stores/base_artifact_store.py
def copyfile(
    self,
    src: PathType,
    dst: PathType,
    overwrite: bool = False,
) -> None:
    """Copy the file at `src` to `dst`.

    Interface stub: the base class provides no implementation.

    Raises:
        NotImplementedError: Always, in this base implementation.
    """
    raise NotImplementedError
exists(self, path)
Returns True
if the given path exists.
Source code in zenml/artifact_stores/base_artifact_store.py
def exists(self, path: PathType) -> bool:
    """Report whether `path` exists.

    Interface stub: the base class provides no implementation.

    Raises:
        NotImplementedError: Always, in this base implementation.
    """
    raise NotImplementedError
glob(self, pattern)
Return the paths that match a glob pattern.
Source code in zenml/artifact_stores/base_artifact_store.py
def glob(self, pattern: PathType) -> List[PathType]:
    """List the paths matching the glob `pattern`.

    Interface stub: the base class provides no implementation.

    Raises:
        NotImplementedError: Always, in this base implementation.
    """
    raise NotImplementedError
isdir(self, path)
Returns whether the given path points to a directory.
Source code in zenml/artifact_stores/base_artifact_store.py
def isdir(self, path: PathType) -> bool:
    """Report whether `path` points to a directory.

    Interface stub: the base class provides no implementation.

    Raises:
        NotImplementedError: Always, in this base implementation.
    """
    raise NotImplementedError
listdir(self, path)
Returns a list of files under a given directory in the filesystem.
Source code in zenml/artifact_stores/base_artifact_store.py
def listdir(self, path: PathType) -> List[PathType]:
    """List the files under the directory `path`.

    Interface stub: the base class provides no implementation.

    Raises:
        NotImplementedError: Always, in this base implementation.
    """
    raise NotImplementedError
makedirs(self, path)
Make a directory at the given path, recursively creating parents.
Source code in zenml/artifact_stores/base_artifact_store.py
def makedirs(self, path: PathType) -> None:
    """Create the directory `path`, recursively creating parents.

    Interface stub: the base class provides no implementation.

    Raises:
        NotImplementedError: Always, in this base implementation.
    """
    raise NotImplementedError
mkdir(self, path)
Make a directory at the given path; parent directory must exist.
Source code in zenml/artifact_stores/base_artifact_store.py
def mkdir(self, path: PathType) -> None:
    """Create the directory `path`; its parent directory must exist.

    Interface stub: the base class provides no implementation.

    Raises:
        NotImplementedError: Always, in this base implementation.
    """
    raise NotImplementedError
open(self, name, mode='r')
Open a file at the given path.
Source code in zenml/artifact_stores/base_artifact_store.py
def open(self, name: PathType, mode: str = "r") -> Any:
    """Open the file at `name` using `mode`.

    Interface stub: the base class provides no implementation.

    Raises:
        NotImplementedError: Always, in this base implementation.
    """
    raise NotImplementedError
remove(self, path)
Remove the file at the given path. Dangerous operation.
Source code in zenml/artifact_stores/base_artifact_store.py
def remove(self, path: PathType) -> None:
    """Remove the file at `path`. Dangerous operation.

    Interface stub: the base class provides no implementation.

    Raises:
        NotImplementedError: Always, in this base implementation.
    """
    raise NotImplementedError
rename(self, src, dst, overwrite=False)
Rename source file to destination file.
Source code in zenml/artifact_stores/base_artifact_store.py
def rename(
    self,
    src: PathType,
    dst: PathType,
    overwrite: bool = False,
) -> None:
    """Rename the file `src` to `dst`.

    Interface stub: the base class provides no implementation.

    Raises:
        NotImplementedError: Always, in this base implementation.
    """
    raise NotImplementedError
rmtree(self, path)
Deletes dir recursively. Dangerous operation.
Source code in zenml/artifact_stores/base_artifact_store.py
def rmtree(self, path: PathType) -> None:
    """Delete the directory `path` recursively. Dangerous operation.

    Interface stub: the base class provides no implementation.

    Raises:
        NotImplementedError: Always, in this base implementation.
    """
    raise NotImplementedError
stat(self, path)
Return the stat descriptor for a given file path.
Source code in zenml/artifact_stores/base_artifact_store.py
def stat(self, path: PathType) -> Any:
    """Return the stat descriptor for the file at `path`.

    Interface stub: the base class provides no implementation.

    Raises:
        NotImplementedError: Always, in this base implementation.
    """
    raise NotImplementedError
walk(self, top, topdown=True, onerror=None)
Return an iterator that walks the contents of the given directory.
Source code in zenml/artifact_stores/base_artifact_store.py
def walk(
    self,
    top: PathType,
    topdown: bool = True,
    onerror: Optional[Callable[..., None]] = None,
) -> Iterable[Tuple[PathType, List[PathType], List[PathType]]]:
    """Return an iterator walking the contents of the directory `top`.

    Interface stub: the base class provides no implementation, so the
    call itself raises rather than returning an iterator.

    Raises:
        NotImplementedError: Always, in this base implementation.
    """
    raise NotImplementedError
local_artifact_store
LocalArtifactStore (BaseArtifactStore)
pydantic-model
Artifact Store for local artifacts.
Source code in zenml/artifact_stores/local_artifact_store.py
class LocalArtifactStore(BaseArtifactStore):
    """Artifact Store for local artifacts.

    Every filesystem operation is a static method that delegates to the
    standard library (`os`, `shutil`, `glob`, builtin `open`).
    """

    # Class Configuration
    FLAVOR: ClassVar[str] = "local"
    # Every string starts with the empty prefix, so the base class scheme
    # check accepts any path; remote prefixes are rejected separately by
    # `ensure_path_local`.
    SUPPORTED_SCHEMES: ClassVar[Set[str]] = {""}

    @property
    def local_path(self) -> str:
        """Path to the local directory where the artifacts are stored."""
        return self.path

    @staticmethod
    def open(name: PathType, mode: str = "r") -> Any:
        """Open a file at the given path."""
        # Inside a method body `open` resolves to the builtin, not to this
        # static method (class scope is not searched from function bodies).
        return open(name, mode=mode)

    @staticmethod
    def copyfile(src: PathType, dst: PathType, overwrite: bool = False) -> None:
        """Copy a file from the source to the destination.

        Args:
            src: The path of the file to copy.
            dst: The path to copy the file to.
            overwrite: Whether an existing destination may be overwritten.

        Raises:
            FileExistsError: If `dst` exists and `overwrite` is false.
        """
        if not overwrite and os.path.exists(dst):
            raise FileExistsError(
                f"Destination file {str(dst)} already exists and argument "
                f"`overwrite` is false."
            )
        shutil.copyfile(src, dst)  # type: ignore[type-var, arg-type]

    @staticmethod
    def exists(path: PathType) -> bool:
        """Returns `True` if the given path exists."""
        return os.path.exists(path)

    @staticmethod
    def glob(pattern: PathType) -> List[PathType]:
        """Return the paths that match a glob pattern."""
        # `glob` here is the module: the method name is not visible from
        # inside the function body.
        return glob.glob(pattern)  # type: ignore[type-var]

    @staticmethod
    def isdir(path: PathType) -> bool:
        """Returns whether the given path points to a directory."""
        return os.path.isdir(path)

    @staticmethod
    def listdir(path: PathType) -> List[PathType]:
        """Returns a list of files under a given directory in the filesystem."""
        return os.listdir(path)  # type:ignore[return-value]

    @staticmethod
    def makedirs(path: PathType) -> None:
        """Make a directory at the given path, recursively creating parents."""
        os.makedirs(path, exist_ok=True)

    @staticmethod
    def mkdir(path: PathType) -> None:
        """Make a directory at the given path; parent directory must exist."""
        os.mkdir(path)

    @staticmethod
    def remove(path: PathType) -> None:
        """Remove the file at the given path. Dangerous operation."""
        os.remove(path)

    @staticmethod
    def rename(src: PathType, dst: PathType, overwrite: bool = False) -> None:
        """Rename source file to destination file.

        Args:
            src: The path of the file to rename.
            dst: The path to rename the source file to.
            overwrite: If a file already exists at the destination, this
                method will overwrite it if overwrite=`True`

        Raises:
            FileExistsError: If `dst` exists and `overwrite` is false.
        """
        if overwrite:
            # os.rename raises FileExistsError on Windows when the
            # destination exists; os.replace overwrites on every platform.
            os.replace(src, dst)
            return
        if os.path.exists(dst):
            raise FileExistsError(
                f"Destination path {str(dst)} already exists and argument "
                f"`overwrite` is false."
            )
        os.rename(src, dst)

    @staticmethod
    def rmtree(path: PathType) -> None:
        """Deletes dir recursively. Dangerous operation."""
        shutil.rmtree(path)

    @staticmethod
    def stat(path: PathType) -> Any:
        """Return the stat descriptor for a given file path."""
        return os.stat(path)

    @staticmethod
    def walk(
        top: PathType,
        topdown: bool = True,
        onerror: Optional[Callable[..., None]] = None,
    ) -> Iterable[Tuple[PathType, List[PathType], List[PathType]]]:
        """Return an iterator that walks the contents of the given directory.

        Args:
            top: Path of directory to walk.
            topdown: Whether to walk directories topdown or bottom-up.
            onerror: Callable that gets called if an error occurs.

        Returns:
            An Iterable of Tuples, each of which contain the path of the
            current directory path, a list of directories inside the
            current directory and a list of files inside the current
            directory.
        """
        yield from os.walk(top, topdown=topdown, onerror=onerror)  # type: ignore[type-var, misc]

    @validator("path")
    def ensure_path_local(cls, path: str) -> str:
        """Reject paths that start with a known remote-storage prefix.

        Args:
            path: The configured artifact store path.

        Returns:
            The unchanged path.

        Raises:
            ArtifactStoreInterfaceError: If `path` starts with one of the
                remote prefixes listed below.
        """
        remote_prefixes = ("gs://", "hdfs://", "s3://", "az://", "abfs://")
        # str.startswith accepts a tuple of candidate prefixes.
        if path.startswith(remote_prefixes):
            raise ArtifactStoreInterfaceError(
                f"The path:{path} you defined for your local artifact store "
                f"start with one of the remote prefixes."
            )
        return path
local_path: str
property
readonly
Path to the local directory where the artifacts are stored.
copyfile(src, dst, overwrite=False)
staticmethod
Copy a file from the source to the destination.
Source code in zenml/artifact_stores/local_artifact_store.py
@staticmethod
def copyfile(src: PathType, dst: PathType, overwrite: bool = False) -> None:
"""Copy a file from the source to the destination."""
if not overwrite and os.path.exists(dst):
raise FileExistsError(
f"Destination file {str(dst)} already exists and argument "
f"`overwrite` is false."
)
shutil.copyfile(src, dst) # type: ignore[type-var, arg-type]
exists(path)
staticmethod
Returns True
if the given path exists.
Source code in zenml/artifact_stores/local_artifact_store.py
@staticmethod
def exists(path: PathType) -> bool:
"""Returns `True` if the given path exists."""
return os.path.exists(path)
glob(pattern)
staticmethod
Return the paths that match a glob pattern.
Source code in zenml/artifact_stores/local_artifact_store.py
@staticmethod
def glob(pattern: PathType) -> List[PathType]:
    """List the local paths that match the glob `pattern`."""
    # Within the class, `glob` in this body refers to the `glob` module,
    # not to this method.
    matches = glob.glob(pattern)  # type: ignore[type-var]
    return matches
isdir(path)
staticmethod
Returns whether the given path points to a directory.
Source code in zenml/artifact_stores/local_artifact_store.py
@staticmethod
def isdir(path: PathType) -> bool:
"""Returns whether the given path points to a directory."""
return os.path.isdir(path)
listdir(path)
staticmethod
Returns a list of files under a given directory in the filesystem.
Source code in zenml/artifact_stores/local_artifact_store.py
@staticmethod
def listdir(path: PathType) -> List[PathType]:
"""Returns a list of files under a given directory in the filesystem."""
return os.listdir(path) # type:ignore[return-value]
makedirs(path)
staticmethod
Make a directory at the given path, recursively creating parents.
Source code in zenml/artifact_stores/local_artifact_store.py
@staticmethod
def makedirs(path: PathType) -> None:
"""Make a directory at the given path, recursively creating parents."""
os.makedirs(path, exist_ok=True)
mkdir(path)
staticmethod
Make a directory at the given path; parent directory must exist.
Source code in zenml/artifact_stores/local_artifact_store.py
@staticmethod
def mkdir(path: PathType) -> None:
"""Make a directory at the given path; parent directory must exist."""
os.mkdir(path)
open(name, mode='r')
staticmethod
Open a file at the given path.
Source code in zenml/artifact_stores/local_artifact_store.py
@staticmethod
def open(name: PathType, mode: str = "r") -> Any:
    """Open the local file `name` in `mode` and return the file object."""
    # Within the class, `open` in this body refers to the builtin, not to
    # this method.
    handle = open(name, mode=mode)
    return handle
remove(path)
staticmethod
Remove the file at the given path. Dangerous operation.
Source code in zenml/artifact_stores/local_artifact_store.py
@staticmethod
def remove(path: PathType) -> None:
"""Remove the file at the given path. Dangerous operation."""
os.remove(path)
rename(src, dst, overwrite=False)
staticmethod
Rename source file to destination file.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
src |
Union[bytes, str] |
The path of the file to rename. |
required |
dst |
Union[bytes, str] |
The path to rename the source file to. |
required |
overwrite |
bool |
If a file already exists at the destination, this
method will overwrite it if overwrite=True. |
False |
Source code in zenml/artifact_stores/local_artifact_store.py
@staticmethod
def rename(src: PathType, dst: PathType, overwrite: bool = False) -> None:
"""Rename source file to destination file.
Args:
src: The path of the file to rename.
dst: The path to rename the source file to.
overwrite: If a file already exists at the destination, this
method will overwrite it if overwrite=`True`
"""
if not overwrite and os.path.exists(dst):
raise FileExistsError(
f"Destination path {str(dst)} already exists and argument "
f"`overwrite` is false."
)
os.rename(src, dst)
rmtree(path)
staticmethod
Deletes dir recursively. Dangerous operation.
Source code in zenml/artifact_stores/local_artifact_store.py
@staticmethod
def rmtree(path: PathType) -> None:
"""Deletes dir recursively. Dangerous operation."""
shutil.rmtree(path)
stat(path)
staticmethod
Return the stat descriptor for a given file path.
Source code in zenml/artifact_stores/local_artifact_store.py
@staticmethod
def stat(path: PathType) -> Any:
"""Return the stat descriptor for a given file path."""
return os.stat(path)
walk(top, topdown=True, onerror=None)
staticmethod
Return an iterator that walks the contents of the given directory.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
top |
Union[bytes, str] |
Path of directory to walk. |
required |
topdown |
bool |
Whether to walk directories topdown or bottom-up. |
True |
onerror |
Optional[Callable[..., NoneType]] |
Callable that gets called if an error occurs. |
None |
Returns:
Type | Description |
---|---|
Iterable[Tuple[Union[bytes, str], List[Union[bytes, str]], List[Union[bytes, str]]]] |
An Iterable of Tuples, each of which contain the path of the current directory path, a list of directories inside the current directory and a list of files inside the current directory. |
Source code in zenml/artifact_stores/local_artifact_store.py
@staticmethod
def walk(
top: PathType,
topdown: bool = True,
onerror: Optional[Callable[..., None]] = None,
) -> Iterable[Tuple[PathType, List[PathType], List[PathType]]]:
"""Return an iterator that walks the contents of the given directory.
Args:
top: Path of directory to walk.
topdown: Whether to walk directories topdown or bottom-up.
onerror: Callable that gets called if an error occurs.
Returns:
An Iterable of Tuples, each of which contain the path of the
current directory path, a list of directories inside the
current directory and a list of files inside the current
directory.
"""
yield from os.walk(top, topdown=topdown, onerror=onerror) # type: ignore[type-var, misc]