Materializers

zenml.materializers special

Initialization of ZenML materializers.

Materializers are used to convert a ZenML artifact into a specific format. They are most often used to handle the input or output of ZenML steps, and can be extended by building on the BaseMaterializer class.
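
For example, a minimal custom materializer might look like the following sketch (import paths follow the source files documented below; the `MyObj` type and the `data.json` file name are hypothetical):

import json
import os
from typing import Any, Type

from zenml.enums import ArtifactType
from zenml.io import fileio
from zenml.materializers.base_materializer import BaseMaterializer


class MyObj:
    """Hypothetical user-defined type to be materialized."""

    def __init__(self, name: str) -> None:
        self.name = name


class MyObjMaterializer(BaseMaterializer):
    """Reads/writes `MyObj` as a small JSON file under the artifact URI."""

    ASSOCIATED_TYPES = (MyObj,)
    ASSOCIATED_ARTIFACT_TYPE = ArtifactType.DATA

    def load(self, data_type: Type[Any]) -> MyObj:
        super().load(data_type)
        with fileio.open(os.path.join(self.uri, "data.json"), "r") as f:
            return MyObj(**json.load(f))

    def save(self, data: MyObj) -> None:
        super().save(data)
        with fileio.open(os.path.join(self.uri, "data.json"), "w") as f:
            json.dump({"name": data.name}, f)

Defining the subclass is enough to register it: the `BaseMaterializerMeta` metaclass described below adds it to the default materializer registry for every type in `ASSOCIATED_TYPES`.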

base_materializer

Metaclass implementation for registering ZenML BaseMaterializer subclasses.

BaseMaterializer

Base Materializer to realize artifact data.

Source code in zenml/materializers/base_materializer.py
class BaseMaterializer(metaclass=BaseMaterializerMeta):
    """Base Materializer to realize artifact data."""

    ASSOCIATED_ARTIFACT_TYPE: ClassVar[ArtifactType] = ArtifactType.BASE
    ASSOCIATED_TYPES: ClassVar[Tuple[Type[Any], ...]] = ()

    # Deprecated; will be removed in a future release.
    ASSOCIATED_ARTIFACT_TYPES: ClassVar[Tuple[Type["BaseArtifact"], ...]] = ()

    def __init__(
        self,
        uri: Optional[str] = None,
        artifact: Optional["BaseArtifact"] = None,
    ):
        """Initializes a materializer with the given URI.

        Args:
            uri: The URI where the artifact data is stored.
            artifact: Deprecated; will be removed in a future release.

        Raises:
            ValueError: If neither a URI nor an artifact is provided.
        """
        if isinstance(uri, BaseArtifact):
            artifact = uri
        if artifact is not None:
            logger.warning(
                "Initializing a materializer with an artifact is deprecated "
                "and will be removed in a future release. Please initialize "
                "the materializer with the artifact's URI instead via "
                "`materializer = MyMaterializer(uri=artifact.uri)`."
            )
            self.uri = artifact.uri
        elif uri is not None:
            self.uri = uri
        else:
            raise ValueError(
                "Initializing a materializer requires either a URI or an "
                "artifact."
            )
        self.artifact = DeprecatedArtifact(self.uri)

    def _can_handle_type(self, data_type: Type[Any]) -> bool:
        """Whether the materializer can read/write a certain type.

        Args:
            data_type: The type to check.

        Returns:
            Whether the materializer can read/write the given type.
        """
        return any(
            issubclass(data_type, associated_type)
            for associated_type in self.ASSOCIATED_TYPES
        )

    def load(self, data_type: Type[Any]) -> Any:
        """Write logic here to load the data of an artifact.

        Args:
            data_type: What type the artifact data should be loaded as.

        Returns:
            The data of the artifact.

        Raises:
            TypeError: If the artifact data is not of the correct type.
        """
        if not self._can_handle_type(data_type):
            raise TypeError(
                f"Unable to handle type {data_type}. {self.__class__.__name__} "
                f"can only read artifacts to the following types: "
                f"{self.ASSOCIATED_TYPES}."
            )

        # If `handle_input` is overridden, call it here to not break custom
        # materializers.
        if type(self).handle_input != BaseMaterializer.handle_input:
            logger.warning(
                "The `materializer.handle_input` method is deprecated and will "
                "be removed in a future release. Please adjust your "
                f"'{type(self).__name__}' materializer to override and use "
                "`materializer.load` instead."
            )
            return self.handle_input(data_type)

        return None

    def save(self, data: Any) -> None:
        """Write logic here to save the data of an artifact.

        Args:
            data: The data of the artifact to save.

        Raises:
            TypeError: If the artifact data is not of the correct type.
        """
        data_type = type(data)
        if not self._can_handle_type(data_type):
            raise TypeError(
                f"Unable to write {data_type}. {self.__class__.__name__} "
                f"can only write the following types: {self.ASSOCIATED_TYPES}."
            )

        # If `handle_return` is overridden, call it here to not break custom
        # materializers.
        if type(self).handle_return != BaseMaterializer.handle_return:
            logger.warning(
                "The `materializer.handle_return` method is deprecated and will "
                "be removed in a future release. Please adjust your "
                f"'{type(self).__name__}' materializer to override and use "
                "`materializer.save` instead."
            )
            self.handle_return(data)

    def extract_metadata(self, data: Any) -> Dict[str, "MetadataType"]:
        """Extract metadata from the given data.

        This metadata will be tracked and displayed alongside the artifact.

        Args:
            data: The data to extract metadata from.

        Returns:
            A dictionary of metadata.

        Raises:
            TypeError: If the data is not of the correct type.
        """
        from zenml.metadata.metadata_types import StorageSize

        data_type = type(data)
        if not self._can_handle_type(data_type):
            raise TypeError(
                f"Unable to extract metadata from {data_type}. "
                f"{self.__class__.__name__} can only write the following "
                f"types: {self.ASSOCIATED_TYPES}."
            )

        storage_size = fileio.size(self.uri)
        if storage_size:
            return {"storage_size": StorageSize(storage_size)}
        return {}

    def handle_input(self, data_type: Type[Any]) -> Any:
        """Deprecated method to load the data of an artifact.

        Args:
            data_type: What type the artifact data should be loaded as.

        Returns:
            The data of the artifact.
        """
        logger.warning(
            "The `materializer.handle_input` method is deprecated and will "
            "be removed in a future release. Please use `materializer.load` "
            "instead."
        )

        # If `handle_input` is called on a new materializer that does not
        # override it, call `load` instead.
        if type(self).handle_input == BaseMaterializer.handle_input:
            return self.load(data_type)

    def handle_return(self, data: Any) -> None:
        """Deprecated method to save the data of an artifact.

        Args:
            data: The data of the artifact to save.
        """
        logger.warning(
            "The `materializer.handle_return` method is deprecated and will "
            "be removed in a future release. Please use `materializer.save` "
            "instead."
        )

        # If `handle_return` is called on a new materializer that does not
        # override it, call `save` instead.
        if type(self).handle_return == BaseMaterializer.handle_return:
            self.save(data)
__init__(self, uri=None, artifact=None) special

Initializes a materializer with the given URI.

Parameters:

    uri (Optional[str], default: None): The URI where the artifact data is stored.
    artifact (Optional[BaseArtifact], default: None): Deprecated; will be removed in a future release.

Exceptions:

    ValueError: If neither a URI nor an artifact is provided.

Source code in zenml/materializers/base_materializer.py
def __init__(
    self,
    uri: Optional[str] = None,
    artifact: Optional["BaseArtifact"] = None,
):
    """Initializes a materializer with the given URI.

    Args:
        uri: The URI where the artifact data is stored.
        artifact: Deprecated; will be removed in a future release.

    Raises:
        ValueError: If neither a URI nor an artifact is provided.
    """
    if isinstance(uri, BaseArtifact):
        artifact = uri
    if artifact is not None:
        logger.warning(
            "Initializing a materializer with an artifact is deprecated "
            "and will be removed in a future release. Please initialize "
            "the materializer with the artifact's URI instead via "
            "`materializer = MyMaterializer(uri=artifact.uri)`."
        )
        self.uri = artifact.uri
    elif uri is not None:
        self.uri = uri
    else:
        raise ValueError(
            "Initializing a materializer requires either a URI or an "
            "artifact."
        )
    self.artifact = DeprecatedArtifact(self.uri)
extract_metadata(self, data)

Extract metadata from the given data.

This metadata will be tracked and displayed alongside the artifact.

Parameters:

    data (Any, required): The data to extract metadata from.

Returns:

    Dict[str, MetadataType]: A dictionary of metadata.

Exceptions:

    TypeError: If the data is not of the correct type.

Source code in zenml/materializers/base_materializer.py
def extract_metadata(self, data: Any) -> Dict[str, "MetadataType"]:
    """Extract metadata from the given data.

    This metadata will be tracked and displayed alongside the artifact.

    Args:
        data: The data to extract metadata from.

    Returns:
        A dictionary of metadata.

    Raises:
        TypeError: If the data is not of the correct type.
    """
    from zenml.metadata.metadata_types import StorageSize

    data_type = type(data)
    if not self._can_handle_type(data_type):
        raise TypeError(
            f"Unable to extract metadata from {data_type}. "
            f"{self.__class__.__name__} can only write the following "
            f"types: {self.ASSOCIATED_TYPES}."
        )

    storage_size = fileio.size(self.uri)
    if storage_size:
        return {"storage_size": StorageSize(storage_size)}
    return {}
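
A custom materializer can extend this method to track additional metadata on top of the storage size contributed by the base implementation. A minimal sketch (the class and metadata key are illustrative; `load`/`save` are omitted for brevity):

from typing import Any, Dict

from zenml.materializers.base_materializer import BaseMaterializer


class TextMaterializer(BaseMaterializer):
    """Hypothetical materializer for plain strings."""

    ASSOCIATED_TYPES = (str,)

    def extract_metadata(self, data: str) -> Dict[str, Any]:
        # The base class contributes "storage_size" via fileio.size(self.uri).
        metadata = dict(super().extract_metadata(data))
        metadata["num_characters"] = len(data)  # custom key
        return metadata
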
handle_input(self, data_type)

Deprecated method to load the data of an artifact.

Parameters:

    data_type (Type[Any], required): What type the artifact data should be loaded as.

Returns:

    Any: The data of the artifact.

Source code in zenml/materializers/base_materializer.py
def handle_input(self, data_type: Type[Any]) -> Any:
    """Deprecated method to load the data of an artifact.

    Args:
        data_type: What type the artifact data should be loaded as.

    Returns:
        The data of the artifact.
    """
    logger.warning(
        "The `materializer.handle_input` method is deprecated and will "
        "be removed in a future release. Please use `materializer.load` "
        "instead."
    )

    # If `handle_input` is called on a new materializer that does not
    # override it, call `load` instead.
    if type(self).handle_input == BaseMaterializer.handle_input:
        return self.load(data_type)
handle_return(self, data)

Deprecated method to save the data of an artifact.

Parameters:

    data (Any, required): The data of the artifact to save.
Source code in zenml/materializers/base_materializer.py
def handle_return(self, data: Any) -> None:
    """Deprecated method to save the data of an artifact.

    Args:
        data: The data of the artifact to save.
    """
    logger.warning(
        "The `materializer.handle_return` method is deprecated and will "
        "be removed in a future release. Please use `materializer.save` "
        "instead."
    )

    # If `handle_return` is called on a new materializer that does not
    # override it, call `save` instead.
    if type(self).handle_return == BaseMaterializer.handle_return:
        self.save(data)
load(self, data_type)

Write logic here to load the data of an artifact.

Parameters:

    data_type (Type[Any], required): What type the artifact data should be loaded as.

Returns:

    Any: The data of the artifact.

Exceptions:

    TypeError: If the artifact data is not of the correct type.

Source code in zenml/materializers/base_materializer.py
def load(self, data_type: Type[Any]) -> Any:
    """Write logic here to load the data of an artifact.

    Args:
        data_type: What type the artifact data should be loaded as.

    Returns:
        The data of the artifact.

    Raises:
        TypeError: If the artifact data is not of the correct type.
    """
    if not self._can_handle_type(data_type):
        raise TypeError(
            f"Unable to handle type {data_type}. {self.__class__.__name__} "
            f"can only read artifacts to the following types: "
            f"{self.ASSOCIATED_TYPES}."
        )

    # If `handle_input` is overridden, call it here to not break custom
    # materializers.
    if type(self).handle_input != BaseMaterializer.handle_input:
        logger.warning(
            "The `materializer.handle_input` method is deprecated and will "
            "be removed in a future release. Please adjust your "
            f"'{type(self).__name__}' materializer to override and use "
            "`materializer.load` instead."
        )
        return self.handle_input(data_type)

    return None
save(self, data)

Write logic here to save the data of an artifact.

Parameters:

    data (Any, required): The data of the artifact to save.

Exceptions:

    TypeError: If the artifact data is not of the correct type.

Source code in zenml/materializers/base_materializer.py
def save(self, data: Any) -> None:
    """Write logic here to save the data of an artifact.

    Args:
        data: The data of the artifact to save.

    Raises:
        TypeError: If the artifact data is not of the correct type.
    """
    data_type = type(data)
    if not self._can_handle_type(data_type):
        raise TypeError(
            f"Unable to write {data_type}. {self.__class__.__name__} "
            f"can only write the following types: {self.ASSOCIATED_TYPES}."
        )

    # If `handle_return` is overridden, call it here to not break custom
    # materializers.
    if type(self).handle_return != BaseMaterializer.handle_return:
        logger.warning(
            "The `materializer.handle_return` method is deprecated and will "
            "be removed in a future release. Please adjust your "
            f"'{type(self).__name__}' materializer to override and use "
            "`materializer.save` instead."
        )
        self.handle_return(data)
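
To illustrate the shims above: a legacy materializer that still overrides `handle_return`/`handle_input` keeps working, because `save` and `load` detect the overrides and forward to them with a deprecation warning. A sketch (the class and `data.txt` file name are hypothetical):

import os
from typing import Any, Type

from zenml.io import fileio
from zenml.materializers.base_materializer import BaseMaterializer


class LegacyStrMaterializer(BaseMaterializer):
    """Old-style materializer written before `load`/`save` existed."""

    ASSOCIATED_TYPES = (str,)

    def handle_return(self, data: str) -> None:
        with fileio.open(os.path.join(self.uri, "data.txt"), "w") as f:
            f.write(data)

    def handle_input(self, data_type: Type[Any]) -> str:
        with fileio.open(os.path.join(self.uri, "data.txt"), "r") as f:
            return f.read()


# materializer.save("hello") forwards to handle_return("hello"), and
# materializer.load(str) forwards to handle_input(str), each logging a
# deprecation warning first.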

BaseMaterializerMeta (type)

Metaclass responsible for registering different BaseMaterializer subclasses.

Materializers are used for reading/writing artifacts.

Source code in zenml/materializers/base_materializer.py
class BaseMaterializerMeta(type):
    """Metaclass responsible for registering different BaseMaterializer subclasses.

    Materializers are used for reading/writing artifacts.
    """

    def __new__(
        mcs, name: str, bases: Tuple[Type[Any], ...], dct: Dict[str, Any]
    ) -> "BaseMaterializerMeta":
        """Creates a Materializer class and registers it at the `MaterializerRegistry`.

        Args:
            name: The name of the class.
            bases: The base classes of the class.
            dct: The dictionary of the class.

        Returns:
            The BaseMaterializerMeta class.

        Raises:
            MaterializerInterfaceError: If the class was improperly defined.
        """
        cls = cast(
            Type["BaseMaterializer"], super().__new__(mcs, name, bases, dct)
        )

        # Skip the following validation and registration for the base class.
        if name == "BaseMaterializer":
            return cls

        # Validate that the class is properly defined.
        if not cls.ASSOCIATED_TYPES:
            raise MaterializerInterfaceError(
                f"Invalid materializer class '{name}'. When creating a "
                f"custom materializer, make sure to specify at least one "
                f"type in its ASSOCIATED_TYPES class variable.",
                url="https://docs.zenml.io/advanced-guide/pipelines/materializers",
            )

        # Check if the deprecated ASSOCIATED_ARTIFACT_TYPES is used.
        artifact_type: str = cls.ASSOCIATED_ARTIFACT_TYPE
        if cls.ASSOCIATED_ARTIFACT_TYPES:
            logger.warning(
                "The `materializer.ASSOCIATED_ARTIFACT_TYPES` class variable "
                "is deprecated and will be removed in a future release. Please "
                f"adjust your '{name}' materializer to use "
                "`ASSOCIATED_ARTIFACT_TYPE` instead."
            )
            artifact_class = cls.ASSOCIATED_ARTIFACT_TYPES[0]
            if not (
                inspect.isclass(artifact_class)
                and issubclass(artifact_class, BaseArtifact)
            ):
                raise MaterializerInterfaceError(
                    f"ASSOCIATED_ARTIFACT_TYPES value '{artifact_class}' for "
                    f"materializer '{name}' is not a `BaseArtifact` "
                    f"subclass.",
                    url="https://docs.zenml.io/advanced-guide/pipelines/materializers",
                )
            artifact_type = artifact_class.TYPE_NAME

        # Validate associated artifact type.
        if artifact_type:
            try:
                cls.ASSOCIATED_ARTIFACT_TYPE = ArtifactType(artifact_type)
            except ValueError:
                raise MaterializerInterfaceError(
                    f"Invalid materializer class '{name}'. When creating a "
                    f"custom materializer, make sure to specify a valid "
                    f"artifact type in its ASSOCIATED_ARTIFACT_TYPE class "
                    f"variable.",
                    url="https://docs.zenml.io/advanced-guide/pipelines/materializers",
                )

        # Validate associated data types.
        for associated_type in cls.ASSOCIATED_TYPES:
            if not inspect.isclass(associated_type):
                raise MaterializerInterfaceError(
                    f"Associated type {associated_type} for materializer "
                    f"{name} is not a class.",
                    url="https://docs.zenml.io/advanced-guide/pipelines/materializers",
                )

        # Register the materializer.
        for associated_type in cls.ASSOCIATED_TYPES:
            default_materializer_registry.register_materializer_type(
                associated_type, cls
            )

        return cls
__new__(mcs, name, bases, dct) special staticmethod

Creates a Materializer class and registers it at the MaterializerRegistry.

Parameters:

    name (str, required): The name of the class.
    bases (Tuple[Type[Any], ...], required): The base classes of the class.
    dct (Dict[str, Any], required): The dictionary of the class.

Returns:

    BaseMaterializerMeta: The BaseMaterializerMeta class.

Exceptions:

    MaterializerInterfaceError: If the class was improperly defined.

Source code in zenml/materializers/base_materializer.py
def __new__(
    mcs, name: str, bases: Tuple[Type[Any], ...], dct: Dict[str, Any]
) -> "BaseMaterializerMeta":
    """Creates a Materializer class and registers it at the `MaterializerRegistry`.

    Args:
        name: The name of the class.
        bases: The base classes of the class.
        dct: The dictionary of the class.

    Returns:
        The BaseMaterializerMeta class.

    Raises:
        MaterializerInterfaceError: If the class was improperly defined.
    """
    cls = cast(
        Type["BaseMaterializer"], super().__new__(mcs, name, bases, dct)
    )

    # Skip the following validation and registration for the base class.
    if name == "BaseMaterializer":
        return cls

    # Validate that the class is properly defined.
    if not cls.ASSOCIATED_TYPES:
        raise MaterializerInterfaceError(
            f"Invalid materializer class '{name}'. When creating a "
            f"custom materializer, make sure to specify at least one "
            f"type in its ASSOCIATED_TYPES class variable.",
            url="https://docs.zenml.io/advanced-guide/pipelines/materializers",
        )

    # Check if the deprecated ASSOCIATED_ARTIFACT_TYPES is used.
    artifact_type: str = cls.ASSOCIATED_ARTIFACT_TYPE
    if cls.ASSOCIATED_ARTIFACT_TYPES:
        logger.warning(
            "The `materializer.ASSOCIATED_ARTIFACT_TYPES` class variable "
            "is deprecated and will be removed in a future release. Please "
            f"adjust your '{name}' materializer to use "
            "`ASSOCIATED_ARTIFACT_TYPE` instead."
        )
        artifact_class = cls.ASSOCIATED_ARTIFACT_TYPES[0]
        if not (
            inspect.isclass(artifact_class)
            and issubclass(artifact_class, BaseArtifact)
        ):
            raise MaterializerInterfaceError(
                f"ASSOCIATED_ARTIFACT_TYPES value '{artifact_class}' for "
                f"materializer '{name}' is not a `BaseArtifact` "
                f"subclass.",
                url="https://docs.zenml.io/advanced-guide/pipelines/materializers",
            )
        artifact_type = artifact_class.TYPE_NAME

    # Validate associated artifact type.
    if artifact_type:
        try:
            cls.ASSOCIATED_ARTIFACT_TYPE = ArtifactType(artifact_type)
        except ValueError:
            raise MaterializerInterfaceError(
                f"Invalid materializer class '{name}'. When creating a "
                f"custom materializer, make sure to specify a valid "
                f"artifact type in its ASSOCIATED_ARTIFACT_TYPE class "
                f"variable.",
                url="https://docs.zenml.io/advanced-guide/pipelines/materializers",
            )

    # Validate associated data types.
    for associated_type in cls.ASSOCIATED_TYPES:
        if not inspect.isclass(associated_type):
            raise MaterializerInterfaceError(
                f"Associated type {associated_type} for materializer "
                f"{name} is not a class.",
                url="https://docs.zenml.io/advanced-guide/pipelines/materializers",
            )

    # Register the materializer.
    for associated_type in cls.ASSOCIATED_TYPES:
        default_materializer_registry.register_materializer_type(
            associated_type, cls
        )

    return cls
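
The practical effect of the metaclass is that defining a subclass is all that is needed to register it, and an invalid definition fails at class-creation time. A sketch (assuming no materializer is registered for `pathlib.Path` yet; `load`/`save` are omitted for brevity):

import pathlib

from zenml.materializers.base_materializer import BaseMaterializer
from zenml.materializers.default_materializer_registry import (
    default_materializer_registry,
)


class PathMaterializer(BaseMaterializer):
    """Hypothetical materializer for filesystem paths."""

    ASSOCIATED_TYPES = (pathlib.Path,)


# The class statement itself triggered the registration:
assert default_materializer_registry.is_registered(pathlib.Path)

# Omitting ASSOCIATED_TYPES entirely would instead raise a
# MaterializerInterfaceError as soon as the class body is executed.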

DeprecatedArtifact

Mock artifact class to support deprecated materializer.artifact.uri.

Source code in zenml/materializers/base_materializer.py
class DeprecatedArtifact:
    """Mock artifact class to support deprecated `materializer.artifact.uri`."""

    def __init__(self, uri: str) -> None:
        """Initializes the artifact.

        Args:
            uri: The URI of the artifact.
        """
        self._uri = uri

    @property
    def uri(self) -> str:
        """Returns the URI of the artifact.

        Returns:
            The URI of the artifact.
        """
        logger.warning(
            "Calling `materializer.artifact.uri` is deprecated and will be "
            "removed in a future release. Please use `materializer.uri` "
            "instead."
        )
        return self._uri
uri: str property readonly

Returns the URI of the artifact.

Returns:

    str: The URI of the artifact.

__init__(self, uri) special

Initializes the artifact.

Parameters:

    uri (str, required): The URI of the artifact.
Source code in zenml/materializers/base_materializer.py
def __init__(self, uri: str) -> None:
    """Initializes the artifact.

    Args:
        uri: The URI of the artifact.
    """
    self._uri = uri

built_in_materializer

Implementation of ZenML's builtin materializer.

BuiltInContainerMaterializer (BaseMaterializer)

Handle built-in container types (dict, list, set, tuple).

Source code in zenml/materializers/built_in_materializer.py
class BuiltInContainerMaterializer(BaseMaterializer):
    """Handle built-in container types (dict, list, set, tuple)."""

    ASSOCIATED_TYPES = (dict, list, set, tuple)

    def __init__(self, uri: str):
        """Define `self.data_path` and `self.metadata_path`.

        Args:
            uri: The URI where the artifact data is stored.
        """
        super().__init__(uri)
        self.data_path = os.path.join(self.uri, DEFAULT_FILENAME)
        self.metadata_path = os.path.join(self.uri, DEFAULT_METADATA_FILENAME)

    def load(self, data_type: Type[Any]) -> Any:
        """Reads a materialized built-in container object.

        If the data was serialized to JSON, deserialize it.

        Otherwise, reconstruct all elements according to the metadata file:
            1. Resolve the data type using `find_type_by_str()`,
            2. Get the materializer via the `default_materializer_registry`,
            3. Initialize the materializer with the desired path,
            4. Use `load()` of that materializer to load the element.

        Args:
            data_type: The type of the data to read.

        Returns:
            The data read.

        Raises:
            RuntimeError: If the data was not found.
        """
        super().load(data_type)

        # If the data was not serialized, there must be metadata present.
        if not fileio.exists(self.data_path) and not fileio.exists(
            self.metadata_path
        ):
            raise RuntimeError(
                f"Materialization of type {data_type} failed. Expected either"
                f"{self.data_path} or {self.metadata_path} to exist."
            )

        # If the data was serialized as JSON, deserialize it.
        if fileio.exists(self.data_path):
            outputs = yaml_utils.read_json(self.data_path)

        # Otherwise, use the metadata to reconstruct the data as a list.
        else:
            metadata = yaml_utils.read_json(self.metadata_path)
            outputs = []
            for path_, type_str in zip(metadata["paths"], metadata["types"]):
                type_ = find_type_by_str(type_str)
                materializer_class = default_materializer_registry[type_]
                materializer = materializer_class(uri=path_)
                element = materializer.load(type_)
                outputs.append(element)

        # Cast the data to the correct type.
        if issubclass(data_type, dict) and not isinstance(outputs, dict):
            keys, values = outputs
            return dict(zip(keys, values))
        if issubclass(data_type, tuple):
            return tuple(outputs)
        if issubclass(data_type, set):
            return set(outputs)
        return outputs

    def save(self, data: Any) -> None:
        """Materialize a built-in container object.

        If the object can be serialized to JSON, serialize it.

        Otherwise, use the `default_materializer_registry` to find the correct
        materializer for each element and materialize each element into a
        subdirectory.

        Tuples and sets are cast to list before materialization.

        For non-serializable dicts, materialize keys/values as separate lists.

        Args:
            data: The built-in container object to materialize.

        Raises:
            Exception: If any exception occurs, it is raised after cleanup.
        """
        super().save(data)

        # tuple and set: handle as list.
        if isinstance(data, tuple) or isinstance(data, set):
            data = list(data)

        # If the data is serializable, just write it into a single JSON file.
        if _is_serializable(data):
            yaml_utils.write_json(self.data_path, data)
            return

        # non-serializable dict: Handle as non-serializable list of lists.
        if isinstance(data, dict):
            data = [list(data.keys()), list(data.values())]

        # non-serializable list: Materialize each element into a subfolder.
        # Get path, type, and corresponding materializer for each element.
        paths, types, materializers = [], [], []
        for i, element in enumerate(data):
            element_path = os.path.join(self.uri, str(i))
            fileio.mkdir(element_path)
            type_ = find_materializer_registry_type(type(element))
            paths.append(element_path)
            types.append(str(type_))
            materializer_class = default_materializer_registry[type_]
            materializer = materializer_class(uri=element_path)
            materializers.append(materializer)
        try:
            # Write metadata as JSON.
            metadata = {"length": len(data), "paths": paths, "types": types}
            yaml_utils.write_json(self.metadata_path, metadata)
            # Materialize each element.
            for element, materializer in zip(data, materializers):
                materializer.save(element)
        # If an error occurs, delete all created files.
        except Exception as e:
            # Delete metadata
            if fileio.exists(self.metadata_path):
                fileio.remove(self.metadata_path)
            # Delete all elements that were already saved.
            for element_path in paths:
                fileio.rmtree(element_path)
            raise e

    def extract_metadata(self, data: Any) -> Dict[str, "MetadataType"]:
        """Extract metadata from the given built-in container object.

        Args:
            data: The built-in container object to extract metadata from.

        Returns:
            The extracted metadata as a dictionary.
        """
        base_metadata = super().extract_metadata(data)
        container_metadata = {}
        if hasattr(data, "__len__"):
            container_metadata = {
                "length": len(data),
            }
        return {**base_metadata, **container_metadata}
__init__(self, uri) special

Define self.data_path and self.metadata_path.

Parameters:

    uri (str, required): The URI where the artifact data is stored.
Source code in zenml/materializers/built_in_materializer.py
def __init__(self, uri: str):
    """Define `self.data_path` and `self.metadata_path`.

    Args:
        uri: The URI where the artifact data is stored.
    """
    super().__init__(uri)
    self.data_path = os.path.join(self.uri, DEFAULT_FILENAME)
    self.metadata_path = os.path.join(self.uri, DEFAULT_METADATA_FILENAME)
extract_metadata(self, data)

Extract metadata from the given built-in container object.

Parameters:

    data (Any, required): The built-in container object to extract metadata from.

Returns:

    Dict[str, MetadataType]: The extracted metadata as a dictionary.

Source code in zenml/materializers/built_in_materializer.py
def extract_metadata(self, data: Any) -> Dict[str, "MetadataType"]:
    """Extract metadata from the given built-in container object.

    Args:
        data: The built-in container object to extract metadata from.

    Returns:
        The extracted metadata as a dictionary.
    """
    base_metadata = super().extract_metadata(data)
    container_metadata = {}
    if hasattr(data, "__len__"):
        container_metadata = {
            "length": len(data),
        }
    return {**base_metadata, **container_metadata}
load(self, data_type)

Reads a materialized built-in container object.

If the data was serialized to JSON, deserialize it.

Otherwise, reconstruct all elements according to the metadata file:

    1. Resolve the data type using find_type_by_str(),
    2. Get the materializer via the default_materializer_registry,
    3. Initialize the materializer with the desired path,
    4. Use load() of that materializer to load the element.

Parameters:

    data_type (Type[Any], required): The type of the data to read.

Returns:

    Any: The data read.

Exceptions:

    RuntimeError: If the data was not found.

Source code in zenml/materializers/built_in_materializer.py
def load(self, data_type: Type[Any]) -> Any:
    """Reads a materialized built-in container object.

    If the data was serialized to JSON, deserialize it.

    Otherwise, reconstruct all elements according to the metadata file:
        1. Resolve the data type using `find_type_by_str()`,
        2. Get the materializer via the `default_materializer_registry`,
        3. Initialize the materializer with the desired path,
        4. Use `load()` of that materializer to load the element.

    Args:
        data_type: The type of the data to read.

    Returns:
        The data read.

    Raises:
        RuntimeError: If the data was not found.
    """
    super().load(data_type)

    # If the data was not serialized, there must be metadata present.
    if not fileio.exists(self.data_path) and not fileio.exists(
        self.metadata_path
    ):
        raise RuntimeError(
            f"Materialization of type {data_type} failed. Expected either"
            f"{self.data_path} or {self.metadata_path} to exist."
        )

    # If the data was serialized as JSON, deserialize it.
    if fileio.exists(self.data_path):
        outputs = yaml_utils.read_json(self.data_path)

    # Otherwise, use the metadata to reconstruct the data as a list.
    else:
        metadata = yaml_utils.read_json(self.metadata_path)
        outputs = []
        for path_, type_str in zip(metadata["paths"], metadata["types"]):
            type_ = find_type_by_str(type_str)
            materializer_class = default_materializer_registry[type_]
            materializer = materializer_class(uri=path_)
            element = materializer.load(type_)
            outputs.append(element)

    # Cast the data to the correct type.
    if issubclass(data_type, dict) and not isinstance(outputs, dict):
        keys, values = outputs
        return dict(zip(keys, values))
    if issubclass(data_type, tuple):
        return tuple(outputs)
    if issubclass(data_type, set):
        return set(outputs)
    return outputs
save(self, data)

Materialize a built-in container object.

If the object can be serialized to JSON, serialize it.

Otherwise, use the default_materializer_registry to find the correct materializer for each element and materialize each element into a subdirectory.

Tuples and sets are cast to list before materialization.

For non-serializable dicts, materialize keys/values as separate lists.

Parameters:

    data (Any, required): The built-in container object to materialize.

Exceptions:

    Exception: If any exception occurs, it is raised after cleanup.

Source code in zenml/materializers/built_in_materializer.py
def save(self, data: Any) -> None:
    """Materialize a built-in container object.

    If the object can be serialized to JSON, serialize it.

    Otherwise, use the `default_materializer_registry` to find the correct
    materializer for each element and materialize each element into a
    subdirectory.

    Tuples and sets are cast to list before materialization.

    For non-serializable dicts, materialize keys/values as separate lists.

    Args:
        data: The built-in container object to materialize.

    Raises:
        Exception: If any exception occurs, it is raised after cleanup.
    """
    super().save(data)

    # tuple and set: handle as list.
    if isinstance(data, tuple) or isinstance(data, set):
        data = list(data)

    # If the data is serializable, just write it into a single JSON file.
    if _is_serializable(data):
        yaml_utils.write_json(self.data_path, data)
        return

    # non-serializable dict: Handle as non-serializable list of lists.
    if isinstance(data, dict):
        data = [list(data.keys()), list(data.values())]

    # non-serializable list: Materialize each element into a subfolder.
    # Get path, type, and corresponding materializer for each element.
    paths, types, materializers = [], [], []
    for i, element in enumerate(data):
        element_path = os.path.join(self.uri, str(i))
        fileio.mkdir(element_path)
        type_ = find_materializer_registry_type(type(element))
        paths.append(element_path)
        types.append(str(type_))
        materializer_class = default_materializer_registry[type_]
        materializer = materializer_class(uri=element_path)
        materializers.append(materializer)
    try:
        # Write metadata as JSON.
        metadata = {"length": len(data), "paths": paths, "types": types}
        yaml_utils.write_json(self.metadata_path, metadata)
        # Materialize each element.
        for element, materializer in zip(data, materializers):
            materializer.save(element)
    # If an error occurs, delete all created files.
    except Exception as e:
        # Delete metadata
        if fileio.exists(self.metadata_path):
            fileio.remove(self.metadata_path)
        # Delete all elements that were already saved.
        for element_path in paths:
            fileio.rmtree(element_path)
        raise e
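
As a usage sketch (the artifact URI is hypothetical): a JSON-serializable container is written to a single JSON file, and reading it back restores the original container type:

import os

from zenml.materializers.built_in_materializer import (
    BuiltInContainerMaterializer,
)

uri = "/tmp/zenml/artifacts/my_dict"  # hypothetical artifact directory
os.makedirs(uri, exist_ok=True)

materializer = BuiltInContainerMaterializer(uri=uri)
materializer.save({"a": 1, "b": 2})  # serializable -> one JSON file
restored = materializer.load(dict)   # -> {"a": 1, "b": 2}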

BuiltInMaterializer (BaseMaterializer)

Handle JSON-serializable basic types (bool, float, int, str).

Source code in zenml/materializers/built_in_materializer.py
class BuiltInMaterializer(BaseMaterializer):
    """Handle JSON-serializable basic types (`bool`, `float`, `int`, `str`)."""

    ASSOCIATED_ARTIFACT_TYPE = ArtifactType.DATA
    ASSOCIATED_TYPES = BASIC_TYPES

    def __init__(self, uri: str):
        """Define `self.data_path`.

        Args:
            uri: The URI where the artifact data is stored.
        """
        super().__init__(uri)
        self.data_path = os.path.join(self.uri, DEFAULT_FILENAME)

    def load(
        self, data_type: Union[Type[bool], Type[float], Type[int], Type[str]]
    ) -> Any:
        """Reads basic primitive types from JSON.

        Args:
            data_type: The type of the data to read.

        Returns:
            The data read.
        """
        super().load(data_type)
        contents = yaml_utils.read_json(self.data_path)
        if type(contents) != data_type:
            # TODO [ENG-142]: Raise error or try to coerce
            logger.debug(
                f"Contents {contents} was type {type(contents)} but expected "
                f"{data_type}"
            )
        return contents

    def save(self, data: Union[bool, float, int, str]) -> None:
        """Serialize a basic type to JSON.

        Args:
            data: The data to store.
        """
        super().save(data)
        yaml_utils.write_json(self.data_path, data)

    def extract_metadata(
        self, data: Union[bool, float, int, str]
    ) -> Dict[str, "MetadataType"]:
        """Extract metadata from the given built-in container object.

        Args:
            data: The built-in container object to extract metadata from.

        Returns:
            The extracted metadata as a dictionary.
        """
        base_metadata = super().extract_metadata(data)
        builtin_metadata: Dict[str, "MetadataType"] = {}

        # For booleans and numbers, add the string representation as metadata.
        # We don't do this for strings because they can be arbitrarily long.
        if isinstance(data, (bool, float, int)):
            builtin_metadata["string_representation"] = str(data)

        return {**base_metadata, **builtin_metadata}
__init__(self, uri) special

Define self.data_path.

Parameters:

    uri (str, required): The URI where the artifact data is stored.
Source code in zenml/materializers/built_in_materializer.py
def __init__(self, uri: str):
    """Define `self.data_path`.

    Args:
        uri: The URI where the artifact data is stored.
    """
    super().__init__(uri)
    self.data_path = os.path.join(self.uri, DEFAULT_FILENAME)
extract_metadata(self, data)

Extract metadata from the given built-in object.

Parameters:

    data (Union[bool, float, int, str], required): The built-in object to extract metadata from.

Returns:

    Dict[str, MetadataType]: The extracted metadata as a dictionary.

Source code in zenml/materializers/built_in_materializer.py
def extract_metadata(
    self, data: Union[bool, float, int, str]
) -> Dict[str, "MetadataType"]:
    """Extract metadata from the given built-in container object.

    Args:
        data: The built-in container object to extract metadata from.

    Returns:
        The extracted metadata as a dictionary.
    """
    base_metadata = super().extract_metadata(data)
    builtin_metadata: Dict[str, "MetadataType"] = {}

    # For booleans and numbers, add the string representation as metadata.
    # We don't do this for strings because they can be arbitrarily long.
    if isinstance(data, (bool, float, int)):
        builtin_metadata["string_representation"] = str(data)

    return {**base_metadata, **builtin_metadata}
load(self, data_type)

Reads basic primitive types from JSON.

Parameters:

    data_type (Union[Type[bool], Type[float], Type[int], Type[str]], required): The type of the data to read.

Returns:

    Any: The data read.

Source code in zenml/materializers/built_in_materializer.py
def load(
    self, data_type: Union[Type[bool], Type[float], Type[int], Type[str]]
) -> Any:
    """Reads basic primitive types from JSON.

    Args:
        data_type: The type of the data to read.

    Returns:
        The data read.
    """
    super().load(data_type)
    contents = yaml_utils.read_json(self.data_path)
    if type(contents) != data_type:
        # TODO [ENG-142]: Raise error or try to coerce
        logger.debug(
            f"Contents {contents} was type {type(contents)} but expected "
            f"{data_type}"
        )
    return contents
save(self, data)

Serialize a basic type to JSON.

Parameters:

    data (Union[bool, float, int, str], required): The data to store.
Source code in zenml/materializers/built_in_materializer.py
def save(self, data: Union[bool, float, int, str]) -> None:
    """Serialize a basic type to JSON.

    Args:
        data: The data to store.
    """
    super().save(data)
    yaml_utils.write_json(self.data_path, data)
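
A corresponding usage sketch for basic types (the path is hypothetical):

import os

from zenml.materializers.built_in_materializer import BuiltInMaterializer

uri = "/tmp/zenml/artifacts/my_int"  # hypothetical artifact directory
os.makedirs(uri, exist_ok=True)

materializer = BuiltInMaterializer(uri=uri)
materializer.save(42)           # serialized to JSON under the URI
value = materializer.load(int)  # -> 42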

BytesMaterializer (BaseMaterializer)

Handle bytes data type, which is not JSON serializable.

Source code in zenml/materializers/built_in_materializer.py
class BytesMaterializer(BaseMaterializer):
    """Handle `bytes` data type, which is not JSON serializable."""

    ASSOCIATED_ARTIFACT_TYPE = ArtifactType.DATA
    ASSOCIATED_TYPES = (bytes,)

    def __init__(self, uri: str):
        """Define `self.data_path`.

        Args:
            uri: The URI where the artifact data is stored.
        """
        super().__init__(uri)
        self.data_path = os.path.join(self.uri, DEFAULT_BYTES_FILENAME)

    def load(self, data_type: Type[Any]) -> Any:
        """Reads a bytes object from file.

        Args:
            data_type: The type of the data to read.

        Returns:
            The data read.
        """
        super().load(data_type)
        with fileio.open(self.data_path, "rb") as file_:
            return file_.read()

    def save(self, data: Any) -> None:
        """Save a bytes object to file.

        Args:
            data: The data to store.
        """
        super().save(data)
        with fileio.open(self.data_path, "wb") as file_:
            file_.write(data)
__init__(self, uri) special

Define self.data_path.

Parameters:

    uri (str, required): The URI where the artifact data is stored.
Source code in zenml/materializers/built_in_materializer.py
def __init__(self, uri: str):
    """Define `self.data_path`.

    Args:
        uri: The URI where the artifact data is stored.
    """
    super().__init__(uri)
    self.data_path = os.path.join(self.uri, DEFAULT_BYTES_FILENAME)
load(self, data_type)

Reads a bytes object from file.

Parameters:

    data_type (Type[Any], required): The type of the data to read.

Returns:

    Any: The data read.

Source code in zenml/materializers/built_in_materializer.py
def load(self, data_type: Type[Any]) -> Any:
    """Reads a bytes object from file.

    Args:
        data_type: The type of the data to read.

    Returns:
        The data read.
    """
    super().load(data_type)
    with fileio.open(self.data_path, "rb") as file_:
        return file_.read()
save(self, data)

Save a bytes object to file.

Parameters:

    data (Any, required): The data to store.
Source code in zenml/materializers/built_in_materializer.py
def save(self, data: Any) -> None:
    """Save a bytes object to file.

    Args:
        data: The data to store.
    """
    super().save(data)
    with fileio.open(self.data_path, "wb") as file_:
        file_.write(data)

find_materializer_registry_type(type_)

For a given type, find the type registered in the registry.

This can be either the type itself, or a superclass of the type.

Parameters:

    type_ (Type[Any], required): The type to find.

Returns:

    Type[Any]: The type registered in the registry.

Exceptions:

    RuntimeError: If the type could not be resolved.

Source code in zenml/materializers/built_in_materializer.py
def find_materializer_registry_type(type_: Type[Any]) -> Type[Any]:
    """For a given type, find the type registered in the registry.

    This can be either the type itself, or a superclass of the type.

    Args:
        type_: The type to find.

    Returns:
        The type registered in the registry.

    Raises:
        RuntimeError: If the type could not be resolved.
    """
    # Check that a unique materializer is registered for this type
    default_materializer_registry[type_]

    # Check if the type itself is registered
    registered_types = default_materializer_registry.materializer_types.keys()
    if type_ in registered_types:
        return type_

    # Check if a superclass of the type is registered
    for registered_type in registered_types:
        if issubclass(type_, registered_type):
            return registered_type

    # Raise an error otherwise - this should never happen since
    # `default_materializer_registry[type_]` should have raised an error already
    raise RuntimeError(
        f"Cannot find a materializer for type '{type_}' in the "
        f"materializer registry."
    )
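
For example, a subtype with no materializer of its own resolves to its registered superclass. A sketch (the subclass is hypothetical; `list` is registered by default via BuiltInContainerMaterializer):

from zenml.materializers.built_in_materializer import (
    find_materializer_registry_type,
)


class MyList(list):
    """Hypothetical list subclass without a dedicated materializer."""


# `MyList` itself is not in the registry, but its superclass `list` is,
# so `list` is returned:
assert find_materializer_registry_type(MyList) is list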

find_type_by_str(type_str)

Get a Python type, given its string representation.

E.g., "<class 'int'>" should resolve to int.

Currently this is implemented by checking all artifact types registered in the default_materializer_registry. This means only types in the registry can be found; any other type will cause a RuntimeError.

Parameters:

    type_str (str, required): The string representation of a type.

Exceptions:

    RuntimeError: If the type could not be resolved.

Returns:

    Type[Any]: The type whose string representation is type_str.

Source code in zenml/materializers/built_in_materializer.py
def find_type_by_str(type_str: str) -> Type[Any]:
    """Get a Python type, given its string representation.

    E.g., "<class 'int'>" should resolve to `int`.

    Currently this is implemented by checking all artifact types registered in
    the `default_materializer_registry`. This means only types in the registry
    can be found; any other type will cause a `RuntimeError`.

    Args:
        type_str: The string representation of a type.

    Raises:
        RuntimeError: If the type could not be resolved.

    Returns:
        The type whose string representation is `type_str`.
    """
    registered_types = default_materializer_registry.materializer_types.keys()
    type_str_mapping = {str(type_): type_ for type_ in registered_types}
    if type_str in type_str_mapping:
        return type_str_mapping[type_str]
    raise RuntimeError(f"Cannot resolve type '{type_str}'.")

default_materializer_registry

Implementation of a default materializer registry.

MaterializerRegistry

Matches a Python type to a default materializer.

Source code in zenml/materializers/default_materializer_registry.py
class MaterializerRegistry:
    """Matches a Python type to a default materializer."""

    def __init__(self) -> None:
        """Initialize the materializer registry."""
        self.materializer_types: Dict[Type[Any], Type["BaseMaterializer"]] = {}

    def register_materializer_type(
        self, key: Type[Any], type_: Type["BaseMaterializer"]
    ) -> None:
        """Registers a new materializer.

        Args:
            key: Indicates the type of object.
            type_: A BaseMaterializer subclass.
        """
        if key not in self.materializer_types:
            self.materializer_types[key] = type_
            logger.debug(f"Registered materializer {type_} for {key}")
        else:
            logger.debug(
                f"Found existing materializer class for {key}: "
                f"{self.materializer_types[key]}. Skipping registration of "
                f"{type_}."
            )

    def register_and_overwrite_type(
        self, key: Type[Any], type_: Type["BaseMaterializer"]
    ) -> None:
        """Registers a new materializer and also overwrites a default if set.

        Args:
            key: Indicates the type of object.
            type_: A BaseMaterializer subclass.
        """
        self.materializer_types[key] = type_
        logger.debug(f"Registered materializer {type_} for {key}")

    def __getitem__(self, key: Type[Any]) -> Type["BaseMaterializer"]:
        """Get a single materializers based on the key.

        Args:
            key: Indicates the type of object.

        Returns:
            `BaseMaterializer` subclass that was registered for this key.

        Raises:
            StepInterfaceError: If the key (or any of its superclasses) is not
                registered.
        """
        for class_ in key.__mro__:
            materializer = self.materializer_types.get(class_, None)
            if materializer:
                return materializer

        raise StepInterfaceError(
            f"No materializer registered for class {key}. You can register a "
            f"default materializer for specific types by subclassing "
            f"`BaseMaterializer` and setting its `ASSOCIATED_TYPES` class"
            f" variable.",
            url="https://docs.zenml.io/advanced-guide/pipelines/materializers",
        )

    def get_materializer_types(
        self,
    ) -> Dict[Type[Any], Type["BaseMaterializer"]]:
        """Get all registered materializer types.

        Returns:
            A dictionary of registered materializer types.
        """
        return self.materializer_types

    def is_registered(self, key: Type[Any]) -> bool:
        """Returns if a materializer class is registered for the given type.

        Args:
            key: Indicates the type of object.

        Returns:
            True if a materializer is registered for the given type, False
            otherwise.
        """
        return any(issubclass(key, type_) for type_ in self.materializer_types)
__getitem__(self, key) special

Get a single materializer based on the key.

Parameters:

    key (Type[Any], required): Indicates the type of object.

Returns:

    Type[BaseMaterializer]: The BaseMaterializer subclass that was registered for this key.

Exceptions:

    StepInterfaceError: If the key (or any of its superclasses) is not registered.

Source code in zenml/materializers/default_materializer_registry.py
def __getitem__(self, key: Type[Any]) -> Type["BaseMaterializer"]:
    """Get a single materializers based on the key.

    Args:
        key: Indicates the type of object.

    Returns:
        `BaseMaterializer` subclass that was registered for this key.

    Raises:
        StepInterfaceError: If the key (or any of its superclasses) is not
            registered.
    """
    for class_ in key.__mro__:
        materializer = self.materializer_types.get(class_, None)
        if materializer:
            return materializer

    raise StepInterfaceError(
        f"No materializer registered for class {key}. You can register a "
        f"default materializer for specific types by subclassing "
        f"`BaseMaterializer` and setting its `ASSOCIATED_TYPES` class"
        f" variable.",
        url="https://docs.zenml.io/advanced-guide/pipelines/materializers",
    )
__init__(self) special

Initialize the materializer registry.

Source code in zenml/materializers/default_materializer_registry.py
def __init__(self) -> None:
    """Initialize the materializer registry."""
    self.materializer_types: Dict[Type[Any], Type["BaseMaterializer"]] = {}
get_materializer_types(self)

Get all registered materializer types.

Returns:

    Dict[Type[Any], Type[BaseMaterializer]]: A dictionary of registered materializer types.

Source code in zenml/materializers/default_materializer_registry.py
def get_materializer_types(
    self,
) -> Dict[Type[Any], Type["BaseMaterializer"]]:
    """Get all registered materializer types.

    Returns:
        A dictionary of registered materializer types.
    """
    return self.materializer_types
is_registered(self, key)

Returns if a materializer class is registered for the given type.

Parameters:

    key (Type[Any], required): Indicates the type of object.

Returns:

    bool: True if a materializer is registered for the given type, False otherwise.

Source code in zenml/materializers/default_materializer_registry.py
def is_registered(self, key: Type[Any]) -> bool:
    """Returns if a materializer class is registered for the given type.

    Args:
        key: Indicates the type of object.

    Returns:
        True if a materializer is registered for the given type, False
        otherwise.
    """
    return any(issubclass(key, type_) for type_ in self.materializer_types)
register_and_overwrite_type(self, key, type_)

Registers a new materializer and also overwrites a default if set.

Parameters:

    key (Type[Any], required): Indicates the type of object.
    type_ (Type[BaseMaterializer], required): A BaseMaterializer subclass.
Source code in zenml/materializers/default_materializer_registry.py
def register_and_overwrite_type(
    self, key: Type[Any], type_: Type["BaseMaterializer"]
) -> None:
    """Registers a new materializer and also overwrites a default if set.

    Args:
        key: Indicates the type of object.
        type_: A BaseMaterializer subclass.
    """
    self.materializer_types[key] = type_
    logger.debug(f"Registered materializer {type_} for {key}")
register_materializer_type(self, key, type_)

Registers a new materializer.

Parameters:

    key (Type[Any], required): Indicates the type of object.
    type_ (Type[BaseMaterializer], required): A BaseMaterializer subclass.
Source code in zenml/materializers/default_materializer_registry.py
def register_materializer_type(
    self, key: Type[Any], type_: Type["BaseMaterializer"]
) -> None:
    """Registers a new materializer.

    Args:
        key: Indicates the type of object.
        type_: A BaseMaterializer subclass.
    """
    if key not in self.materializer_types:
        self.materializer_types[key] = type_
        logger.debug(f"Registered materializer {type_} for {key}")
    else:
        logger.debug(
            f"Found existing materializer class for {key}: "
            f"{self.materializer_types[key]}. Skipping registration of "
            f"{type_}."
        )
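
The difference between the two registration methods matters when a default already exists: `register_materializer_type` keeps the existing entry, while `register_and_overwrite_type` replaces it. A sketch (the subclass is hypothetical):

from zenml.materializers.built_in_materializer import BuiltInMaterializer
from zenml.materializers.default_materializer_registry import (
    default_materializer_registry,
)


class VerboseIntMaterializer(BuiltInMaterializer):
    """Hypothetical drop-in replacement for `int` artifacts."""


# `int` already has a default materializer, so this call is skipped:
default_materializer_registry.register_materializer_type(
    int, VerboseIntMaterializer
)

# Overwriting has to be explicit:
default_materializer_registry.register_and_overwrite_type(
    int, VerboseIntMaterializer
)
assert default_materializer_registry[int] is VerboseIntMaterializer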

numpy_materializer

Implementation of the ZenML NumPy materializer.

NumpyMaterializer (BaseMaterializer)

Materializer to read and write numpy arrays.

Source code in zenml/materializers/numpy_materializer.py
class NumpyMaterializer(BaseMaterializer):
    """Materializer to read data to and from pandas."""

    ASSOCIATED_TYPES = (np.ndarray,)
    ASSOCIATED_ARTIFACT_TYPE = ArtifactType.DATA

    def load(self, data_type: Type[Any]) -> "Any":
        """Reads a numpy array from a `.npy` file.

        Args:
            data_type: The type of the data to read.


        Raises:
            ImportError: If pyarrow is not installed.

        Returns:
            The numpy array.
        """
        super().load(data_type)

        numpy_file = os.path.join(self.uri, NUMPY_FILENAME)

        if fileio.exists(numpy_file):
            with fileio.open(numpy_file, "rb") as f:
                # This function is untyped for numpy versions supporting python
                # 3.7, but typed for numpy versions installed on python 3.8+.
                # We need to cast it to any here so that numpy doesn't complain
                # about either an untyped function call or an unused ignore
                # statement
                return cast(Any, np.load)(f, allow_pickle=True)
        elif fileio.exists(os.path.join(self.uri, DATA_FILENAME)):
            logger.warning(
                "A legacy artifact was found. "
                "This artifact was created with an older version of "
                "ZenML. You can still use it, but it will be "
                "converted to the new format on the next materialization."
            )
            try:
                # Import old materializer dependencies
                import pyarrow as pa  # type: ignore
                import pyarrow.parquet as pq  # type: ignore

                from zenml.utils import yaml_utils

                # Read numpy array from parquet file
                shape_dict = yaml_utils.read_json(
                    os.path.join(self.uri, SHAPE_FILENAME)
                )
                shape_tuple = tuple(shape_dict.values())
                with fileio.open(
                    os.path.join(self.uri, DATA_FILENAME), "rb"
                ) as f:
                    input_stream = pa.input_stream(f)
                    data = pq.read_table(input_stream)
                vals = getattr(data.to_pandas(), DATA_VAR).values
                return np.reshape(vals, shape_tuple)
            except ImportError:
                raise ImportError(
                    "You have an old version of a `NumpyMaterializer` "
                    "data artifact stored in the artifact store "
                    "as a `.parquet` file, which requires `pyarrow` for "
                    "reading. You can install `pyarrow` by running "
                    "`pip install pyarrow`."
                )

    def save(self, arr: "NDArray[Any]") -> None:
        """Writes a np.ndarray to the artifact store as a `.npy` file.

        Args:
            arr: The numpy array to write.
        """
        super().save(arr)
        with fileio.open(os.path.join(self.uri, NUMPY_FILENAME), "wb") as f:
            # This function is untyped for numpy versions supporting python
            # 3.7, but typed for numpy versions installed on python 3.8+.
            # We need to cast it to any here so that numpy doesn't complain
            # about either an untyped function call or an unused ignore
            # statement
            cast(Any, np.save)(f, arr)

    def extract_metadata(
        self, arr: "NDArray[Any]"
    ) -> Dict[str, "MetadataType"]:
        """Extract metadata from the given numpy array.

        Args:
            arr: The numpy array to extract metadata from.

        Returns:
            The extracted metadata as a dictionary.
        """
        base_metadata = super().extract_metadata(arr)

        # These functions are untyped for numpy versions supporting python
        # 3.7, but typed for numpy versions installed on python 3.8+.
        # We need to cast them to Any here so that numpy doesn't complain
        # about either an untyped function call or an unused ignore statement.
        min_val = cast(Any, np.min)(arr).item()
        max_val = cast(Any, np.max)(arr).item()

        numpy_metadata: Dict[str, "MetadataType"] = {
            "shape": tuple(arr.shape),
            "dtype": DType(arr.dtype.type),
            "mean": np.mean(arr).item(),
            "std": np.std(arr).item(),
            "min": min_val,
            "max": max_val,
        }
        return {**base_metadata, **numpy_metadata}
extract_metadata(self, arr)

Extract metadata from the given numpy array.

Parameters:

    arr (NDArray[Any]): The numpy array to extract metadata from. [required]

Returns:

    Dict[str, MetadataType]: The extracted metadata as a dictionary.

Source code in zenml/materializers/numpy_materializer.py
def extract_metadata(
    self, arr: "NDArray[Any]"
) -> Dict[str, "MetadataType"]:
    """Extract metadata from the given numpy array.

    Args:
        arr: The numpy array to extract metadata from.

    Returns:
        The extracted metadata as a dictionary.
    """
    base_metadata = super().extract_metadata(arr)

    # These functions are untyped for numpy versions supporting python
    # 3.7, but typed for numpy versions installed on python 3.8+.
    # We need to cast them to Any here so that numpy doesn't complain
    # about either an untyped function call or an unused ignore statement.
    min_val = cast(Any, np.min)(arr).item()
    max_val = cast(Any, np.max)(arr).item()

    numpy_metadata: Dict[str, "MetadataType"] = {
        "shape": tuple(arr.shape),
        "dtype": DType(arr.dtype.type),
        "mean": np.mean(arr).item(),
        "std": np.std(arr).item(),
        "min": min_val,
        "max": max_val,
    }
    return {**base_metadata, **numpy_metadata}
load(self, data_type)

Reads a numpy array from a .npy file.

Parameters:

    data_type (Type[Any]): The type of the data to read. [required]

Exceptions:

    ImportError: If pyarrow is not installed.

Returns:

    Any: The numpy array.

Source code in zenml/materializers/numpy_materializer.py
def load(self, data_type: Type[Any]) -> "Any":
    """Reads a numpy array from a `.npy` file.

    Args:
        data_type: The type of the data to read.

    Raises:
        ImportError: If pyarrow is not installed.

    Returns:
        The numpy array.
    """
    super().load(data_type)

    numpy_file = os.path.join(self.uri, NUMPY_FILENAME)

    if fileio.exists(numpy_file):
        with fileio.open(numpy_file, "rb") as f:
            # This function is untyped for numpy versions supporting python
            # 3.7, but typed for numpy versions installed on python 3.8+.
            # We need to cast it to any here so that numpy doesn't complain
            # about either an untyped function call or an unused ignore
            # statement
            return cast(Any, np.load)(f, allow_pickle=True)
    elif fileio.exists(os.path.join(self.uri, DATA_FILENAME)):
        logger.warning(
            "A legacy artifact was found. "
            "This artifact was created with an older version of "
            "ZenML. You can still use it, but it will be "
            "converted to the new format on the next materialization."
        )
        try:
            # Import old materializer dependencies
            import pyarrow as pa  # type: ignore
            import pyarrow.parquet as pq  # type: ignore

            from zenml.utils import yaml_utils

            # Read numpy array from parquet file
            shape_dict = yaml_utils.read_json(
                os.path.join(self.uri, SHAPE_FILENAME)
            )
            shape_tuple = tuple(shape_dict.values())
            with fileio.open(
                os.path.join(self.uri, DATA_FILENAME), "rb"
            ) as f:
                input_stream = pa.input_stream(f)
                data = pq.read_table(input_stream)
            vals = getattr(data.to_pandas(), DATA_VAR).values
            return np.reshape(vals, shape_tuple)
        except ImportError:
            raise ImportError(
                "You have an old version of a `NumpyMaterializer` "
                "data artifact stored in the artifact store "
                "as a `.parquet` file, which requires `pyarrow` for "
                "reading. You can install `pyarrow` by running "
                "`pip install pyarrow`."
            )
save(self, arr)

Writes a np.ndarray to the artifact store as a .npy file.

Parameters:

    arr (NDArray[Any]): The numpy array to write. [required]
Source code in zenml/materializers/numpy_materializer.py
def save(self, arr: "NDArray[Any]") -> None:
    """Writes a np.ndarray to the artifact store as a `.npy` file.

    Args:
        arr: The numpy array to write.
    """
    super().save(arr)
    with fileio.open(os.path.join(self.uri, NUMPY_FILENAME), "wb") as f:
        # This function is untyped for numpy versions supporting python
        # 3.7, but typed for numpy versions installed on python 3.8+.
        # We need to cast it to any here so that numpy doesn't complain
        # about either an untyped function call or an unused ignore
        # statement
        cast(Any, np.save)(f, arr)
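
A minimal usage sketch for this materializer, assuming a plain local directory can stand in for an artifact store URI (the path below is hypothetical):

```python
import os

import numpy as np

from zenml.materializers.numpy_materializer import NumpyMaterializer

# Hypothetical local path standing in for an artifact store URI.
uri = "/tmp/numpy_artifact_example"
os.makedirs(uri, exist_ok=True)

materializer = NumpyMaterializer(uri=uri)
arr = np.arange(6, dtype=np.float64).reshape(2, 3)
materializer.save(arr)                  # writes a `.npy` file under the URI
loaded = materializer.load(np.ndarray)  # reads it back as an ndarray
assert np.array_equal(arr, loaded)

print(materializer.extract_metadata(loaded)["shape"])  # (2, 3)
```

In a pipeline, ZenML selects this materializer automatically for step inputs and outputs annotated as `np.ndarray`; direct instantiation like this is mostly useful when testing custom materializers.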

pandas_materializer

Materializer for Pandas.

PandasMaterializer (BaseMaterializer)

Materializer to read data to and from pandas.

Source code in zenml/materializers/pandas_materializer.py
class PandasMaterializer(BaseMaterializer):
    """Materializer to read data to and from pandas."""

    ASSOCIATED_TYPES = (pd.DataFrame, pd.Series)
    ASSOCIATED_ARTIFACT_TYPE = ArtifactType.DATA

    def __init__(self, uri: str):
        """Define `self.data_path`.

        Args:
            uri: The URI where the artifact data is stored.
        """
        super().__init__(uri)
        try:
            import pyarrow  # type: ignore # noqa

            self.pyarrow_exists = True
        except ImportError:
            self.pyarrow_exists = False
            logger.warning(
                "By default, the `PandasMaterializer` stores data as a "
                "`.csv` file. If you want to store data more efficiently, "
                "you can install `pyarrow` by running "
                "'`pip install pyarrow`'. This will allow `PandasMaterializer` "
                "to automatically store the data as a `.parquet` file instead."
            )
        finally:
            self.parquet_path = os.path.join(self.uri, PARQUET_FILENAME)
            self.csv_path = os.path.join(self.uri, CSV_FILENAME)

    def load(self, data_type: Type[Any]) -> Union[pd.DataFrame, pd.Series]:
        """Reads `pd.DataFrame` or `pd.Series` from a `.parquet` or `.csv` file.

        Args:
            data_type: The type of the data to read.

        Raises:
            ImportError: If pyarrow or fastparquet is not installed.

        Returns:
            The pandas dataframe or series.
        """
        super().load(data_type)
        if fileio.exists(self.parquet_path):
            if self.pyarrow_exists:
                with fileio.open(self.parquet_path, mode="rb") as f:
                    df = pd.read_parquet(f)
            else:
                raise ImportError(
                    "You have an old version of a `PandasMaterializer` "
                    "data artifact stored in the artifact store "
                    "as a `.parquet` file, which requires `pyarrow` "
                    "for reading, You can install `pyarrow` by running "
                    "'`pip install pyarrow fastparquet`'."
                )
        else:
            with fileio.open(self.csv_path, mode="rb") as f:
                df = pd.read_csv(f, index_col=0, parse_dates=True)

        # validate the type of the data.
        def is_dataframe_or_series(
            df: Union[pd.DataFrame, pd.Series]
        ) -> Union[pd.DataFrame, pd.Series]:
            """Checks if the data is a `pd.DataFrame` or `pd.Series`.

            Args:
                df: The data to check.

            Returns:
                The data if it is a `pd.DataFrame` or `pd.Series`.
            """
            if issubclass(data_type, pd.Series):
                # Take the first column if it's a series, as the assumption
                # is that there will only be one column
                assert len(df.columns) == 1
                df = df[df.columns[0]]
                return df
            else:
                return df

        return is_dataframe_or_series(df)

    def save(self, df: Union[pd.DataFrame, pd.Series]) -> None:
        """Writes a pandas dataframe or series to the specified filename.

        Args:
            df: The pandas dataframe or series to write.
        """
        super().save(df)

        if isinstance(df, pd.Series):
            df = df.to_frame(name="series")

        if self.pyarrow_exists:
            with fileio.open(self.parquet_path, mode="wb") as f:
                df.to_parquet(f, compression=COMPRESSION_TYPE)
        else:
            with fileio.open(self.csv_path, mode="wb") as f:
                df.to_csv(f, index=True)

    def extract_metadata(
        self, df: Union[pd.DataFrame, pd.Series]
    ) -> Dict[str, "MetadataType"]:
        """Extract metadata from the given pandas dataframe or series.

        Args:
            df: The pandas dataframe or series to extract metadata from.

        Returns:
            The extracted metadata as a dictionary.
        """
        base_metadata = super().extract_metadata(df)
        pandas_metadata: Dict[str, "MetadataType"] = {"shape": df.shape}

        if isinstance(df, pd.Series):
            pandas_metadata["dtype"] = DType(df.dtype.type)
            pandas_metadata["mean"] = float(df.mean().item())
            pandas_metadata["std"] = float(df.std().item())
            pandas_metadata["min"] = float(df.min().item())
            pandas_metadata["max"] = float(df.max().item())

        else:
            pandas_metadata["dtype"] = {
                str(key): DType(value.type) for key, value in df.dtypes.items()
            }
            for stat_name, stat in {
                "mean": df.mean,
                "std": df.std,
                "min": df.min,
                "max": df.max,
            }.items():
                pandas_metadata[stat_name] = {
                    str(key): float(value)
                    for key, value in stat(numeric_only=True).to_dict().items()
                }

        return {**base_metadata, **pandas_metadata}
__init__(self, uri) special

Define `self.parquet_path` and `self.csv_path`.

Parameters:

    uri (str): The URI where the artifact data is stored. [required]
Source code in zenml/materializers/pandas_materializer.py
def __init__(self, uri: str):
    """Define `self.data_path`.

    Args:
        uri: The URI where the artifact data is stored.
    """
    super().__init__(uri)
    try:
        import pyarrow  # type: ignore # noqa

        self.pyarrow_exists = True
    except ImportError:
        self.pyarrow_exists = False
        logger.warning(
            "By default, the `PandasMaterializer` stores data as a "
            "`.csv` file. If you want to store data more efficiently, "
            "you can install `pyarrow` by running "
            "'`pip install pyarrow`'. This will allow `PandasMaterializer` "
            "to automatically store the data as a `.parquet` file instead."
        )
    finally:
        self.parquet_path = os.path.join(self.uri, PARQUET_FILENAME)
        self.csv_path = os.path.join(self.uri, CSV_FILENAME)
extract_metadata(self, df)

Extract metadata from the given pandas dataframe or series.

Parameters:

    df (Union[pd.DataFrame, pd.Series]): The pandas dataframe or series to extract metadata from. [required]

Returns:

    Dict[str, MetadataType]: The extracted metadata as a dictionary.

Source code in zenml/materializers/pandas_materializer.py
def extract_metadata(
    self, df: Union[pd.DataFrame, pd.Series]
) -> Dict[str, "MetadataType"]:
    """Extract metadata from the given pandas dataframe or series.

    Args:
        df: The pandas dataframe or series to extract metadata from.

    Returns:
        The extracted metadata as a dictionary.
    """
    base_metadata = super().extract_metadata(df)
    pandas_metadata: Dict[str, "MetadataType"] = {"shape": df.shape}

    if isinstance(df, pd.Series):
        pandas_metadata["dtype"] = DType(df.dtype.type)
        pandas_metadata["mean"] = float(df.mean().item())
        pandas_metadata["std"] = float(df.std().item())
        pandas_metadata["min"] = float(df.min().item())
        pandas_metadata["max"] = float(df.max().item())

    else:
        pandas_metadata["dtype"] = {
            str(key): DType(value.type) for key, value in df.dtypes.items()
        }
        for stat_name, stat in {
            "mean": df.mean,
            "std": df.std,
            "min": df.min,
            "max": df.max,
        }.items():
            pandas_metadata[stat_name] = {
                str(key): float(value)
                for key, value in stat(numeric_only=True).to_dict().items()
            }

    return {**base_metadata, **pandas_metadata}
load(self, data_type)

Reads pd.DataFrame or pd.Series from a .parquet or .csv file.

Parameters:

    data_type (Type[Any]): The type of the data to read. [required]

Exceptions:

    ImportError: If pyarrow or fastparquet is not installed.

Returns:

    Union[pd.DataFrame, pd.Series]: The pandas dataframe or series.

Source code in zenml/materializers/pandas_materializer.py
def load(self, data_type: Type[Any]) -> Union[pd.DataFrame, pd.Series]:
    """Reads `pd.DataFrame` or `pd.Series` from a `.parquet` or `.csv` file.

    Args:
        data_type: The type of the data to read.

    Raises:
        ImportError: If pyarrow or fastparquet is not installed.

    Returns:
        The pandas dataframe or series.
    """
    super().load(data_type)
    if fileio.exists(self.parquet_path):
        if self.pyarrow_exists:
            with fileio.open(self.parquet_path, mode="rb") as f:
                df = pd.read_parquet(f)
        else:
            raise ImportError(
                "You have an old version of a `PandasMaterializer` "
                "data artifact stored in the artifact store "
                "as a `.parquet` file, which requires `pyarrow` "
                "for reading, You can install `pyarrow` by running "
                "'`pip install pyarrow fastparquet`'."
            )
    else:
        with fileio.open(self.csv_path, mode="rb") as f:
            df = pd.read_csv(f, index_col=0, parse_dates=True)

    # validate the type of the data.
    def is_dataframe_or_series(
        df: Union[pd.DataFrame, pd.Series]
    ) -> Union[pd.DataFrame, pd.Series]:
        """Checks if the data is a `pd.DataFrame` or `pd.Series`.

        Args:
            df: The data to check.

        Returns:
            The data if it is a `pd.DataFrame` or `pd.Series`.
        """
        if issubclass(data_type, pd.Series):
            # Take the first column if it's a series, as the assumption
            # is that there will only be one column
            assert len(df.columns) == 1
            df = df[df.columns[0]]
            return df
        else:
            return df

    return is_dataframe_or_series(df)
save(self, df)

Writes a pandas dataframe or series to the specified filename.

Parameters:

    df (Union[pd.DataFrame, pd.Series]): The pandas dataframe or series to write. [required]
Source code in zenml/materializers/pandas_materializer.py
def save(self, df: Union[pd.DataFrame, pd.Series]) -> None:
    """Writes a pandas dataframe or series to the specified filename.

    Args:
        df: The pandas dataframe or series to write.
    """
    super().save(df)

    if isinstance(df, pd.Series):
        df = df.to_frame(name="series")

    if self.pyarrow_exists:
        with fileio.open(self.parquet_path, mode="wb") as f:
            df.to_parquet(f, compression=COMPRESSION_TYPE)
    else:
        with fileio.open(self.csv_path, mode="wb") as f:
            df.to_csv(f, index=True)
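
A minimal usage sketch for this materializer, again assuming a hypothetical local directory stands in for an artifact store URI:

```python
import os

import pandas as pd

from zenml.materializers.pandas_materializer import PandasMaterializer

# Hypothetical local path standing in for an artifact store URI.
uri = "/tmp/pandas_artifact_example"
os.makedirs(uri, exist_ok=True)

materializer = PandasMaterializer(uri=uri)
df = pd.DataFrame({"a": [1, 2, 3], "b": [4.0, 5.0, 6.0]})
materializer.save(df)  # `.parquet` if pyarrow is installed, else `.csv`
loaded = materializer.load(pd.DataFrame)
assert loaded.shape == (3, 2)

# A series round-trips through a single-column frame named "series";
# note this overwrites the dataframe stored at the same URI above.
materializer.save(pd.Series([1.0, 2.0, 3.0]))
restored = materializer.load(pd.Series)
assert isinstance(restored, pd.Series)
```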

pydantic_materializer

Implementation of ZenML's pydantic materializer.

PydanticMaterializer (BaseMaterializer)

Handle Pydantic BaseModel objects.

Source code in zenml/materializers/pydantic_materializer.py
class PydanticMaterializer(BaseMaterializer):
    """Handle Pydantic BaseModel objects."""

    ASSOCIATED_ARTIFACT_TYPE = ArtifactType.DATA
    ASSOCIATED_TYPES = (BaseModel,)

    def load(self, data_type: Type[BaseModel]) -> Any:
        """Reads BaseModel from JSON.

        Args:
            data_type: The type of the data to read.

        Returns:
            The data read.
        """
        super().load(data_type)
        data_path = os.path.join(self.uri, DEFAULT_FILENAME)
        contents = yaml_utils.read_json(data_path)
        return data_type.parse_raw(contents)

    def save(self, data: BaseModel) -> None:
        """Serialize a BaseModel to JSON.

        Args:
            data: The data to store.
        """
        super().save(data)
        data_path = os.path.join(self.uri, DEFAULT_FILENAME)
        yaml_utils.write_json(data_path, data.json())

    def extract_metadata(self, data: BaseModel) -> Dict[str, "MetadataType"]:
        """Extract metadata from the given BaseModel object.

        Args:
            data: The BaseModel object to extract metadata from.

        Returns:
            The extracted metadata as a dictionary.
        """
        base_metadata = super().extract_metadata(data)
        container_metadata = {
            "schema": data.schema(),
        }
        return {**base_metadata, **container_metadata}
extract_metadata(self, data)

Extract metadata from the given BaseModel object.

Parameters:

    data (BaseModel): The BaseModel object to extract metadata from. [required]

Returns:

    Dict[str, MetadataType]: The extracted metadata as a dictionary.

Source code in zenml/materializers/pydantic_materializer.py
def extract_metadata(self, data: BaseModel) -> Dict[str, "MetadataType"]:
    """Extract metadata from the given BaseModel object.

    Args:
        data: The BaseModel object to extract metadata from.

    Returns:
        The extracted metadata as a dictionary.
    """
    base_metadata = super().extract_metadata(data)
    container_metadata = {
        "schema": data.schema(),
    }
    return {**base_metadata, **container_metadata}
load(self, data_type)

Reads BaseModel from JSON.

Parameters:

    data_type (Type[BaseModel]): The type of the data to read. [required]

Returns:

    Any: The data read.

Source code in zenml/materializers/pydantic_materializer.py
def load(self, data_type: Type[BaseModel]) -> Any:
    """Reads BaseModel from JSON.

    Args:
        data_type: The type of the data to read.

    Returns:
        The data read.
    """
    super().load(data_type)
    data_path = os.path.join(self.uri, DEFAULT_FILENAME)
    contents = yaml_utils.read_json(data_path)
    return data_type.parse_raw(contents)
save(self, data)

Serialize a BaseModel to JSON.

Parameters:

    data (BaseModel): The data to store. [required]
Source code in zenml/materializers/pydantic_materializer.py
def save(self, data: BaseModel) -> None:
    """Serialize a BaseModel to JSON.

    Args:
        data: The data to store.
    """
    super().save(data)
    data_path = os.path.join(self.uri, DEFAULT_FILENAME)
    yaml_utils.write_json(data_path, data.json())
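
A minimal usage sketch with a toy model (the `TrainingConfig` class and the local path are hypothetical; the example uses the Pydantic v1-style `.json()`/`parse_raw` API, consistent with the source shown above):

```python
import os

from pydantic import BaseModel

from zenml.materializers.pydantic_materializer import PydanticMaterializer


class TrainingConfig(BaseModel):
    """Hypothetical config model used only for this sketch."""

    learning_rate: float = 0.001
    epochs: int = 10


# Hypothetical local path standing in for an artifact store URI.
uri = "/tmp/pydantic_artifact_example"
os.makedirs(uri, exist_ok=True)

materializer = PydanticMaterializer(uri=uri)
materializer.save(TrainingConfig(epochs=25))  # serialized with `.json()`
restored = materializer.load(TrainingConfig)  # parsed with `parse_raw`
assert restored.epochs == 25 and restored.learning_rate == 0.001
```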

service_materializer

Implementation of a materializer to read and write ZenML service instances.

ServiceMaterializer (BaseMaterializer)

Materializer to read/write service instances.

Source code in zenml/materializers/service_materializer.py
class ServiceMaterializer(BaseMaterializer):
    """Materializer to read/write service instances."""

    ASSOCIATED_TYPES = (BaseService,)
    ASSOCIATED_ARTIFACT_TYPE = ArtifactType.SERVICE

    def load(self, data_type: Type[Any]) -> BaseService:
        """Creates and returns a service.

        This service is instantiated from the serialized service configuration
        and last known status information saved as artifact.

        Args:
            data_type: The type of the data to read.

        Returns:
            A ZenML service instance.
        """
        super().load(data_type)
        filepath = os.path.join(self.uri, SERVICE_CONFIG_FILENAME)
        with fileio.open(filepath, "r") as f:
            service = ServiceRegistry().load_service_from_json(f.read())
        return service

    def save(self, service: BaseService) -> None:
        """Writes a ZenML service.

        The configuration and last known status of the input service instance
        are serialized and saved as an artifact.

        Args:
            service: A ZenML service instance.
        """
        super().save(service)
        filepath = os.path.join(self.uri, SERVICE_CONFIG_FILENAME)
        with fileio.open(filepath, "w") as f:
            f.write(service.json(indent=4))

    def extract_metadata(
        self, service: BaseService
    ) -> Dict[str, "MetadataType"]:
        """Extract metadata from the given service.

        Args:
            service: The service to extract metadata from.

        Returns:
            The extracted metadata as a dictionary.
        """
        from zenml.metadata.metadata_types import Uri

        base_metadata = super().extract_metadata(service)
        service_metadata: Dict[str, "MetadataType"] = {}
        if service.endpoint and service.endpoint.status.uri:
            service_metadata["uri"] = Uri(service.endpoint.status.uri)
        return {**base_metadata, **service_metadata}
extract_metadata(self, service)

Extract metadata from the given service.

Parameters:

    service (BaseService): The service to extract metadata from. [required]

Returns:

    Dict[str, MetadataType]: The extracted metadata as a dictionary.

Source code in zenml/materializers/service_materializer.py
def extract_metadata(
    self, service: BaseService
) -> Dict[str, "MetadataType"]:
    """Extract metadata from the given service.

    Args:
        service: The service to extract metadata from.

    Returns:
        The extracted metadata as a dictionary.
    """
    from zenml.metadata.metadata_types import Uri

    base_metadata = super().extract_metadata(service)
    service_metadata: Dict[str, "MetadataType"] = {}
    if service.endpoint and service.endpoint.status.uri:
        service_metadata["uri"] = Uri(service.endpoint.status.uri)
    return {**base_metadata, **service_metadata}
load(self, data_type)

Creates and returns a service.

This service is instantiated from the serialized service configuration and last known status information saved as artifact.

Parameters:

    data_type (Type[Any]): The type of the data to read. [required]

Returns:

    BaseService: A ZenML service instance.

Source code in zenml/materializers/service_materializer.py
def load(self, data_type: Type[Any]) -> BaseService:
    """Creates and returns a service.

    This service is instantiated from the serialized service configuration
    and last known status information saved as artifact.

    Args:
        data_type: The type of the data to read.

    Returns:
        A ZenML service instance.
    """
    super().load(data_type)
    filepath = os.path.join(self.uri, SERVICE_CONFIG_FILENAME)
    with fileio.open(filepath, "r") as f:
        service = ServiceRegistry().load_service_from_json(f.read())
    return service
save(self, service)

Writes a ZenML service.

The configuration and last known status of the input service instance are serialized and saved as an artifact.

Parameters:

    service (BaseService): A ZenML service instance. [required]
Source code in zenml/materializers/service_materializer.py
def save(self, service: BaseService) -> None:
    """Writes a ZenML service.

    The configuration and last known status of the input service instance
    are serialized and saved as an artifact.

    Args:
        service: A ZenML service instance.
    """
    super().save(service)
    filepath = os.path.join(self.uri, SERVICE_CONFIG_FILENAME)
    with fileio.open(filepath, "w") as f:
        f.write(service.json(indent=4))

unmaterialized_artifact

Unmaterialized artifact class.

UnmaterializedArtifact (ArtifactResponseModel) pydantic-model

Unmaterialized artifact class.

Typing a step input to have this type will cause ZenML to not materialize the artifact. This is useful for steps that need to access the artifact metadata instead of the actual artifact data.

Usage example:

from zenml.steps import step
from zenml.materializers import UnmaterializedArtifact

@step
def my_step(input_artifact: UnmaterializedArtifact):
    print(input_artifact.uri)
Source code in zenml/materializers/unmaterialized_artifact.py
class UnmaterializedArtifact(ArtifactResponseModel):
    """Unmaterialized artifact class.

    Typing a step input to have this type will cause ZenML to not materialize
    the artifact. This is useful for steps that need to access the artifact
    metadata instead of the actual artifact data.

    Usage example:

    ```python
    from zenml.steps import step
    from zenml.materializers import UnmaterializedArtifact

    @step
    def my_step(input_artifact: UnmaterializedArtifact):
        print(input_artifact.uri)
    ```
    """