Skip to content

Artifact Stores

zenml.artifact_stores special

In ZenML, the inputs and outputs which go through any step are treated as artifacts and, as its name suggests, an ArtifactStore is a place where these artifacts get stored.

Out of the box, ZenML comes with the BaseArtifactStore and LocalArtifactStore implementations. While the BaseArtifactStore establishes an interface for people who want to extend it to their needs, the LocalArtifactStore is a simple implementation for a local setup.

Moreover, additional artifact stores can be found in specific integration modules, such as the GCPArtifactStore in the gcp integration and the AzureArtifactStore in the azure integration.

base_artifact_store

BaseArtifactStore (StackComponent) pydantic-model

Base class for all ZenML artifact stores.

Attributes:

Name Type Description
path str

The root path of the artifact store.

Source code in zenml/artifact_stores/base_artifact_store.py
class BaseArtifactStore(StackComponent):
    """Base class for all ZenML artifact stores.

    Subclasses implement the abstract filesystem operations declared below
    and define the path schemes they support in `SUPPORTED_SCHEMES`. Every
    instance registers a filesystem built from its own methods in the TFX
    default filesystem registry when it is created.

    Attributes:
        path: The root path of the artifact store.
    """

    path: str

    # Class Configuration
    TYPE: ClassVar[StackComponentType] = StackComponentType.ARTIFACT_STORE
    # Declared without a value; `_ensure_artifact_store` raises if a
    # subclass does not define it.
    SUPPORTED_SCHEMES: ClassVar[Set[str]]

    # --- User interface ---
    @abstractmethod
    def open(self, name: PathType, mode: str = "r") -> Any:
        """Open a file at the given path.

        Args:
            name: Path of the file to open.
            mode: Mode in which the file is opened.

        Returns:
            The opened file object; its concrete type is defined by the
            implementing subclass.
        """

    @abstractmethod
    def copyfile(
        self, src: PathType, dst: PathType, overwrite: bool = False
    ) -> None:
        """Copy a file from the source to the destination.

        Args:
            src: Path of the file to copy.
            dst: Path to copy the file to.
            overwrite: Flag controlling what happens when `dst` already
                exists; the exact behavior is defined by the subclass.
        """

    @abstractmethod
    def exists(self, path: PathType) -> bool:
        """Returns `True` if the given path exists.

        Args:
            path: The path to check.
        """

    @abstractmethod
    def glob(self, pattern: PathType) -> List[PathType]:
        """Return the paths that match a glob pattern.

        Args:
            pattern: The glob pattern to match.
        """

    @abstractmethod
    def isdir(self, path: PathType) -> bool:
        """Returns whether the given path points to a directory.

        Args:
            path: The path to check.
        """

    @abstractmethod
    def listdir(self, path: PathType) -> List[PathType]:
        """Returns a list of files under a given directory in the filesystem.

        Args:
            path: The directory to list.
        """

    @abstractmethod
    def makedirs(self, path: PathType) -> None:
        """Make a directory at the given path, recursively creating parents.

        Args:
            path: The directory to create.
        """

    @abstractmethod
    def mkdir(self, path: PathType) -> None:
        """Make a directory at the given path; parent directory must exist.

        Args:
            path: The directory to create.
        """

    @abstractmethod
    def remove(self, path: PathType) -> None:
        """Remove the file at the given path. Dangerous operation.

        Args:
            path: The file to remove.
        """

    @abstractmethod
    def rename(
        self, src: PathType, dst: PathType, overwrite: bool = False
    ) -> None:
        """Rename source file to destination file.

        Args:
            src: The path of the file to rename.
            dst: The path to rename the source file to.
            overwrite: Flag controlling what happens when `dst` already
                exists; the exact behavior is defined by the subclass.
        """

    @abstractmethod
    def rmtree(self, path: PathType) -> None:
        """Deletes dir recursively. Dangerous operation.

        Args:
            path: The directory to delete.
        """

    @abstractmethod
    def stat(self, path: PathType) -> Any:
        """Return the stat descriptor for a given file path.

        Args:
            path: The path to inspect.
        """

    @abstractmethod
    def walk(
        self,
        top: PathType,
        topdown: bool = True,
        onerror: Optional[Callable[..., None]] = None,
    ) -> Iterable[Tuple[PathType, List[PathType], List[PathType]]]:
        """Return an iterator that walks the contents of the given directory.

        Args:
            top: Path of the directory to walk.
            topdown: Whether to walk directories topdown or bottom-up.
            onerror: Callable that gets called if an error occurs.

        Returns:
            An iterable of 3-tuples of a path and two lists of paths
            (presumably directory, sub-directories and files, mirroring
            `os.walk` - confirm against the concrete implementation).
        """

    # --- Internal interface ---
    def __init__(self, *args: Any, **kwargs: Any) -> None:
        """Initiate the Pydantic object and register the corresponding
        filesystem.

        Args:
            *args: Positional arguments forwarded to the parent initializer.
            **kwargs: Keyword arguments forwarded to the parent initializer.
        """
        super().__init__(*args, **kwargs)
        self._register()

    @root_validator(skip_on_failure=True)
    def _ensure_artifact_store(cls, values: Dict[str, Any]) -> Any:
        """Validator function for the Artifact Stores. Checks whether
        supported schemes are defined and the given path is supported.

        Args:
            values: The field values of the model being validated; the
                `path` entry is read.

        Returns:
            The unmodified `values`.

        Raises:
            ArtifactStoreInterfaceError: If the class does not define
                `SUPPORTED_SCHEMES`, or if `path` does not start with one
                of the supported schemes.
        """
        # A plain guard (rather than raising from inside an `except` block)
        # keeps the unrelated AttributeError out of the reported traceback.
        if not hasattr(cls, "SUPPORTED_SCHEMES"):
            raise ArtifactStoreInterfaceError(
                "When you are working with any classes which subclass from "
                "'zenml.artifact_store.BaseArtifactStore' please make sure "
                "that your class has a ClassVar named `SUPPORTED_SCHEMES` "
                "which should hold a set of supported file schemes such "
                "as {'s3://'} or {'gcs://'}. \n"
                + textwrap.dedent(
                    """
                    Example:

                    class S3ArtifactStore(StackComponent):
                        ...
                        # Class Variables
                        SUPPORTED_SCHEMES: ClassVar[Set[str]] = {"s3://"}
                        ...
                    """
                )
            )

        store_path = values["path"]
        if not any(
            store_path.startswith(scheme) for scheme in cls.SUPPORTED_SCHEMES
        ):
            raise ArtifactStoreInterfaceError(
                f"The path: '{store_path}' you defined for your "
                f"artifact store is not supported by the implementation of "
                f"{cls.schema()['title']}, because it does not start with "
                f"one of its supported schemes: {cls.SUPPORTED_SCHEMES}."
            )

        return values

    def _register(self, priority: int = 5) -> None:
        """Create and register a filesystem within the TFX registry.

        A new `Filesystem` subclass named after this store's class is built
        on the fly; its operations are this instance's bound methods.

        Args:
            priority: Priority passed to the registry's `register` call.
        """
        # Imported lazily so that TFX is only loaded when a store is created.
        from tfx.dsl.io.filesystem import Filesystem
        from tfx.dsl.io.filesystem_registry import DEFAULT_FILESYSTEM_REGISTRY

        # The bound methods are wrapped in `staticmethod` so that the
        # generated class calls them as-is, without injecting another
        # `self`. Most operations additionally go through
        # `_catch_not_found_error` (defined elsewhere in this module;
        # presumably it translates "not found" errors for TFX - confirm
        # against the helper). `exists`, `glob`, `isdir` and `makedirs`
        # are registered unwrapped.
        filesystem_class = type(
            self.__class__.__name__,
            (Filesystem,),
            {
                "SUPPORTED_SCHEMES": self.SUPPORTED_SCHEMES,
                "open": staticmethod(_catch_not_found_error(self.open)),
                "copy": staticmethod(_catch_not_found_error(self.copyfile)),
                "exists": staticmethod(self.exists),
                "glob": staticmethod(self.glob),
                "isdir": staticmethod(self.isdir),
                "listdir": staticmethod(_catch_not_found_error(self.listdir)),
                "makedirs": staticmethod(self.makedirs),
                "mkdir": staticmethod(_catch_not_found_error(self.mkdir)),
                "remove": staticmethod(_catch_not_found_error(self.remove)),
                "rename": staticmethod(_catch_not_found_error(self.rename)),
                "rmtree": staticmethod(_catch_not_found_error(self.rmtree)),
                "stat": staticmethod(_catch_not_found_error(self.stat)),
                "walk": staticmethod(_catch_not_found_error(self.walk)),
            },
        )

        DEFAULT_FILESYSTEM_REGISTRY.register(
            filesystem_class, priority=priority
        )
__init__(self, *args, **kwargs) special

Initiate the Pydantic object and register the corresponding filesystem.

Source code in zenml/artifact_stores/base_artifact_store.py
def __init__(self, *args: Any, **kwargs: Any) -> None:
    """Initiate the Pydantic object and register the corresponding
    filesystem.

    Args:
        *args: Positional arguments forwarded unchanged to the parent
            initializer.
        **kwargs: Keyword arguments forwarded unchanged to the parent
            initializer.
    """
    super(BaseArtifactStore, self).__init__(*args, **kwargs)
    # Registration runs on every instantiation, once the parent
    # initializer has returned.
    self._register()
copyfile(self, src, dst, overwrite=False)

Copy a file from the source to the destination.

Source code in zenml/artifact_stores/base_artifact_store.py
@abstractmethod
def copyfile(
    self, src: PathType, dst: PathType, overwrite: bool = False
) -> None:
    """Copy a file from the source to the destination.

    Args:
        src: Path of the file to copy.
        dst: Path to copy the file to.
        overwrite: Flag controlling what happens when `dst` already
            exists; the exact behavior is defined by the subclass.
    """
exists(self, path)

Returns True if the given path exists.

Source code in zenml/artifact_stores/base_artifact_store.py
@abstractmethod
def exists(self, path: PathType) -> bool:
    """Returns `True` if the given path exists.

    Args:
        path: The path to check.
    """
glob(self, pattern)

Return the paths that match a glob pattern.

Source code in zenml/artifact_stores/base_artifact_store.py
@abstractmethod
def glob(self, pattern: PathType) -> List[PathType]:
    """Return the paths that match a glob pattern.

    Args:
        pattern: The glob pattern to match.
    """
isdir(self, path)

Returns whether the given path points to a directory.

Source code in zenml/artifact_stores/base_artifact_store.py
@abstractmethod
def isdir(self, path: PathType) -> bool:
    """Returns whether the given path points to a directory.

    Args:
        path: The path to check.
    """
listdir(self, path)

Returns a list of files under a given directory in the filesystem.

Source code in zenml/artifact_stores/base_artifact_store.py
@abstractmethod
def listdir(self, path: PathType) -> List[PathType]:
    """Returns a list of files under a given directory in the filesystem.

    Args:
        path: The directory to list.
    """
makedirs(self, path)

Make a directory at the given path, recursively creating parents.

Source code in zenml/artifact_stores/base_artifact_store.py
@abstractmethod
def makedirs(self, path: PathType) -> None:
    """Make a directory at the given path, recursively creating parents.

    Args:
        path: The directory to create.
    """
mkdir(self, path)

Make a directory at the given path; parent directory must exist.

Source code in zenml/artifact_stores/base_artifact_store.py
@abstractmethod
def mkdir(self, path: PathType) -> None:
    """Make a directory at the given path; parent directory must exist.

    Args:
        path: The directory to create.
    """
open(self, name, mode='r')

Open a file at the given path.

Source code in zenml/artifact_stores/base_artifact_store.py
@abstractmethod
def open(self, name: PathType, mode: str = "r") -> Any:
    """Open a file at the given path.

    Args:
        name: Path of the file to open.
        mode: Mode in which the file is opened.

    Returns:
        The opened file object; its concrete type is defined by the
        implementing subclass.
    """
remove(self, path)

Remove the file at the given path. Dangerous operation.

Source code in zenml/artifact_stores/base_artifact_store.py
@abstractmethod
def remove(self, path: PathType) -> None:
    """Remove the file at the given path. Dangerous operation.

    Args:
        path: The file to remove.
    """
rename(self, src, dst, overwrite=False)

Rename source file to destination file.

Source code in zenml/artifact_stores/base_artifact_store.py
@abstractmethod
def rename(
    self, src: PathType, dst: PathType, overwrite: bool = False
) -> None:
    """Rename source file to destination file.

    Args:
        src: The path of the file to rename.
        dst: The path to rename the source file to.
        overwrite: Flag controlling what happens when `dst` already
            exists; the exact behavior is defined by the subclass.
    """
rmtree(self, path)

Deletes dir recursively. Dangerous operation.

Source code in zenml/artifact_stores/base_artifact_store.py
@abstractmethod
def rmtree(self, path: PathType) -> None:
    """Deletes dir recursively. Dangerous operation.

    Args:
        path: The directory to delete.
    """
stat(self, path)

Return the stat descriptor for a given file path.

Source code in zenml/artifact_stores/base_artifact_store.py
@abstractmethod
def stat(self, path: PathType) -> Any:
    """Return the stat descriptor for a given file path.

    Args:
        path: The path to inspect.
    """
walk(self, top, topdown=True, onerror=None)

Return an iterator that walks the contents of the given directory.

Source code in zenml/artifact_stores/base_artifact_store.py
@abstractmethod
def walk(
    self,
    top: PathType,
    topdown: bool = True,
    onerror: Optional[Callable[..., None]] = None,
) -> Iterable[Tuple[PathType, List[PathType], List[PathType]]]:
    """Return an iterator that walks the contents of the given directory.

    Args:
        top: Path of the directory to walk.
        topdown: Whether to walk directories topdown or bottom-up.
        onerror: Callable that gets called if an error occurs.

    Returns:
        An iterable of 3-tuples of a path and two lists of paths
        (presumably directory, sub-directories and files, mirroring
        `os.walk` - confirm against the concrete implementation).
    """

local_artifact_store

LocalArtifactStore (BaseArtifactStore) pydantic-model

Artifact Store for local artifacts.

Source code in zenml/artifact_stores/local_artifact_store.py
class LocalArtifactStore(BaseArtifactStore):
    """Artifact Store for local artifacts.

    Every filesystem operation is a thin static wrapper around the matching
    `os`, `shutil`, `glob` or builtin call.
    """

    # Class Configuration
    FLAVOR: ClassVar[str] = "local"
    # Every string starts with the empty string, so a prefix check against
    # this set accepts any path; remote paths are rejected separately by
    # `ensure_path_local`.
    SUPPORTED_SCHEMES: ClassVar[Set[str]] = {""}

    @property
    def local_path(self) -> str:
        """Path to the local directory where the artifacts are stored."""
        return self.path

    @staticmethod
    def open(name: PathType, mode: str = "r") -> Any:
        """Open a file at the given path.

        Args:
            name: Path of the file to open.
            mode: Mode passed to the builtin `open`.

        Returns:
            The file object returned by the builtin `open`.
        """
        return open(name, mode=mode)

    @staticmethod
    def copyfile(src: PathType, dst: PathType, overwrite: bool = False) -> None:
        """Copy a file from the source to the destination.

        Args:
            src: Path of the file to copy.
            dst: Path to copy the file to.
            overwrite: Whether an already existing `dst` may be replaced.

        Raises:
            FileExistsError: If `dst` already exists and `overwrite` is
                false.
        """
        if not overwrite and os.path.exists(dst):
            raise FileExistsError(
                f"Destination file {str(dst)} already exists and argument "
                f"`overwrite` is false."
            )
        shutil.copyfile(src, dst)  # type: ignore[type-var, arg-type]

    @staticmethod
    def exists(path: PathType) -> bool:
        """Returns `True` if the given path exists.

        Args:
            path: The path to check.
        """
        return os.path.exists(path)

    @staticmethod
    def glob(pattern: PathType) -> List[PathType]:
        """Return the paths that match a glob pattern.

        Args:
            pattern: The glob pattern to match.
        """
        return glob.glob(pattern)  # type: ignore[type-var]

    @staticmethod
    def isdir(path: PathType) -> bool:
        """Returns whether the given path points to a directory.

        Args:
            path: The path to check.
        """
        return os.path.isdir(path)

    @staticmethod
    def listdir(path: PathType) -> List[PathType]:
        """Returns a list of files under a given directory in the filesystem.

        Args:
            path: The directory to list.
        """
        return os.listdir(path)  # type:ignore[return-value]

    @staticmethod
    def makedirs(path: PathType) -> None:
        """Make a directory at the given path, recursively creating parents.

        An already existing directory is not treated as an error.

        Args:
            path: The directory to create.
        """
        os.makedirs(path, exist_ok=True)

    @staticmethod
    def mkdir(path: PathType) -> None:
        """Make a directory at the given path; parent directory must exist.

        Args:
            path: The directory to create.
        """
        os.mkdir(path)

    @staticmethod
    def remove(path: PathType) -> None:
        """Remove the file at the given path. Dangerous operation.

        Args:
            path: The file to remove.
        """
        os.remove(path)

    @staticmethod
    def rename(src: PathType, dst: PathType, overwrite: bool = False) -> None:
        """Rename source file to destination file.

        Args:
            src: The path of the file to rename.
            dst: The path to rename the source file to.
            overwrite: If a file already exists at the destination, this
                method will overwrite it if overwrite=`True`

        Raises:
            FileExistsError: If `dst` already exists and `overwrite` is
                false.
        """
        if not overwrite and os.path.exists(dst):
            raise FileExistsError(
                f"Destination path {str(dst)} already exists and argument "
                f"`overwrite` is false."
            )
        if overwrite:
            # `os.rename` raises on Windows when the destination exists;
            # `os.replace` overwrites an existing file on every platform.
            os.replace(src, dst)
        else:
            os.rename(src, dst)

    @staticmethod
    def rmtree(path: PathType) -> None:
        """Deletes dir recursively. Dangerous operation.

        Args:
            path: The directory to delete.
        """
        shutil.rmtree(path)

    @staticmethod
    def stat(path: PathType) -> Any:
        """Return the stat descriptor for a given file path.

        Args:
            path: The path to inspect.

        Returns:
            The result of `os.stat` for the path.
        """
        return os.stat(path)

    @staticmethod
    def walk(
        top: PathType,
        topdown: bool = True,
        onerror: Optional[Callable[..., None]] = None,
    ) -> Iterable[Tuple[PathType, List[PathType], List[PathType]]]:
        """Return an iterator that walks the contents of the given directory.

        Args:
            top: Path of directory to walk.
            topdown: Whether to walk directories topdown or bottom-up.
            onerror: Callable that gets called if an error occurs.

        Returns:
            An Iterable of Tuples, each of which contains the path of the
            current directory, a list of directories inside the current
            directory and a list of files inside the current directory.
        """
        yield from os.walk(top, topdown=topdown, onerror=onerror)  # type: ignore[type-var, misc]

    @validator("path")
    def ensure_path_local(cls, path: str) -> str:
        """Pydantic validator which ensures that the given path is a local
        path.

        Args:
            path: The configured artifact store path.

        Returns:
            The unmodified path.

        Raises:
            ArtifactStoreInterfaceError: If the path starts with one of the
                known remote prefixes.
        """
        remote_prefixes = ("gs://", "hdfs://", "s3://", "az://", "abfs://")
        # `str.startswith` accepts a tuple and checks all prefixes at once.
        if path.startswith(remote_prefixes):
            raise ArtifactStoreInterfaceError(
                f"The path:{path} you defined for your local artifact store "
                f"start with one of the remote prefixes."
            )
        return path
local_path: str property readonly

Path to the local directory where the artifacts are stored.

copyfile(src, dst, overwrite=False) staticmethod

Copy a file from the source to the destination.

Source code in zenml/artifact_stores/local_artifact_store.py
@staticmethod
def copyfile(src: PathType, dst: PathType, overwrite: bool = False) -> None:
    """Copy the file at `src` to `dst`.

    Args:
        src: Path of the file to copy.
        dst: Path to copy the file to.
        overwrite: Whether an already existing `dst` may be replaced.

    Raises:
        FileExistsError: If `dst` already exists and `overwrite` is false.
    """
    # The existence check is only evaluated when overwriting is not allowed.
    write_allowed = overwrite or not os.path.exists(dst)
    if not write_allowed:
        raise FileExistsError(
            f"Destination file {str(dst)} already exists and argument "
            f"`overwrite` is false."
        )
    shutil.copyfile(src, dst)  # type: ignore[type-var, arg-type]
ensure_path_local(path) classmethod

Pydantic validator which ensures that the given path is a local path

Source code in zenml/artifact_stores/local_artifact_store.py
@validator("path")
def ensure_path_local(cls, path: str) -> str:
    """Pydantic validator rejecting paths that point to remote storage.

    Args:
        path: The configured artifact store path.

    Returns:
        The unmodified path.

    Raises:
        ArtifactStoreInterfaceError: If the path starts with one of the
            known remote prefixes.
    """
    # `str.startswith` accepts a tuple and checks all prefixes at once.
    is_remote = path.startswith(
        ("gs://", "hdfs://", "s3://", "az://", "abfs://")
    )
    if not is_remote:
        return path
    raise ArtifactStoreInterfaceError(
        f"The path:{path} you defined for your local artifact store "
        f"start with one of the remote prefixes."
    )
exists(path) staticmethod

Returns True if the given path exists.

Source code in zenml/artifact_stores/local_artifact_store.py
@staticmethod
def exists(path: PathType) -> bool:
    """Report whether `path` exists on the local filesystem.

    Args:
        path: The path to check.

    Returns:
        The result of `os.path.exists` for the path.
    """
    path_found = os.path.exists(path)
    return path_found
glob(pattern) staticmethod

Return the paths that match a glob pattern.

Source code in zenml/artifact_stores/local_artifact_store.py
@staticmethod
def glob(pattern: PathType) -> List[PathType]:
    """Return the paths that match a glob pattern.

    Args:
        pattern: The glob pattern to match.

    Returns:
        The list of matches produced by `glob.glob`.
    """
    return glob.glob(pattern)  # type: ignore[type-var]
isdir(path) staticmethod

Returns whether the given path points to a directory.

Source code in zenml/artifact_stores/local_artifact_store.py
@staticmethod
def isdir(path: PathType) -> bool:
    """Report whether `path` points to a directory.

    Args:
        path: The path to check.

    Returns:
        The result of `os.path.isdir` for the path.
    """
    is_directory = os.path.isdir(path)
    return is_directory
listdir(path) staticmethod

Returns a list of files under a given directory in the filesystem.

Source code in zenml/artifact_stores/local_artifact_store.py
@staticmethod
def listdir(path: PathType) -> List[PathType]:
    """Returns a list of files under a given directory in the filesystem.

    Args:
        path: The directory to list.

    Returns:
        The entries reported by `os.listdir` for the directory.
    """
    return os.listdir(path)  # type:ignore[return-value]
makedirs(path) staticmethod

Make a directory at the given path, recursively creating parents.

Source code in zenml/artifact_stores/local_artifact_store.py
@staticmethod
def makedirs(path: PathType) -> None:
    """Make a directory at the given path, recursively creating parents.

    Args:
        path: The directory to create.
    """
    # exist_ok=True: an already existing directory is not an error.
    os.makedirs(path, exist_ok=True)
mkdir(path) staticmethod

Make a directory at the given path; parent directory must exist.

Source code in zenml/artifact_stores/local_artifact_store.py
@staticmethod
def mkdir(path: PathType) -> None:
    """Make a directory at the given path; parent directory must exist.

    Args:
        path: The directory to create.
    """
    os.mkdir(path)
open(name, mode='r') staticmethod

Open a file at the given path.

Source code in zenml/artifact_stores/local_artifact_store.py
@staticmethod
def open(name: PathType, mode: str = "r") -> Any:
    """Open the file at `name` using the builtin `open`.

    Args:
        name: Path of the file to open.
        mode: Mode passed to the builtin `open`.

    Returns:
        The file object returned by the builtin `open`.
    """
    file_handle = open(name, mode=mode)
    return file_handle
remove(path) staticmethod

Remove the file at the given path. Dangerous operation.

Source code in zenml/artifact_stores/local_artifact_store.py
@staticmethod
def remove(path: PathType) -> None:
    """Remove the file at the given path. Dangerous operation.

    Args:
        path: The file to remove.
    """
    os.remove(path)
rename(src, dst, overwrite=False) staticmethod

Rename source file to destination file.

Parameters:

Name Type Description Default
src Union[bytes, str]

The path of the file to rename.

required
dst Union[bytes, str]

The path to rename the source file to.

required
overwrite bool

If a file already exists at the destination, this method will overwrite it if overwrite=True

False
Source code in zenml/artifact_stores/local_artifact_store.py
@staticmethod
def rename(src: PathType, dst: PathType, overwrite: bool = False) -> None:
    """Rename source file to destination file.

    Args:
        src: The path of the file to rename.
        dst: The path to rename the source file to.
        overwrite: If a file already exists at the destination, this
            method will overwrite it if overwrite=`True`

    Raises:
        FileExistsError: If `dst` already exists and `overwrite` is false.
    """
    if not overwrite and os.path.exists(dst):
        raise FileExistsError(
            f"Destination path {str(dst)} already exists and argument "
            f"`overwrite` is false."
        )
    if overwrite:
        # `os.rename` raises on Windows when the destination exists;
        # `os.replace` overwrites an existing file on every platform.
        os.replace(src, dst)
    else:
        os.rename(src, dst)
rmtree(path) staticmethod

Deletes dir recursively. Dangerous operation.

Source code in zenml/artifact_stores/local_artifact_store.py
@staticmethod
def rmtree(path: PathType) -> None:
    """Deletes dir recursively. Dangerous operation.

    Args:
        path: The directory to delete.
    """
    shutil.rmtree(path)
stat(path) staticmethod

Return the stat descriptor for a given file path.

Source code in zenml/artifact_stores/local_artifact_store.py
@staticmethod
def stat(path: PathType) -> Any:
    """Look up the stat descriptor of `path`.

    Args:
        path: The path to inspect.

    Returns:
        The result of `os.stat` for the path.
    """
    descriptor = os.stat(path)
    return descriptor
walk(top, topdown=True, onerror=None) staticmethod

Return an iterator that walks the contents of the given directory.

Parameters:

Name Type Description Default
top Union[bytes, str]

Path of directory to walk.

required
topdown bool

Whether to walk directories topdown or bottom-up.

True
onerror Optional[Callable[..., NoneType]]

Callable that gets called if an error occurs.

None

Returns:

Type Description
Iterable[Tuple[Union[bytes, str], List[Union[bytes, str]], List[Union[bytes, str]]]]

An Iterable of Tuples, each of which contains the path of the current directory, a list of directories inside the current directory and a list of files inside the current directory.

Source code in zenml/artifact_stores/local_artifact_store.py
@staticmethod
def walk(
    top: PathType,
    topdown: bool = True,
    onerror: Optional[Callable[..., None]] = None,
) -> Iterable[Tuple[PathType, List[PathType], List[PathType]]]:
    """Return an iterator that walks the contents of the given directory.

    This is a generator that delegates to `os.walk`.

    Args:
        top: Path of directory to walk.
        topdown: Whether to walk directories topdown or bottom-up.
        onerror: Callable that gets called if an error occurs.

    Returns:
        An Iterable of Tuples, each of which contains the path of the
        current directory, a list of directories inside the current
        directory and a list of files inside the current directory.
    """
    yield from os.walk(top, topdown=topdown, onerror=onerror)  # type: ignore[type-var, misc]