Skip to content

Materializers

zenml.materializers special

Initialization of ZenML materializers.

Materializers are used to convert a ZenML artifact into a specific format. They are most often used to handle the input or output of ZenML steps, and can be extended by building on the BaseMaterializer class.

base_materializer

Metaclass implementation for registering ZenML BaseMaterializer subclasses.

BaseMaterializer

Base Materializer to realize artifact data.

Source code in zenml/materializers/base_materializer.py
class BaseMaterializer(metaclass=BaseMaterializerMeta):
    """Base Materializer to realize artifact data."""

    ASSOCIATED_ARTIFACT_TYPES: ClassVar[Tuple[Type["BaseArtifact"], ...]] = ()
    ASSOCIATED_TYPES: ClassVar[Tuple[Type[Any], ...]] = ()

    def __init__(self, artifact: "BaseArtifact"):
        """Initializes a materializer with the given artifact.

        Args:
            artifact: The artifact to materialize.
        """
        self.artifact = artifact

    def _can_handle_type(self, data_type: Type[Any]) -> bool:
        """Whether the materializer can read/write a certain type.

        Args:
            data_type: The type to check.

        Returns:
            Whether the materializer can read/write the given type.
        """
        return any(
            issubclass(data_type, associated_type)
            for associated_type in self.ASSOCIATED_TYPES
        )

    def handle_input(self, data_type: Type[Any]) -> Any:
        """Write logic here to handle input of the step function.

        Args:
            data_type: What type the input should be materialized as.

        Raises:
            TypeError: If the data is not of the correct type.
        """
        if not self._can_handle_type(data_type):
            raise TypeError(
                f"Unable to handle type {data_type}. {self.__class__.__name__} "
                f"can only read artifacts to the following types: "
                f"{self.ASSOCIATED_TYPES}."
            )

    def handle_return(self, data: Any) -> None:
        """Write logic here to handle return of the step function.

        Args:
            data: Any object that is specified as an input artifact of the step.

        Raises:
            TypeError: If the data is not of the correct type.
        """
        data_type = type(data)
        if not self._can_handle_type(data_type):
            raise TypeError(
                f"Unable to write {data_type}. {self.__class__.__name__} "
                f"can only write the following types: {self.ASSOCIATED_TYPES}."
            )
__init__(self, artifact) special

Initializes a materializer with the given artifact.

Parameters:

Name Type Description Default
artifact BaseArtifact

The artifact to materialize.

required
Source code in zenml/materializers/base_materializer.py
def __init__(self, artifact: "BaseArtifact"):
    """Initializes a materializer with the given artifact.

    Args:
        artifact: The artifact to materialize.
    """
    self.artifact = artifact
handle_input(self, data_type)

Write logic here to handle input of the step function.

Parameters:

Name Type Description Default
data_type Type[Any]

What type the input should be materialized as.

required

Exceptions:

Type Description
TypeError

If the data is not of the correct type.

Source code in zenml/materializers/base_materializer.py
def handle_input(self, data_type: Type[Any]) -> Any:
    """Write logic here to handle input of the step function.

    Args:
        data_type: What type the input should be materialized as.

    Raises:
        TypeError: If the data is not of the correct type.
    """
    if not self._can_handle_type(data_type):
        raise TypeError(
            f"Unable to handle type {data_type}. {self.__class__.__name__} "
            f"can only read artifacts to the following types: "
            f"{self.ASSOCIATED_TYPES}."
        )
handle_return(self, data)

Write logic here to handle return of the step function.

Parameters:

Name Type Description Default
data Any

Any object that is specified as an input artifact of the step.

required

Exceptions:

Type Description
TypeError

If the data is not of the correct type.

Source code in zenml/materializers/base_materializer.py
def handle_return(self, data: Any) -> None:
    """Write logic here to handle return of the step function.

    Args:
        data: Any object that is specified as an input artifact of the step.

    Raises:
        TypeError: If the data is not of the correct type.
    """
    data_type = type(data)
    if not self._can_handle_type(data_type):
        raise TypeError(
            f"Unable to write {data_type}. {self.__class__.__name__} "
            f"can only write the following types: {self.ASSOCIATED_TYPES}."
        )

BaseMaterializerMeta (type)

Metaclass responsible for registering different BaseMaterializer subclasses.

Materializers are used for reading/writing artifacts.

Source code in zenml/materializers/base_materializer.py
class BaseMaterializerMeta(type):
    """Metaclass responsible for registering different BaseMaterializer subclasses.

    Materializers are used for reading/writing artifacts.
    """

    def __new__(
        mcs, name: str, bases: Tuple[Type[Any], ...], dct: Dict[str, Any]
    ) -> "BaseMaterializerMeta":
        """Creates a Materializer class and registers it at the `MaterializerRegistry`.

        Args:
            name: The name of the class.
            bases: The base classes of the class.
            dct: The dictionary of the class.

        Returns:
            The BaseMaterializerMeta class.

        Raises:
            MaterializerInterfaceError: If the class was improperly defined.
        """
        cls = cast(
            Type["BaseMaterializer"], super().__new__(mcs, name, bases, dct)
        )
        if name != "BaseMaterializer":
            from zenml.artifacts.base_artifact import BaseArtifact

            if not cls.ASSOCIATED_TYPES:
                raise MaterializerInterfaceError(
                    f"Invalid materializer class '{name}'. When creating a "
                    f"custom materializer, make sure to specify at least one "
                    f"type in its ASSOCIATED_TYPES class variable.",
                    url="https://docs.zenml.io/advanced-guide/pipelines/materializers",
                )

            for artifact_type in cls.ASSOCIATED_ARTIFACT_TYPES:
                if not (
                    inspect.isclass(artifact_type)
                    and issubclass(artifact_type, BaseArtifact)
                ):
                    raise MaterializerInterfaceError(
                        f"Associated artifact type {artifact_type} for "
                        f"materializer {name} is not a `BaseArtifact` "
                        f"subclass.",
                        url="https://docs.zenml.io/advanced-guide/pipelines/materializers",
                    )

            artifact_types = cls.ASSOCIATED_ARTIFACT_TYPES or (BaseArtifact,)
            for associated_type in cls.ASSOCIATED_TYPES:
                if not inspect.isclass(associated_type):
                    raise MaterializerInterfaceError(
                        f"Associated type {associated_type} for materializer "
                        f"{name} is not a class.",
                        url="https://docs.zenml.io/advanced-guide/pipelines/materializers",
                    )

                default_materializer_registry.register_materializer_type(
                    associated_type, cls
                )

                type_registry.register_integration(
                    associated_type, artifact_types
                )
        return cls
__new__(mcs, name, bases, dct) special staticmethod

Creates a Materializer class and registers it at the MaterializerRegistry.

Parameters:

Name Type Description Default
name str

The name of the class.

required
bases Tuple[Type[Any], ...]

The base classes of the class.

required
dct Dict[str, Any]

The dictionary of the class.

required

Returns:

Type Description
BaseMaterializerMeta

The BaseMaterializerMeta class.

Exceptions:

Type Description
MaterializerInterfaceError

If the class was improperly defined.

Source code in zenml/materializers/base_materializer.py
def __new__(
    mcs, name: str, bases: Tuple[Type[Any], ...], dct: Dict[str, Any]
) -> "BaseMaterializerMeta":
    """Creates a Materializer class and registers it at the `MaterializerRegistry`.

    Args:
        name: The name of the class.
        bases: The base classes of the class.
        dct: The dictionary of the class.

    Returns:
        The BaseMaterializerMeta class.

    Raises:
        MaterializerInterfaceError: If the class was improperly defined.
    """
    cls = cast(
        Type["BaseMaterializer"], super().__new__(mcs, name, bases, dct)
    )
    if name != "BaseMaterializer":
        from zenml.artifacts.base_artifact import BaseArtifact

        if not cls.ASSOCIATED_TYPES:
            raise MaterializerInterfaceError(
                f"Invalid materializer class '{name}'. When creating a "
                f"custom materializer, make sure to specify at least one "
                f"type in its ASSOCIATED_TYPES class variable.",
                url="https://docs.zenml.io/advanced-guide/pipelines/materializers",
            )

        for artifact_type in cls.ASSOCIATED_ARTIFACT_TYPES:
            if not (
                inspect.isclass(artifact_type)
                and issubclass(artifact_type, BaseArtifact)
            ):
                raise MaterializerInterfaceError(
                    f"Associated artifact type {artifact_type} for "
                    f"materializer {name} is not a `BaseArtifact` "
                    f"subclass.",
                    url="https://docs.zenml.io/advanced-guide/pipelines/materializers",
                )

        artifact_types = cls.ASSOCIATED_ARTIFACT_TYPES or (BaseArtifact,)
        for associated_type in cls.ASSOCIATED_TYPES:
            if not inspect.isclass(associated_type):
                raise MaterializerInterfaceError(
                    f"Associated type {associated_type} for materializer "
                    f"{name} is not a class.",
                    url="https://docs.zenml.io/advanced-guide/pipelines/materializers",
                )

            default_materializer_registry.register_materializer_type(
                associated_type, cls
            )

            type_registry.register_integration(
                associated_type, artifact_types
            )
    return cls

built_in_materializer

Implementation of ZenML's builtin materializer.

BuiltInContainerMaterializer (BaseMaterializer)

Handle built-in container types (dict, list, set, tuple).

Source code in zenml/materializers/built_in_materializer.py
class BuiltInContainerMaterializer(BaseMaterializer):
    """Handle built-in container types (dict, list, set, tuple)."""

    ASSOCIATED_TYPES = (dict, list, set, tuple)

    def __init__(self, artifact: "BaseArtifact"):
        """Define `self.data_path` and `self.metadata_path`.

        Args:
            artifact: Artifact required by `BaseMaterializer.__init__()`.
        """
        super().__init__(artifact)
        self.data_path = os.path.join(self.artifact.uri, DEFAULT_FILENAME)
        self.metadata_path = os.path.join(
            self.artifact.uri, DEFAULT_METADATA_FILENAME
        )

    def handle_input(self, data_type: Type[Any]) -> Any:
        """Reads a materialized built-in container object.

        If the data was serialized to JSON, deserialize it.

        Otherwise, reconstruct all elements according to the metadata file:
            1. Resolve the data type using `find_type_by_str()`,
            2. Get the materializer via the `default_materializer_registry`,
            3. Initialize the materializer with a mock `DataArtifact`, whose
                `uri` attribute is overwritten to point to the desired path,
            4. Use `handle_input()` of that materializer to load the element.

        Args:
            data_type: The type of the data to read.

        Returns:
            The data read.

        Raises:
            RuntimeError: If the data was not found.
        """
        super().handle_input(data_type)

        # If the data was not serialized, there must be metadata present.
        if not fileio.exists(self.data_path) and not fileio.exists(
            self.metadata_path
        ):
            raise RuntimeError(
                f"Materialization of type {data_type} failed. Expected either"
                f"{self.data_path} or {self.metadata_path} to exist."
            )

        # If the data was serialized as JSON, deserialize it.
        if fileio.exists(self.data_path):
            outputs = yaml_utils.read_json(self.data_path)

        # Otherwise, use the metadata to reconstruct the data as a list.
        else:
            metadata = yaml_utils.read_json(self.metadata_path)
            outputs = []
            for path_, type_str in zip(metadata["paths"], metadata["types"]):
                type_ = find_type_by_str(type_str)
                materializer_class = default_materializer_registry[type_]
                mock_artifact = DataArtifact()
                mock_artifact.uri = path_
                materializer = materializer_class(mock_artifact)
                element = materializer.handle_input(type_)
                outputs.append(element)

        # Cast the data to the correct type.
        if issubclass(data_type, dict) and not isinstance(outputs, dict):
            keys, values = outputs
            return dict(zip(keys, values))
        if issubclass(data_type, tuple):
            return tuple(outputs)
        if issubclass(data_type, set):
            return set(outputs)
        return outputs

    def handle_return(self, data: Any) -> None:
        """Materialize a built-in container object.

        If the object can be serialized to JSON, serialize it.

        Otherwise, use the `default_materializer_registry` to find the correct
        materializer for each element and materialize each element into a
        subdirectory.

        Tuples and sets are cast to list before materialization.

        For non-serializable dicts, materialize keys/values as separate lists.

        Args:
            data: The built-in container object to materialize.

        Raises:
            Exception: If any exception occurs, it is raised after cleanup.
        """
        super().handle_return(data)

        # tuple and set: handle as list.
        if isinstance(data, tuple) or isinstance(data, set):
            data = list(data)

        # If the data is serializable, just write it into a single JSON file.
        if _is_serializable(data):
            yaml_utils.write_json(self.data_path, data)
            return

        # non-serializable dict: Handle as non-serializable list of lists.
        if isinstance(data, dict):
            data = [list(data.keys()), list(data.values())]

        # non-serializable list: Materialize each element into a subfolder.
        # Get path, type, and corresponding materializer for each element.
        paths, types, materializers = [], [], []
        for i, element in enumerate(data):
            element_path = os.path.join(self.artifact.uri, str(i))
            fileio.mkdir(element_path)
            type_ = find_materializer_registry_type(type(element))
            paths.append(element_path)
            types.append(str(type_))
            materializer_class = default_materializer_registry[type_]
            mock_artifact = DataArtifact()
            mock_artifact.uri = element_path
            materializer = materializer_class(mock_artifact)
            materializers.append(materializer)
        try:
            # Write metadata as JSON.
            metadata = {"length": len(data), "paths": paths, "types": types}
            yaml_utils.write_json(self.metadata_path, metadata)
            # Materialize each element.
            for element, materializer in zip(data, materializers):
                materializer.handle_return(element)
        # If an error occurs, delete all created files.
        except Exception as e:
            # Delete metadata
            if fileio.exists(self.metadata_path):
                fileio.remove(self.metadata_path)
            # Delete all elements that were already saved.
            for element_path in paths:
                fileio.rmtree(element_path)
            raise e
__init__(self, artifact) special

Define self.data_path and self.metadata_path.

Parameters:

Name Type Description Default
artifact BaseArtifact

Artifact required by BaseMaterializer.__init__().

required
Source code in zenml/materializers/built_in_materializer.py
def __init__(self, artifact: "BaseArtifact"):
    """Define `self.data_path` and `self.metadata_path`.

    Args:
        artifact: Artifact required by `BaseMaterializer.__init__()`.
    """
    super().__init__(artifact)
    self.data_path = os.path.join(self.artifact.uri, DEFAULT_FILENAME)
    self.metadata_path = os.path.join(
        self.artifact.uri, DEFAULT_METADATA_FILENAME
    )
handle_input(self, data_type)

Reads a materialized built-in container object.

If the data was serialized to JSON, deserialize it.

Otherwise, reconstruct all elements according to the metadata file: 1. Resolve the data type using find_type_by_str(), 2. Get the materializer via the default_materializer_registry, 3. Initialize the materializer with a mock DataArtifact, whose uri attribute is overwritten to point to the desired path, 4. Use handle_input() of that materializer to load the element.

Parameters:

Name Type Description Default
data_type Type[Any]

The type of the data to read.

required

Returns:

Type Description
Any

The data read.

Exceptions:

Type Description
RuntimeError

If the data was not found.

Source code in zenml/materializers/built_in_materializer.py
def handle_input(self, data_type: Type[Any]) -> Any:
    """Reads a materialized built-in container object.

    If the data was serialized to JSON, deserialize it.

    Otherwise, reconstruct all elements according to the metadata file:
        1. Resolve the data type using `find_type_by_str()`,
        2. Get the materializer via the `default_materializer_registry`,
        3. Initialize the materializer with a mock `DataArtifact`, whose
            `uri` attribute is overwritten to point to the desired path,
        4. Use `handle_input()` of that materializer to load the element.

    Args:
        data_type: The type of the data to read.

    Returns:
        The data read.

    Raises:
        RuntimeError: If the data was not found.
    """
    super().handle_input(data_type)

    # If the data was not serialized, there must be metadata present.
    if not fileio.exists(self.data_path) and not fileio.exists(
        self.metadata_path
    ):
        raise RuntimeError(
            f"Materialization of type {data_type} failed. Expected either"
            f"{self.data_path} or {self.metadata_path} to exist."
        )

    # If the data was serialized as JSON, deserialize it.
    if fileio.exists(self.data_path):
        outputs = yaml_utils.read_json(self.data_path)

    # Otherwise, use the metadata to reconstruct the data as a list.
    else:
        metadata = yaml_utils.read_json(self.metadata_path)
        outputs = []
        for path_, type_str in zip(metadata["paths"], metadata["types"]):
            type_ = find_type_by_str(type_str)
            materializer_class = default_materializer_registry[type_]
            mock_artifact = DataArtifact()
            mock_artifact.uri = path_
            materializer = materializer_class(mock_artifact)
            element = materializer.handle_input(type_)
            outputs.append(element)

    # Cast the data to the correct type.
    if issubclass(data_type, dict) and not isinstance(outputs, dict):
        keys, values = outputs
        return dict(zip(keys, values))
    if issubclass(data_type, tuple):
        return tuple(outputs)
    if issubclass(data_type, set):
        return set(outputs)
    return outputs
handle_return(self, data)

Materialize a built-in container object.

If the object can be serialized to JSON, serialize it.

Otherwise, use the default_materializer_registry to find the correct materializer for each element and materialize each element into a subdirectory.

Tuples and sets are cast to list before materialization.

For non-serializable dicts, materialize keys/values as separate lists.

Parameters:

Name Type Description Default
data Any

The built-in container object to materialize.

required

Exceptions:

Type Description
Exception

If any exception occurs, it is raised after cleanup.

Source code in zenml/materializers/built_in_materializer.py
def handle_return(self, data: Any) -> None:
    """Materialize a built-in container object.

    If the object can be serialized to JSON, serialize it.

    Otherwise, use the `default_materializer_registry` to find the correct
    materializer for each element and materialize each element into a
    subdirectory.

    Tuples and sets are cast to list before materialization.

    For non-serializable dicts, materialize keys/values as separate lists.

    Args:
        data: The built-in container object to materialize.

    Raises:
        Exception: If any exception occurs, it is raised after cleanup.
    """
    super().handle_return(data)

    # tuple and set: handle as list.
    if isinstance(data, tuple) or isinstance(data, set):
        data = list(data)

    # If the data is serializable, just write it into a single JSON file.
    if _is_serializable(data):
        yaml_utils.write_json(self.data_path, data)
        return

    # non-serializable dict: Handle as non-serializable list of lists.
    if isinstance(data, dict):
        data = [list(data.keys()), list(data.values())]

    # non-serializable list: Materialize each element into a subfolder.
    # Get path, type, and corresponding materializer for each element.
    paths, types, materializers = [], [], []
    for i, element in enumerate(data):
        element_path = os.path.join(self.artifact.uri, str(i))
        fileio.mkdir(element_path)
        type_ = find_materializer_registry_type(type(element))
        paths.append(element_path)
        types.append(str(type_))
        materializer_class = default_materializer_registry[type_]
        mock_artifact = DataArtifact()
        mock_artifact.uri = element_path
        materializer = materializer_class(mock_artifact)
        materializers.append(materializer)
    try:
        # Write metadata as JSON.
        metadata = {"length": len(data), "paths": paths, "types": types}
        yaml_utils.write_json(self.metadata_path, metadata)
        # Materialize each element.
        for element, materializer in zip(data, materializers):
            materializer.handle_return(element)
    # If an error occurs, delete all created files.
    except Exception as e:
        # Delete metadata
        if fileio.exists(self.metadata_path):
            fileio.remove(self.metadata_path)
        # Delete all elements that were already saved.
        for element_path in paths:
            fileio.rmtree(element_path)
        raise e

BuiltInMaterializer (BaseMaterializer)

Handle JSON-serializable basic types (bool, float, int, str).

Source code in zenml/materializers/built_in_materializer.py
class BuiltInMaterializer(BaseMaterializer):
    """Handle JSON-serializable basic types (`bool`, `float`, `int`, `str`)."""

    ASSOCIATED_ARTIFACT_TYPES = (
        DataArtifact,
        DataAnalysisArtifact,
    )
    ASSOCIATED_TYPES = BASIC_TYPES

    def __init__(self, artifact: "BaseArtifact"):
        """Define `self.data_path`.

        Args:
            artifact: Artifact required by `BaseMaterializer.__init__()`.
        """
        super().__init__(artifact)
        self.data_path = os.path.join(self.artifact.uri, DEFAULT_FILENAME)

    def handle_input(self, data_type: Type[Any]) -> Any:
        """Reads basic primitive types from JSON.

        Args:
            data_type: The type of the data to read.

        Returns:
            The data read.
        """
        super().handle_input(data_type)
        contents = yaml_utils.read_json(self.data_path)
        if type(contents) != data_type:
            # TODO [ENG-142]: Raise error or try to coerce
            logger.debug(
                f"Contents {contents} was type {type(contents)} but expected "
                f"{data_type}"
            )
        return contents

    def handle_return(self, data: Any) -> None:
        """Serialize a basic type to JSON.

        Args:
            data: The data to store.
        """
        super().handle_return(data)
        yaml_utils.write_json(self.data_path, data)
__init__(self, artifact) special

Define self.data_path.

Parameters:

Name Type Description Default
artifact BaseArtifact

Artifact required by BaseMaterializer.__init__().

required
Source code in zenml/materializers/built_in_materializer.py
def __init__(self, artifact: "BaseArtifact"):
    """Define `self.data_path`.

    Args:
        artifact: Artifact required by `BaseMaterializer.__init__()`.
    """
    super().__init__(artifact)
    self.data_path = os.path.join(self.artifact.uri, DEFAULT_FILENAME)
handle_input(self, data_type)

Reads basic primitive types from JSON.

Parameters:

Name Type Description Default
data_type Type[Any]

The type of the data to read.

required

Returns:

Type Description
Any

The data read.

Source code in zenml/materializers/built_in_materializer.py
def handle_input(self, data_type: Type[Any]) -> Any:
    """Reads basic primitive types from JSON.

    Args:
        data_type: The type of the data to read.

    Returns:
        The data read.
    """
    super().handle_input(data_type)
    contents = yaml_utils.read_json(self.data_path)
    if type(contents) != data_type:
        # TODO [ENG-142]: Raise error or try to coerce
        logger.debug(
            f"Contents {contents} was type {type(contents)} but expected "
            f"{data_type}"
        )
    return contents
handle_return(self, data)

Serialize a basic type to JSON.

Parameters:

Name Type Description Default
data Any

The data to store.

required
Source code in zenml/materializers/built_in_materializer.py
def handle_return(self, data: Any) -> None:
    """Serialize a basic type to JSON.

    Args:
        data: The data to store.
    """
    super().handle_return(data)
    yaml_utils.write_json(self.data_path, data)

BytesMaterializer (BaseMaterializer)

Handle bytes data type, which is not JSON serializable.

Source code in zenml/materializers/built_in_materializer.py
class BytesMaterializer(BaseMaterializer):
    """Handle `bytes` data type, which is not JSON serializable."""

    ASSOCIATED_ARTIFACT_TYPES = (DataArtifact, DataAnalysisArtifact)
    ASSOCIATED_TYPES = (bytes,)

    def __init__(self, artifact: "BaseArtifact"):
        """Define `self.data_path`.

        Args:
            artifact: Artifact required by `BaseMaterializer.__init__()`.
        """
        super().__init__(artifact)
        self.data_path = os.path.join(self.artifact.uri, DEFAULT_BYTES_FILENAME)

    def handle_input(self, data_type: Type[Any]) -> Any:
        """Reads a bytes object from file.

        Args:
            data_type: The type of the data to read.

        Returns:
            The data read.
        """
        super().handle_input(data_type)
        with fileio.open(self.data_path, "rb") as file_:
            return file_.read()

    def handle_return(self, data: Any) -> None:
        """Save a bytes object to file.

        Args:
            data: The data to store.
        """
        super().handle_return(data)
        with fileio.open(self.data_path, "wb") as file_:
            file_.write(data)
__init__(self, artifact) special

Define self.data_path.

Parameters:

Name Type Description Default
artifact BaseArtifact

Artifact required by BaseMaterializer.__init__().

required
Source code in zenml/materializers/built_in_materializer.py
def __init__(self, artifact: "BaseArtifact"):
    """Define `self.data_path`.

    Args:
        artifact: Artifact required by `BaseMaterializer.__init__()`.
    """
    super().__init__(artifact)
    self.data_path = os.path.join(self.artifact.uri, DEFAULT_BYTES_FILENAME)
handle_input(self, data_type)

Reads a bytes object from file.

Parameters:

Name Type Description Default
data_type Type[Any]

The type of the data to read.

required

Returns:

Type Description
Any

The data read.

Source code in zenml/materializers/built_in_materializer.py
def handle_input(self, data_type: Type[Any]) -> Any:
    """Reads a bytes object from file.

    Args:
        data_type: The type of the data to read.

    Returns:
        The data read.
    """
    super().handle_input(data_type)
    with fileio.open(self.data_path, "rb") as file_:
        return file_.read()
handle_return(self, data)

Save a bytes object to file.

Parameters:

Name Type Description Default
data Any

The data to store.

required
Source code in zenml/materializers/built_in_materializer.py
def handle_return(self, data: Any) -> None:
    """Save a bytes object to file.

    Args:
        data: The data to store.
    """
    super().handle_return(data)
    with fileio.open(self.data_path, "wb") as file_:
        file_.write(data)

find_materializer_registry_type(type_)

For a given type, find the type registered in the registry.

This can be either the type itself, or a superclass of the type.

Parameters:

Name Type Description Default
type_ Type[Any]

The type to find.

required

Returns:

Type Description
Type[Any]

The type registered in the registry.

Exceptions:

Type Description
RuntimeError

If the type could not be resolved.

Source code in zenml/materializers/built_in_materializer.py
def find_materializer_registry_type(type_: Type[Any]) -> Type[Any]:
    """For a given type, find the type registered in the registry.

    This can be either the type itself, or a superclass of the type.

    Args:
        type_: The type to find.

    Returns:
        The type registered in the registry.

    Raises:
        RuntimeError: If the type could not be resolved.
    """
    # Check that a unique materializer is registered for this type
    default_materializer_registry[type_]

    # Check if the type itself is registered
    registered_types = default_materializer_registry.materializer_types.keys()
    if type_ in registered_types:
        return type_

    # Check if a superclass of the type is registered
    for registered_type in registered_types:
        if issubclass(type_, registered_type):
            return registered_type

    # Raise an error otherwise - this should never happen since
    # `default_materializer_registry[type_]` should have raised an error already
    raise RuntimeError(
        f"Cannot find a materializer for type '{type_}' in the "
        f"materializer registry."
    )

find_type_by_str(type_str)

Get a Python type, given its string representation.

E.g., "" should resolve to int.

Currently this is implemented by checking all artifact types registered in the default_materializer_registry. This means, only types in the registry can be found. Any other types will cause a RunTimeError.

Parameters:

Name Type Description Default
type_str str

The string representation of a type.

required

Exceptions:

Type Description
RuntimeError

If the type could not be resolved.

Returns:

Type Description
Type[Any]

The type whose string representation is type_str.

Source code in zenml/materializers/built_in_materializer.py
def find_type_by_str(type_str: str) -> Type[Any]:
    """Get a Python type, given its string representation.

    E.g., "<class 'int'>" should resolve to `int`.

    Currently this is implemented by checking all artifact types registered in
    the `default_materializer_registry`. This means, only types in the registry
    can be found. Any other types will cause a `RunTimeError`.

    Args:
        type_str: The string representation of a type.

    Raises:
        RuntimeError: If the type could not be resolved.

    Returns:
        The type whose string representation is `type_str`.
    """
    registered_types = default_materializer_registry.materializer_types.keys()
    type_str_mapping = {str(type_): type_ for type_ in registered_types}
    if type_str in type_str_mapping:
        return type_str_mapping[type_str]
    raise RuntimeError(f"Cannot resolve type '{type_str}'.")

default_materializer_registry

Implementation of a default materializer registry.

MaterializerRegistry

Matches a Python type to a default materializer.

Source code in zenml/materializers/default_materializer_registry.py
class MaterializerRegistry:
    """Matches a Python type to a default materializer."""

    def __init__(self) -> None:
        """Initialize the materializer registry."""
        self.materializer_types: Dict[Type[Any], Type["BaseMaterializer"]] = {}

    def register_materializer_type(
        self, key: Type[Any], type_: Type["BaseMaterializer"]
    ) -> None:
        """Registers a new materializer.

        Args:
            key: Indicates the type of object.
            type_: A BaseMaterializer subclass.
        """
        if key not in self.materializer_types:
            self.materializer_types[key] = type_
            logger.debug(f"Registered materializer {type_} for {key}")
        else:
            logger.debug(
                f"Found existing materializer class for {key}: "
                f"{self.materializer_types[key]}. Skipping registration of "
                f"{type_}."
            )

    def register_and_overwrite_type(
        self, key: Type[Any], type_: Type["BaseMaterializer"]
    ) -> None:
        """Registers a new materializer and also overwrites a default if set.

        Args:
            key: Indicates the type of object.
            type_: A BaseMaterializer subclass.
        """
        self.materializer_types[key] = type_
        logger.debug(f"Registered materializer {type_} for {key}")

    def __getitem__(self, key: Type[Any]) -> Type["BaseMaterializer"]:
        """Get a single materializers based on the key.

        Args:
            key: Indicates the type of object.

        Returns:
            `BaseMaterializer` subclass that was registered for this key.

        Raises:
            StepInterfaceError: If the key (or any of its superclasses) is not
                registered or the key has more than one superclass with
                different default materializers
        """
        # Check whether the type is registered
        if key in self.materializer_types:
            return self.materializer_types[key]

        # If the type is not registered, check for superclasses
        materializers_for_compatible_superclasses = {
            materializer
            for registered_type, materializer in self.materializer_types.items()
            if issubclass(key, registered_type)
        }

        # Make sure that there is only a single materializer
        if len(materializers_for_compatible_superclasses) == 1:
            return materializers_for_compatible_superclasses.pop()
        if len(materializers_for_compatible_superclasses) > 1:
            raise StepInterfaceError(
                f"Type {key} is subclassing more than one type, thus it "
                f"maps to multiple materializers within the materializer "
                f"registry: {materializers_for_compatible_superclasses}. "
                f"Please specify which of these materializers you would "
                f"like to use explicitly in your step.",
                url="https://docs.zenml.io/advanced-guide/pipelines/materializers",
            )
        raise StepInterfaceError(
            f"No materializer registered for class {key}. You can register a "
            f"default materializer for specific types by subclassing "
            f"`BaseMaterializer` and setting its `ASSOCIATED_TYPES` class"
            f" variable.",
            url="https://docs.zenml.io/advanced-guide/pipelines/materializers",
        )

    def get_materializer_types(
        self,
    ) -> Dict[Type[Any], Type["BaseMaterializer"]]:
        """Get all registered materializer types.

        Returns:
            A dictionary of registered materializer types.
        """
        return self.materializer_types

    def is_registered(self, key: Type[Any]) -> bool:
        """Returns if a materializer class is registered for the given type.

        Args:
            key: Indicates the type of object.

        Returns:
            True if a materializer is registered for the given type, False
            otherwise.
        """
        return any(issubclass(key, type_) for type_ in self.materializer_types)
__getitem__(self, key) special

Get a single materializers based on the key.

Parameters:

Name Type Description Default
key Type[Any]

Indicates the type of object.

required

Returns:

Type Description
Type[BaseMaterializer]

BaseMaterializer subclass that was registered for this key.

Exceptions:

Type Description
StepInterfaceError

If the key (or any of its superclasses) is not registered or the key has more than one superclass with different default materializers

Source code in zenml/materializers/default_materializer_registry.py
def __getitem__(self, key: Type[Any]) -> Type["BaseMaterializer"]:
    """Get a single materializers based on the key.

    Args:
        key: Indicates the type of object.

    Returns:
        `BaseMaterializer` subclass that was registered for this key.

    Raises:
        StepInterfaceError: If the key (or any of its superclasses) is not
            registered or the key has more than one superclass with
            different default materializers
    """
    # Check whether the type is registered
    if key in self.materializer_types:
        return self.materializer_types[key]

    # If the type is not registered, check for superclasses
    materializers_for_compatible_superclasses = {
        materializer
        for registered_type, materializer in self.materializer_types.items()
        if issubclass(key, registered_type)
    }

    # Make sure that there is only a single materializer
    if len(materializers_for_compatible_superclasses) == 1:
        return materializers_for_compatible_superclasses.pop()
    if len(materializers_for_compatible_superclasses) > 1:
        raise StepInterfaceError(
            f"Type {key} is subclassing more than one type, thus it "
            f"maps to multiple materializers within the materializer "
            f"registry: {materializers_for_compatible_superclasses}. "
            f"Please specify which of these materializers you would "
            f"like to use explicitly in your step.",
            url="https://docs.zenml.io/advanced-guide/pipelines/materializers",
        )
    raise StepInterfaceError(
        f"No materializer registered for class {key}. You can register a "
        f"default materializer for specific types by subclassing "
        f"`BaseMaterializer` and setting its `ASSOCIATED_TYPES` class"
        f" variable.",
        url="https://docs.zenml.io/advanced-guide/pipelines/materializers",
    )
__init__(self) special

Initialize the materializer registry.

Source code in zenml/materializers/default_materializer_registry.py
def __init__(self) -> None:
    """Initialize the materializer registry."""
    self.materializer_types: Dict[Type[Any], Type["BaseMaterializer"]] = {}
get_materializer_types(self)

Get all registered materializer types.

Returns:

Type Description
Dict[Type[Any], Type[BaseMaterializer]]

A dictionary of registered materializer types.

Source code in zenml/materializers/default_materializer_registry.py
def get_materializer_types(
    self,
) -> Dict[Type[Any], Type["BaseMaterializer"]]:
    """Get all registered materializer types.

    Returns:
        A dictionary of registered materializer types.
    """
    return self.materializer_types
is_registered(self, key)

Returns if a materializer class is registered for the given type.

Parameters:

Name Type Description Default
key Type[Any]

Indicates the type of object.

required

Returns:

Type Description
bool

True if a materializer is registered for the given type, False otherwise.

Source code in zenml/materializers/default_materializer_registry.py
def is_registered(self, key: Type[Any]) -> bool:
    """Returns if a materializer class is registered for the given type.

    Args:
        key: Indicates the type of object.

    Returns:
        True if a materializer is registered for the given type, False
        otherwise.
    """
    return any(issubclass(key, type_) for type_ in self.materializer_types)
register_and_overwrite_type(self, key, type_)

Registers a new materializer and also overwrites a default if set.

Parameters:

Name Type Description Default
key Type[Any]

Indicates the type of object.

required
type_ Type[BaseMaterializer]

A BaseMaterializer subclass.

required
Source code in zenml/materializers/default_materializer_registry.py
def register_and_overwrite_type(
    self, key: Type[Any], type_: Type["BaseMaterializer"]
) -> None:
    """Registers a new materializer and also overwrites a default if set.

    Args:
        key: Indicates the type of object.
        type_: A BaseMaterializer subclass.
    """
    self.materializer_types[key] = type_
    logger.debug(f"Registered materializer {type_} for {key}")
register_materializer_type(self, key, type_)

Registers a new materializer.

Parameters:

Name Type Description Default
key Type[Any]

Indicates the type of object.

required
type_ Type[BaseMaterializer]

A BaseMaterializer subclass.

required
Source code in zenml/materializers/default_materializer_registry.py
def register_materializer_type(
    self, key: Type[Any], type_: Type["BaseMaterializer"]
) -> None:
    """Registers a new materializer.

    Args:
        key: Indicates the type of object.
        type_: A BaseMaterializer subclass.
    """
    if key not in self.materializer_types:
        self.materializer_types[key] = type_
        logger.debug(f"Registered materializer {type_} for {key}")
    else:
        logger.debug(
            f"Found existing materializer class for {key}: "
            f"{self.materializer_types[key]}. Skipping registration of "
            f"{type_}."
        )

numpy_materializer

Implementation of the ZenML NumPy materializer.

NumpyMaterializer (BaseMaterializer)

Materializer to read data to and from pandas.

Source code in zenml/materializers/numpy_materializer.py
class NumpyMaterializer(BaseMaterializer):
    """Materializer to read data to and from pandas."""

    ASSOCIATED_TYPES = (np.ndarray,)
    ASSOCIATED_ARTIFACT_TYPES = (DataArtifact,)

    def handle_input(self, data_type: Type[Any]) -> "NDArray[Any]":
        """Reads numpy array from parquet file.

        Args:
            data_type: The type of the data to read.

        Returns:
            The numpy array.
        """
        super().handle_input(data_type)
        shape_dict = yaml_utils.read_json(
            os.path.join(self.artifact.uri, SHAPE_FILENAME)
        )
        shape_tuple = tuple(shape_dict.values())
        with fileio.open(
            os.path.join(self.artifact.uri, DATA_FILENAME), "rb"
        ) as f:
            input_stream = pa.input_stream(f)
            data = pq.read_table(input_stream)
        vals = getattr(data.to_pandas(), DATA_VAR).values
        return np.reshape(vals, shape_tuple)

    def handle_return(self, arr: "NDArray[Any]") -> None:
        """Writes a np.ndarray to the artifact store as a parquet file.

        Args:
            arr: The numpy array to write.
        """
        super().handle_return(arr)
        yaml_utils.write_json(
            os.path.join(self.artifact.uri, SHAPE_FILENAME),
            {str(i): x for i, x in enumerate(arr.shape)},
        )
        pa_table = pa.table({DATA_VAR: arr.flatten()})
        with fileio.open(
            os.path.join(self.artifact.uri, DATA_FILENAME), "wb"
        ) as f:
            stream = pa.output_stream(f)
            pq.write_table(pa_table, stream)
handle_input(self, data_type)

Reads numpy array from parquet file.

Parameters:

Name Type Description Default
data_type Type[Any]

The type of the data to read.

required

Returns:

Type Description
NDArray[Any]

The numpy array.

Source code in zenml/materializers/numpy_materializer.py
def handle_input(self, data_type: Type[Any]) -> "NDArray[Any]":
    """Reads numpy array from parquet file.

    Args:
        data_type: The type of the data to read.

    Returns:
        The numpy array.
    """
    super().handle_input(data_type)
    shape_dict = yaml_utils.read_json(
        os.path.join(self.artifact.uri, SHAPE_FILENAME)
    )
    shape_tuple = tuple(shape_dict.values())
    with fileio.open(
        os.path.join(self.artifact.uri, DATA_FILENAME), "rb"
    ) as f:
        input_stream = pa.input_stream(f)
        data = pq.read_table(input_stream)
    vals = getattr(data.to_pandas(), DATA_VAR).values
    return np.reshape(vals, shape_tuple)
handle_return(self, arr)

Writes a np.ndarray to the artifact store as a parquet file.

Parameters:

Name Type Description Default
arr NDArray[Any]

The numpy array to write.

required
Source code in zenml/materializers/numpy_materializer.py
def handle_return(self, arr: "NDArray[Any]") -> None:
    """Writes a np.ndarray to the artifact store as a parquet file.

    Args:
        arr: The numpy array to write.
    """
    super().handle_return(arr)
    yaml_utils.write_json(
        os.path.join(self.artifact.uri, SHAPE_FILENAME),
        {str(i): x for i, x in enumerate(arr.shape)},
    )
    pa_table = pa.table({DATA_VAR: arr.flatten()})
    with fileio.open(
        os.path.join(self.artifact.uri, DATA_FILENAME), "wb"
    ) as f:
        stream = pa.output_stream(f)
        pq.write_table(pa_table, stream)

pandas_materializer

Materializer for Pandas.

PandasMaterializer (BaseMaterializer)

Materializer to read data to and from pandas.

Source code in zenml/materializers/pandas_materializer.py
class PandasMaterializer(BaseMaterializer):
    """Materializer to read data to and from pandas."""

    ASSOCIATED_TYPES = (pd.DataFrame, pd.Series)
    ASSOCIATED_ARTIFACT_TYPES = (
        DataArtifact,
        StatisticsArtifact,
        SchemaArtifact,
    )

    def handle_input(
        self, data_type: Type[Any]
    ) -> Union[pd.DataFrame, pd.Series]:
        """Reads pd.DataFrame or pd.Series from a parquet file.

        Args:
            data_type: The type of the data to read.

        Returns:
            The pandas dataframe or series.
        """
        super().handle_input(data_type)
        filepath = os.path.join(self.artifact.uri, DEFAULT_FILENAME)

        # Create a temporary folder
        temp_dir = tempfile.mkdtemp(prefix="zenml-temp-")
        temp_file = os.path.join(str(temp_dir), DEFAULT_FILENAME)

        # Copy from artifact store to temporary file
        fileio.copy(filepath, temp_file)

        # Load the model from the temporary file
        df = pd.read_parquet(temp_file)

        # Cleanup and return
        fileio.rmtree(temp_dir)

        if issubclass(data_type, pd.Series):
            # Taking the first column if its a series as the assumption
            # is that there will only be one
            assert len(df.columns) == 1
            df = df[df.columns[0]]

        return df

    def handle_return(self, df: Union[pd.DataFrame, pd.Series]) -> None:
        """Writes a pandas dataframe or series to the specified filename.

        Args:
            df: The pandas dataframe or series to write.
        """
        super().handle_return(df)
        filepath = os.path.join(self.artifact.uri, DEFAULT_FILENAME)

        if isinstance(df, pd.Series):
            df = df.to_frame(name="series")

        # Create a temporary file to store the model
        with tempfile.NamedTemporaryFile(
            mode="w", suffix=".gzip", delete=False
        ) as f:
            df.to_parquet(f.name, compression=COMPRESSION_TYPE)
            fileio.copy(f.name, filepath)

        # Close and remove the temporary file
        f.close()
        fileio.remove(f.name)
handle_input(self, data_type)

Reads pd.DataFrame or pd.Series from a parquet file.

Parameters:

Name Type Description Default
data_type Type[Any]

The type of the data to read.

required

Returns:

Type Description
Union[pandas.core.frame.DataFrame, pandas.core.series.Series]

The pandas dataframe or series.

Source code in zenml/materializers/pandas_materializer.py
def handle_input(
    self, data_type: Type[Any]
) -> Union[pd.DataFrame, pd.Series]:
    """Reads pd.DataFrame or pd.Series from a parquet file.

    Args:
        data_type: The type of the data to read.

    Returns:
        The pandas dataframe or series.
    """
    super().handle_input(data_type)
    filepath = os.path.join(self.artifact.uri, DEFAULT_FILENAME)

    # Create a temporary folder
    temp_dir = tempfile.mkdtemp(prefix="zenml-temp-")
    temp_file = os.path.join(str(temp_dir), DEFAULT_FILENAME)

    # Copy from artifact store to temporary file
    fileio.copy(filepath, temp_file)

    # Load the model from the temporary file
    df = pd.read_parquet(temp_file)

    # Cleanup and return
    fileio.rmtree(temp_dir)

    if issubclass(data_type, pd.Series):
        # Taking the first column if its a series as the assumption
        # is that there will only be one
        assert len(df.columns) == 1
        df = df[df.columns[0]]

    return df
handle_return(self, df)

Writes a pandas dataframe or series to the specified filename.

Parameters:

Name Type Description Default
df Union[pandas.core.frame.DataFrame, pandas.core.series.Series]

The pandas dataframe or series to write.

required
Source code in zenml/materializers/pandas_materializer.py
def handle_return(self, df: Union[pd.DataFrame, pd.Series]) -> None:
    """Writes a pandas dataframe or series to the specified filename.

    Args:
        df: The pandas dataframe or series to write.
    """
    super().handle_return(df)
    filepath = os.path.join(self.artifact.uri, DEFAULT_FILENAME)

    if isinstance(df, pd.Series):
        df = df.to_frame(name="series")

    # Create a temporary file to store the model
    with tempfile.NamedTemporaryFile(
        mode="w", suffix=".gzip", delete=False
    ) as f:
        df.to_parquet(f.name, compression=COMPRESSION_TYPE)
        fileio.copy(f.name, filepath)

    # Close and remove the temporary file
    f.close()
    fileio.remove(f.name)

service_materializer

Implementation of a materializer to read and write ZenML service instances.

ServiceMaterializer (BaseMaterializer)

Materializer to read/write service instances.

Source code in zenml/materializers/service_materializer.py
class ServiceMaterializer(BaseMaterializer):
    """Materializer to read/write service instances."""

    ASSOCIATED_TYPES = (BaseService,)
    ASSOCIATED_ARTIFACT_TYPES = (ServiceArtifact,)

    def handle_input(self, data_type: Type[Any]) -> BaseService:
        """Creates and returns a service.

        This service is instantiated from the serialized service configuration
        and last known status information saved as artifact.

        Args:
            data_type: The type of the data to read.

        Returns:
            A ZenML service instance.
        """
        super().handle_input(data_type)
        filepath = os.path.join(self.artifact.uri, SERVICE_CONFIG_FILENAME)
        with fileio.open(filepath, "r") as f:
            service = ServiceRegistry().load_service_from_json(f.read())
        return service

    def handle_return(self, service: BaseService) -> None:
        """Writes a ZenML service.

        The configuration and last known status of the input service instance
        are serialized and saved as an artifact.

        Args:
            service: A ZenML service instance.
        """
        super().handle_return(service)
        filepath = os.path.join(self.artifact.uri, SERVICE_CONFIG_FILENAME)
        with fileio.open(filepath, "w") as f:
            f.write(service.json(indent=4))
handle_input(self, data_type)

Creates and returns a service.

This service is instantiated from the serialized service configuration and last known status information saved as artifact.

Parameters:

Name Type Description Default
data_type Type[Any]

The type of the data to read.

required

Returns:

Type Description
BaseService

A ZenML service instance.

Source code in zenml/materializers/service_materializer.py
def handle_input(self, data_type: Type[Any]) -> BaseService:
    """Creates and returns a service.

    This service is instantiated from the serialized service configuration
    and last known status information saved as artifact.

    Args:
        data_type: The type of the data to read.

    Returns:
        A ZenML service instance.
    """
    super().handle_input(data_type)
    filepath = os.path.join(self.artifact.uri, SERVICE_CONFIG_FILENAME)
    with fileio.open(filepath, "r") as f:
        service = ServiceRegistry().load_service_from_json(f.read())
    return service
handle_return(self, service)

Writes a ZenML service.

The configuration and last known status of the input service instance are serialized and saved as an artifact.

Parameters:

Name Type Description Default
service BaseService

A ZenML service instance.

required
Source code in zenml/materializers/service_materializer.py
def handle_return(self, service: BaseService) -> None:
    """Writes a ZenML service.

    The configuration and last known status of the input service instance
    are serialized and saved as an artifact.

    Args:
        service: A ZenML service instance.
    """
    super().handle_return(service)
    filepath = os.path.join(self.artifact.uri, SERVICE_CONFIG_FILENAME)
    with fileio.open(filepath, "w") as f:
        f.write(service.json(indent=4))