Skip to content

Artifact Stores

zenml.artifact_stores special

Artifact Stores

In ZenML, the inputs and outputs that pass through any step are treated as artifacts and, as the name suggests, an ArtifactStore is the place where these artifacts are stored.

Out of the box, ZenML comes with the BaseArtifactStore and LocalArtifactStore implementations. While the BaseArtifactStore establishes an interface for people who want to extend it to their needs, the LocalArtifactStore is a simple implementation for a local setup.

Moreover, additional artifact stores can be found in specific integrations modules, such as the GCPArtifactStore in the gcp integration and the AzureArtifactStore in the azure integration.

base_artifact_store

BaseArtifactStore (StackComponent, ABC) pydantic-model

Base class for all ZenML artifact stores.

Attributes:

Name Type Description
path str

The root path of the artifact store.

Source code in zenml/artifact_stores/base_artifact_store.py
class BaseArtifactStore(StackComponent, ABC):
    """Base class for all ZenML artifact stores.

    Every filesystem operation declared here raises `NotImplementedError`;
    concrete artifact stores are expected to override them. When an instance
    is created, a filesystem class built from these operations is registered
    with the TFX filesystem registry (see `_register`).

    Attributes:
        path: The root path of the artifact store.
    """

    path: str

    # Class Configuration
    TYPE: ClassVar[StackComponentType] = StackComponentType.ARTIFACT_STORE
    FLAVOR: ClassVar[str]
    SUPPORTED_SCHEMES: ClassVar[Set[str]]

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        """Initiate the Pydantic object and register the corresponding
        filesystem.

        Args:
            *args: Positional arguments forwarded to the parent initializer.
            **kwargs: Keyword arguments forwarded to the parent initializer.
        """
        super(BaseArtifactStore, self).__init__(*args, **kwargs)
        self._register()

    def open(self, name: PathType, mode: str = "r") -> Any:
        """Open a file at the given path.

        Raises:
            NotImplementedError: Always, in this base class.
        """
        raise NotImplementedError()

    def copyfile(
        self, src: PathType, dst: PathType, overwrite: bool = False
    ) -> None:
        """Copy a file from the source to the destination.

        Raises:
            NotImplementedError: Always, in this base class.
        """
        raise NotImplementedError()

    def exists(self, path: PathType) -> bool:
        """Returns `True` if the given path exists.

        Raises:
            NotImplementedError: Always, in this base class.
        """
        raise NotImplementedError()

    def glob(self, pattern: PathType) -> List[PathType]:
        """Return the paths that match a glob pattern.

        Raises:
            NotImplementedError: Always, in this base class.
        """
        raise NotImplementedError()

    def isdir(self, path: PathType) -> bool:
        """Returns whether the given path points to a directory.

        Raises:
            NotImplementedError: Always, in this base class.
        """
        raise NotImplementedError()

    def listdir(self, path: PathType) -> List[PathType]:
        """Returns a list of files under a given directory in the filesystem.

        Raises:
            NotImplementedError: Always, in this base class.
        """
        raise NotImplementedError()

    def makedirs(self, path: PathType) -> None:
        """Make a directory at the given path, recursively creating parents.

        Raises:
            NotImplementedError: Always, in this base class.
        """
        raise NotImplementedError()

    def mkdir(self, path: PathType) -> None:
        """Make a directory at the given path; parent directory must exist.

        Raises:
            NotImplementedError: Always, in this base class.
        """
        raise NotImplementedError()

    def remove(self, path: PathType) -> None:
        """Remove the file at the given path. Dangerous operation.

        Raises:
            NotImplementedError: Always, in this base class.
        """
        raise NotImplementedError()

    def rename(
        self, src: PathType, dst: PathType, overwrite: bool = False
    ) -> None:
        """Rename source file to destination file.

        Raises:
            NotImplementedError: Always, in this base class.
        """
        raise NotImplementedError()

    def rmtree(self, path: PathType) -> None:
        """Deletes dir recursively. Dangerous operation.

        Raises:
            NotImplementedError: Always, in this base class.
        """
        raise NotImplementedError()

    def stat(self, path: PathType) -> Any:
        """Return the stat descriptor for a given file path.

        Raises:
            NotImplementedError: Always, in this base class.
        """
        raise NotImplementedError()

    def walk(
        self,
        top: PathType,
        topdown: bool = True,
        onerror: Optional[Callable[..., None]] = None,
    ) -> Iterable[Tuple[PathType, List[PathType], List[PathType]]]:
        """Return an iterator that walks the contents of the given directory.

        Raises:
            NotImplementedError: Always, in this base class.
        """
        raise NotImplementedError()

    @root_validator
    def _ensure_artifact_store(cls, values: Dict[str, Any]) -> Any:
        """Validate the class configuration and the configured path.

        Checks that the class defines `SUPPORTED_SCHEMES` and that the given
        `path` starts with one of those schemes.

        Args:
            values: The field values collected by pydantic for this model.

        Returns:
            The unmodified `values` dictionary.

        Raises:
            ArtifactStoreInterfaceError: If `SUPPORTED_SCHEMES` is not
                defined on the class, or if `path` does not start with any
                of the supported schemes.
        """
        # `SUPPORTED_SCHEMES` is only annotated on the base class, so the
        # attribute is missing unless a subclass assigns it. `hasattr` avoids
        # raising from inside an `except AttributeError` block, which would
        # attach an irrelevant chained traceback to the error below.
        if not hasattr(cls, "SUPPORTED_SCHEMES"):
            raise ArtifactStoreInterfaceError(
                textwrap.dedent(
                    """
                    When you are working with any classes which subclass from
                    zenml.artifact_stores.BaseArtifactStore please make sure that your class
                    has a ClassVar named `SUPPORTED_SCHEMES` which should hold a set of
                    supported file schemes such as {"s3://"} or {"gcs://"}.
                    Example:
                    class S3ArtifactStore(BaseArtifactStore):
                        ...
                        # Class Variables
                        ...
                        SUPPORTED_SCHEMES: ClassVar[Set[str]] = {"s3://"}
                        ...
                    """
                )
            )

        # A post root validator may run even when `path` was not supplied or
        # failed its own validation, in which case the key is absent from
        # `values`. Return early so pydantic reports the field error instead
        # of this validator crashing with a raw KeyError.
        path = values.get("path")
        if path is None:
            return values

        # `str.startswith` accepts a tuple of candidate prefixes.
        if not path.startswith(tuple(cls.SUPPORTED_SCHEMES)):
            raise ArtifactStoreInterfaceError(
                textwrap.dedent(
                    f"""
                    The path: "{path}" you defined for your artifact
                    store is not supported by the implementation of
                    {cls.schema()["title"]}, because it does not start with
                    one of its supported schemes: {cls.SUPPORTED_SCHEMES}.
                    """
                )
            )

        return values

    def _register(self, priority: int = 5) -> None:
        """Create and register a filesystem within the TFX registry.

        A new `Filesystem` subclass, named after this store's class, is built
        on the fly. Its operations delegate to this instance's bound methods.

        Args:
            priority: Priority passed through to the registry's `register`
                call.
        """
        # Imported lazily inside the method, as in the original code.
        from tfx.dsl.io.filesystem import Filesystem
        from tfx.dsl.io.filesystem_registry import DEFAULT_FILESYSTEM_REGISTRY

        # The methods are already bound to `self`; `staticmethod` keeps the
        # generated class from binding them a second time.
        # NOTE(review): `_catch_not_found_error` is defined outside this
        # class; presumably it translates "not found" errors for TFX --
        # confirm against its definition.
        filesystem_class = type(
            self.__class__.__name__,
            (Filesystem,),
            {
                "SUPPORTED_SCHEMES": self.SUPPORTED_SCHEMES,
                "open": staticmethod(_catch_not_found_error(self.open)),
                # The filesystem operation is named `copy`; it is served by
                # this store's `copyfile`.
                "copy": staticmethod(_catch_not_found_error(self.copyfile)),
                "exists": staticmethod(self.exists),
                "glob": staticmethod(self.glob),
                "isdir": staticmethod(self.isdir),
                "listdir": staticmethod(_catch_not_found_error(self.listdir)),
                "makedirs": staticmethod(self.makedirs),
                "mkdir": staticmethod(_catch_not_found_error(self.mkdir)),
                "remove": staticmethod(_catch_not_found_error(self.remove)),
                "rename": staticmethod(_catch_not_found_error(self.rename)),
                "rmtree": staticmethod(_catch_not_found_error(self.rmtree)),
                "stat": staticmethod(_catch_not_found_error(self.stat)),
                "walk": staticmethod(_catch_not_found_error(self.walk)),
            },
        )

        DEFAULT_FILESYSTEM_REGISTRY.register(
            filesystem_class, priority=priority
        )
__init__(self, *args, **kwargs) special

Initiate the Pydantic object and register the corresponding filesystem.

Source code in zenml/artifact_stores/base_artifact_store.py
def __init__(self, *args: Any, **kwargs: Any) -> None:
    """Build the Pydantic object, then register this store's filesystem.

    Args:
        *args: Positional arguments handed to the parent initializer as-is.
        **kwargs: Keyword arguments handed to the parent initializer as-is.
    """
    parent = super(BaseArtifactStore, self)
    parent.__init__(*args, **kwargs)
    # Registration happens only after the model has been initialized.
    self._register()
copyfile(self, src, dst, overwrite=False)

Copy a file from the source to the destination.

Source code in zenml/artifact_stores/base_artifact_store.py
def copyfile(
    self,
    src: PathType,
    dst: PathType,
    overwrite: bool = False,
) -> None:
    """Copy the file at `src` to `dst`.

    Placeholder on the base class; it performs no work.

    Raises:
        NotImplementedError: Always.
    """
    raise NotImplementedError
exists(self, path)

Returns True if the given path exists.

Source code in zenml/artifact_stores/base_artifact_store.py
def exists(
    self,
    path: PathType,
) -> bool:
    """Tell whether the given path exists.

    Placeholder on the base class; it performs no work.

    Raises:
        NotImplementedError: Always.
    """
    raise NotImplementedError
glob(self, pattern)

Return the paths that match a glob pattern.

Source code in zenml/artifact_stores/base_artifact_store.py
def glob(
    self,
    pattern: PathType,
) -> List[PathType]:
    """List the paths matching a glob pattern.

    Placeholder on the base class; it performs no work.

    Raises:
        NotImplementedError: Always.
    """
    raise NotImplementedError
isdir(self, path)

Returns whether the given path points to a directory.

Source code in zenml/artifact_stores/base_artifact_store.py
def isdir(
    self,
    path: PathType,
) -> bool:
    """Tell whether the given path points to a directory.

    Placeholder on the base class; it performs no work.

    Raises:
        NotImplementedError: Always.
    """
    raise NotImplementedError
listdir(self, path)

Returns a list of files under a given directory in the filesystem.

Source code in zenml/artifact_stores/base_artifact_store.py
def listdir(
    self,
    path: PathType,
) -> List[PathType]:
    """List the files under a given directory in the filesystem.

    Placeholder on the base class; it performs no work.

    Raises:
        NotImplementedError: Always.
    """
    raise NotImplementedError
makedirs(self, path)

Make a directory at the given path, recursively creating parents.

Source code in zenml/artifact_stores/base_artifact_store.py
def makedirs(
    self,
    path: PathType,
) -> None:
    """Create a directory at the given path, including missing parents.

    Placeholder on the base class; it performs no work.

    Raises:
        NotImplementedError: Always.
    """
    raise NotImplementedError
mkdir(self, path)

Make a directory at the given path; parent directory must exist.

Source code in zenml/artifact_stores/base_artifact_store.py
def mkdir(
    self,
    path: PathType,
) -> None:
    """Create a directory at the given path; its parent must already exist.

    Placeholder on the base class; it performs no work.

    Raises:
        NotImplementedError: Always.
    """
    raise NotImplementedError
open(self, name, mode='r')

Open a file at the given path.

Source code in zenml/artifact_stores/base_artifact_store.py
def open(
    self,
    name: PathType,
    mode: str = "r",
) -> Any:
    """Open the file at the given path in the requested mode.

    Placeholder on the base class; it performs no work.

    Raises:
        NotImplementedError: Always.
    """
    raise NotImplementedError
remove(self, path)

Remove the file at the given path. Dangerous operation.

Source code in zenml/artifact_stores/base_artifact_store.py
def remove(
    self,
    path: PathType,
) -> None:
    """Delete the file at the given path. Dangerous operation.

    Placeholder on the base class; it performs no work.

    Raises:
        NotImplementedError: Always.
    """
    raise NotImplementedError
rename(self, src, dst, overwrite=False)

Rename source file to destination file.

Source code in zenml/artifact_stores/base_artifact_store.py
def rename(
    self,
    src: PathType,
    dst: PathType,
    overwrite: bool = False,
) -> None:
    """Rename the source file to the destination file.

    Placeholder on the base class; it performs no work.

    Raises:
        NotImplementedError: Always.
    """
    raise NotImplementedError
rmtree(self, path)

Deletes dir recursively. Dangerous operation.

Source code in zenml/artifact_stores/base_artifact_store.py
def rmtree(
    self,
    path: PathType,
) -> None:
    """Delete a directory recursively. Dangerous operation.

    Placeholder on the base class; it performs no work.

    Raises:
        NotImplementedError: Always.
    """
    raise NotImplementedError
stat(self, path)

Return the stat descriptor for a given file path.

Source code in zenml/artifact_stores/base_artifact_store.py
def stat(
    self,
    path: PathType,
) -> Any:
    """Fetch the stat descriptor for a given file path.

    Placeholder on the base class; it performs no work.

    Raises:
        NotImplementedError: Always.
    """
    raise NotImplementedError
walk(self, top, topdown=True, onerror=None)

Return an iterator that walks the contents of the given directory.

Source code in zenml/artifact_stores/base_artifact_store.py
def walk(
    self,
    top: PathType,
    topdown: bool = True,
    onerror: Optional[Callable[..., None]] = None,
) -> Iterable[Tuple[PathType, List[PathType], List[PathType]]]:
    """Produce an iterator over the contents of the given directory.

    Placeholder on the base class; it raises as soon as it is called rather
    than on first iteration, because the body contains no `yield`.

    Raises:
        NotImplementedError: Always.
    """
    raise NotImplementedError

local_artifact_store

LocalArtifactStore (BaseArtifactStore) pydantic-model

Artifact Store for local artifacts.

Source code in zenml/artifact_stores/local_artifact_store.py
class LocalArtifactStore(BaseArtifactStore):
    """Artifact Store for local artifacts.

    Every filesystem operation is a thin static wrapper around the matching
    `os`, `shutil` or `glob` call, or the builtin `open`.
    """

    # Class Configuration
    FLAVOR: ClassVar[str] = "local"
    # Every string starts with the empty string, so a prefix check against
    # this set accepts any path; remote prefixes are rejected separately by
    # `ensure_path_local` below.
    SUPPORTED_SCHEMES: ClassVar[Set[str]] = {""}

    @property
    def local_path(self) -> str:
        """Path to the local directory where the artifacts are stored."""
        return self.path

    @staticmethod
    def open(name: PathType, mode: str = "r") -> Any:
        """Open a file at the given path.

        Args:
            name: The path of the file to open.
            mode: The mode passed through to the builtin `open`.

        Returns:
            The file object returned by the builtin `open`.
        """
        # Class attributes are not in scope inside a method body, so `open`
        # here resolves to the builtin rather than to this static method.
        return open(name, mode=mode)

    @staticmethod
    def copyfile(src: PathType, dst: PathType, overwrite: bool = False) -> None:
        """Copy a file from the source to the destination.

        Args:
            src: The path of the file to copy.
            dst: The path to copy the file to.
            overwrite: Whether an existing destination may be overwritten.

        Raises:
            FileExistsError: If `dst` exists and `overwrite` is false.
        """
        if not overwrite and os.path.exists(dst):
            raise FileExistsError(
                f"Destination file {str(dst)} already exists and argument "
                f"`overwrite` is false."
            )
        shutil.copyfile(src, dst)  # type: ignore[type-var, arg-type]

    @staticmethod
    def exists(path: PathType) -> bool:
        """Returns `True` if the given path exists."""
        return os.path.exists(path)

    @staticmethod
    def glob(pattern: PathType) -> List[PathType]:
        """Return the paths that match a glob pattern."""
        # As with `open` above, `glob` inside the body is the module-level
        # `glob` module, not this static method.
        return glob.glob(pattern)  # type: ignore[type-var]

    @staticmethod
    def isdir(path: PathType) -> bool:
        """Returns whether the given path points to a directory."""
        return os.path.isdir(path)

    @staticmethod
    def listdir(path: PathType) -> List[PathType]:
        """Returns a list of files under a given directory in the filesystem."""
        return os.listdir(path)  # type:ignore[return-value]

    @staticmethod
    def makedirs(path: PathType) -> None:
        """Make a directory at the given path, recursively creating parents.

        An already existing directory is not an error (`exist_ok=True`).
        """
        os.makedirs(path, exist_ok=True)

    @staticmethod
    def mkdir(path: PathType) -> None:
        """Make a directory at the given path; parent directory must exist."""
        os.mkdir(path)

    @staticmethod
    def remove(path: PathType) -> None:
        """Remove the file at the given path. Dangerous operation."""
        os.remove(path)

    @staticmethod
    def rename(src: PathType, dst: PathType, overwrite: bool = False) -> None:
        """Rename source file to destination file.

        Args:
            src: The path of the file to rename.
            dst: The path to rename the source file to.
            overwrite: If a file already exists at the destination, this
                method will overwrite it if overwrite=`True`

        Raises:
            FileExistsError: If `dst` exists and `overwrite` is false.
        """
        if not overwrite and os.path.exists(dst):
            raise FileExistsError(
                f"Destination path {str(dst)} already exists and argument "
                f"`overwrite` is false."
            )
        if overwrite:
            # `os.rename` raises on Windows when the destination exists,
            # which would defeat `overwrite=True`; `os.replace` overwrites an
            # existing destination file on every platform.
            os.replace(src, dst)
        else:
            os.rename(src, dst)

    @staticmethod
    def rmtree(path: PathType) -> None:
        """Deletes dir recursively. Dangerous operation."""
        shutil.rmtree(path)

    @staticmethod
    def stat(path: PathType) -> Any:
        """Return the stat descriptor for a given file path."""
        return os.stat(path)

    @staticmethod
    def walk(
        top: PathType,
        topdown: bool = True,
        onerror: Optional[Callable[..., None]] = None,
    ) -> Iterable[Tuple[PathType, List[PathType], List[PathType]]]:
        """Return an iterator that walks the contents of the given directory.

        Args:
            top: Path of directory to walk.
            topdown: Whether to walk directories topdown or bottom-up.
            onerror: Callable that gets called if an error occurs.

        Returns:
            An Iterable of Tuples, each of which contains the path of the
            current directory, a list of directories inside the current
            directory and a list of files inside the current directory.
        """
        yield from os.walk(top, topdown=topdown, onerror=onerror)  # type: ignore[type-var, misc]

    @validator("path")
    def ensure_path_local(cls, path: str) -> str:
        """Reject paths that start with a known remote-filesystem prefix.

        Args:
            path: The configured root path of the artifact store.

        Returns:
            The unmodified path.

        Raises:
            ArtifactStoreInterfaceError: If the path starts with one of the
                remote prefixes listed below.
        """
        # `str.startswith` accepts a tuple of candidate prefixes.
        remote_prefixes = ("gs://", "hdfs://", "s3://", "az://", "abfs://")
        if path.startswith(remote_prefixes):
            raise ArtifactStoreInterfaceError(
                f"The path:{path} you defined for your local artifact store "
                f"start with one of the remote prefixes."
            )
        return path
local_path: str property readonly

Path to the local directory where the artifacts are stored.

copyfile(src, dst, overwrite=False) staticmethod

Copy a file from the source to the destination.

Source code in zenml/artifact_stores/local_artifact_store.py
@staticmethod
def copyfile(src: PathType, dst: PathType, overwrite: bool = False) -> None:
    """Copy the file at `src` to `dst` on the local filesystem.

    Args:
        src: The path of the file to copy.
        dst: The path to copy the file to.
        overwrite: Whether an existing destination may be overwritten.

    Raises:
        FileExistsError: If `dst` exists and `overwrite` is false.
    """
    # The existence check is only made when overwriting is not allowed.
    if not overwrite:
        if os.path.exists(dst):
            message = (
                f"Destination file {str(dst)} already exists and argument "
                f"`overwrite` is false."
            )
            raise FileExistsError(message)
    shutil.copyfile(src, dst)  # type: ignore[type-var, arg-type]
exists(path) staticmethod

Returns True if the given path exists.

Source code in zenml/artifact_stores/local_artifact_store.py
@staticmethod
def exists(path: PathType) -> bool:
    """Report whether `path` is present on the local filesystem.

    Args:
        path: The location to check.

    Returns:
        The result of `os.path.exists` for `path`.
    """
    path_found = os.path.exists(path)
    return path_found
glob(pattern) staticmethod

Return the paths that match a glob pattern.

Source code in zenml/artifact_stores/local_artifact_store.py
@staticmethod
def glob(pattern: PathType) -> List[PathType]:
    """Collect the local paths matching a glob pattern.

    Args:
        pattern: The glob pattern to match.

    Returns:
        The list produced by `glob.glob` for `pattern`.
    """
    matches = glob.glob(pattern)  # type: ignore[type-var]
    return matches
isdir(path) staticmethod

Returns whether the given path points to a directory.

Source code in zenml/artifact_stores/local_artifact_store.py
@staticmethod
def isdir(path: PathType) -> bool:
    """Report whether `path` points to a directory on the local filesystem.

    Args:
        path: The location to check.

    Returns:
        The result of `os.path.isdir` for `path`.
    """
    is_directory = os.path.isdir(path)
    return is_directory
listdir(path) staticmethod

Returns a list of files under a given directory in the filesystem.

Source code in zenml/artifact_stores/local_artifact_store.py
@staticmethod
def listdir(path: PathType) -> List[PathType]:
    """List the entries found directly under a local directory.

    Args:
        path: The directory to list.

    Returns:
        The entry names reported by `os.listdir` for `path`.
    """
    entries = os.listdir(path)
    return entries  # type:ignore[return-value]
makedirs(path) staticmethod

Make a directory at the given path, recursively creating parents.

Source code in zenml/artifact_stores/local_artifact_store.py
@staticmethod
def makedirs(path: PathType) -> None:
    """Create a local directory together with any missing parents.

    A directory that already exists is left untouched and is not treated as
    an error.

    Args:
        path: The directory to create.
    """
    os.makedirs(
        path,
        exist_ok=True,
    )
mkdir(path) staticmethod

Make a directory at the given path; parent directory must exist.

Source code in zenml/artifact_stores/local_artifact_store.py
@staticmethod
def mkdir(path: PathType) -> None:
    """Create a single local directory; its parent must already exist.

    Args:
        path: The directory to create.
    """
    # Single-level creation only: no parents are created here.
    os.mkdir(path)
open(name, mode='r') staticmethod

Open a file at the given path.

Source code in zenml/artifact_stores/local_artifact_store.py
@staticmethod
def open(name: PathType, mode: str = "r") -> Any:
    """Open a local file at the given path.

    Args:
        name: The path of the file to open.
        mode: The mode handed to the `open` call.

    Returns:
        Whatever the `open` call returns for `name`.
    """
    handle = open(name, mode=mode)
    return handle
remove(path) staticmethod

Remove the file at the given path. Dangerous operation.

Source code in zenml/artifact_stores/local_artifact_store.py
@staticmethod
def remove(path: PathType) -> None:
    """Delete the local file at the given path. Dangerous operation.

    Args:
        path: The file to delete.
    """
    # Handed straight to `os.remove`; no existence check is made first.
    os.remove(path)
rename(src, dst, overwrite=False) staticmethod

Rename source file to destination file.

Parameters:

Name Type Description Default
src Union[bytes, str]

The path of the file to rename.

required
dst Union[bytes, str]

The path to rename the source file to.

required
overwrite bool

If a file already exists at the destination, this method will overwrite it if overwrite=True

False
Source code in zenml/artifact_stores/local_artifact_store.py
@staticmethod
def rename(src: PathType, dst: PathType, overwrite: bool = False) -> None:
    """Rename source file to destination file.

    Args:
        src: The path of the file to rename.
        dst: The path to rename the source file to.
        overwrite: If a file already exists at the destination, this
            method will overwrite it if overwrite=`True`

    Raises:
        FileExistsError: If `dst` exists and `overwrite` is false.
    """
    if not overwrite and os.path.exists(dst):
        raise FileExistsError(
            f"Destination path {str(dst)} already exists and argument "
            f"`overwrite` is false."
        )
    if overwrite:
        # `os.rename` raises on Windows when the destination exists, which
        # would defeat `overwrite=True`; `os.replace` overwrites an existing
        # destination file on every platform.
        os.replace(src, dst)
    else:
        os.rename(src, dst)
rmtree(path) staticmethod

Deletes dir recursively. Dangerous operation.

Source code in zenml/artifact_stores/local_artifact_store.py
@staticmethod
def rmtree(path: PathType) -> None:
    """Delete a local directory and everything below it. Dangerous operation.

    Args:
        path: The directory to delete.
    """
    # Handed straight to `shutil.rmtree` with its default error handling.
    shutil.rmtree(path)
stat(path) staticmethod

Return the stat descriptor for a given file path.

Source code in zenml/artifact_stores/local_artifact_store.py
@staticmethod
def stat(path: PathType) -> Any:
    """Fetch the stat descriptor for a local file path.

    Args:
        path: The path to inspect.

    Returns:
        The object returned by `os.stat` for `path`.
    """
    descriptor = os.stat(path)
    return descriptor
walk(top, topdown=True, onerror=None) staticmethod

Return an iterator that walks the contents of the given directory.

Parameters:

Name Type Description Default
top Union[bytes, str]

Path of directory to walk.

required
topdown bool

Whether to walk directories topdown or bottom-up.

True
onerror Optional[Callable[..., NoneType]]

Callable that gets called if an error occurs.

None

Returns:

Type Description
Iterable[Tuple[Union[bytes, str], List[Union[bytes, str]], List[Union[bytes, str]]]]

An Iterable of Tuples, each of which contains the path of the current directory, a list of directories inside the current directory and a list of files inside the current directory.

Source code in zenml/artifact_stores/local_artifact_store.py
@staticmethod
def walk(
    top: PathType,
    topdown: bool = True,
    onerror: Optional[Callable[..., None]] = None,
) -> Iterable[Tuple[PathType, List[PathType], List[PathType]]]:
    """Lazily walk the contents of a local directory tree.

    Args:
        top: Path of directory to walk.
        topdown: Whether to walk directories topdown or bottom-up.
        onerror: Callable that gets called if an error occurs.

    Returns:
        An Iterable of Tuples, each holding the path of the current
        directory, a list of directories inside it and a list of files
        inside it.
    """
    tree = os.walk(top, topdown=topdown, onerror=onerror)
    # The very same list objects are re-yielded, so in-place edits made by
    # the caller to the directory list still reach `os.walk`.
    for current_dir, dir_names, file_names in tree:
        yield current_dir, dir_names, file_names  # type: ignore[type-var, misc]