Skip to content

Materializers

zenml.materializers special

Materializers are used to convert a ZenML artifact into a specific format. They are most often used to handle the input or output of ZenML steps, and can be extended by building on the BaseMaterializer class.

base_materializer

BaseMaterializer

Base Materializer to realize artifact data.

Source code in zenml/materializers/base_materializer.py
class BaseMaterializer(metaclass=BaseMaterializerMeta):
    """Base Materializer to realize artifact data.

    Subclasses list the Python types they support in `ASSOCIATED_TYPES` and
    extend `handle_input` / `handle_return` with the actual read/write
    logic. The implementations in this base class only perform the type
    check.
    """

    # Artifact types this materializer may be used with.
    ASSOCIATED_ARTIFACT_TYPES: ClassVar[Tuple[Type["BaseArtifact"], ...]] = ()
    # Python types this materializer can read and write.
    ASSOCIATED_TYPES: ClassVar[Tuple[Type[Any], ...]] = ()

    def __init__(self, artifact: "BaseArtifact"):
        """Initializes a materializer with the given artifact.

        Args:
            artifact: The artifact to store on the instance as
                `self.artifact`.
        """
        self.artifact = artifact

    def _can_handle_type(self, data_type: Type[Any]) -> bool:
        """Whether the materializer can read/write a certain type.

        Args:
            data_type: The type to check against `ASSOCIATED_TYPES`.

        Returns:
            True if `data_type` is a subclass of one of the associated
            types, False otherwise (including when `data_type` is not a
            class at all).
        """
        try:
            return any(
                issubclass(data_type, associated_type)
                for associated_type in self.ASSOCIATED_TYPES
            )
        except TypeError:
            # `issubclass` refuses non-class arguments (for example
            # subscripted typing generics). Report them as unsupported so
            # that callers raise their own, more descriptive error.
            return False

    def handle_input(self, data_type: Type[Any]) -> Any:
        """Write logic here to handle input of the step function.

        Args:
            data_type: What type the input should be materialized as.

        Returns:
            Any object that is to be passed into the relevant artifact in the
            step. This base implementation only validates the type and
            returns nothing.

        Raises:
            TypeError: If `data_type` is not supported by this materializer.
        """
        if not self._can_handle_type(data_type):
            raise TypeError(
                f"Unable to handle type {data_type}. {self.__class__.__name__} "
                f"can only read artifacts to the following types: "
                f"{self.ASSOCIATED_TYPES}."
            )

    def handle_return(self, data: Any) -> None:
        """Write logic here to handle return of the step function.

        Args:
            data: The object returned by the step function that should be
                written to the artifact.

        Raises:
            TypeError: If the type of `data` is not supported by this
                materializer.
        """
        data_type = type(data)
        if not self._can_handle_type(data_type):
            raise TypeError(
                f"Unable to write {data_type}. {self.__class__.__name__} "
                f"can only write the following types: {self.ASSOCIATED_TYPES}."
            )
__init__(self, artifact) special

Initializes a materializer with the given artifact.

Source code in zenml/materializers/base_materializer.py
def __init__(self, artifact: "BaseArtifact") -> None:
    """Initializes a materializer with the given artifact.

    Args:
        artifact: The artifact to store on the instance as `self.artifact`.
    """
    self.artifact = artifact
handle_input(self, data_type)

Write logic here to handle input of the step function.

Parameters:

Name Type Description Default
data_type Type[Any]

What type the input should be materialized as.

required

Returns:

Type Description
Any

Any object that is to be passed into the relevant artifact in the step.

Source code in zenml/materializers/base_materializer.py
def handle_input(self, data_type: Type[Any]) -> Any:
    """Check that the requested input type is supported.

    Subclasses are expected to call this first and then add their own
    reading logic.

    Args:
        data_type: What type the input should be materialized as.

    Returns:
        Nothing in this base implementation; it only validates the type.

    Raises:
        TypeError: If `data_type` cannot be handled by this materializer.
    """
    # Supported type: nothing more to do at this level.
    if self._can_handle_type(data_type):
        return None

    raise TypeError(
        f"Unable to handle type {data_type}. {self.__class__.__name__} "
        f"can only read artifacts to the following types: "
        f"{self.ASSOCIATED_TYPES}."
    )
handle_return(self, data)

Write logic here to handle return of the step function.

Parameters:

Name Type Description Default
data Any

The object returned by the step function that should be written to the artifact.

required
Source code in zenml/materializers/base_materializer.py
def handle_return(self, data: Any) -> None:
    """Check that the value returned by the step can be written.

    Subclasses are expected to call this first and then add their own
    writing logic.

    Args:
        data: The object returned by the step function that should be
            written to the artifact.

    Raises:
        TypeError: If the type of `data` cannot be handled by this
            materializer.
    """
    data_type = type(data)
    # Supported type: nothing more to do at this level.
    if self._can_handle_type(data_type):
        return

    raise TypeError(
        f"Unable to write {data_type}. {self.__class__.__name__} "
        f"can only write the following types: {self.ASSOCIATED_TYPES}."
    )

BaseMaterializerMeta (type)

Metaclass responsible for registering different BaseMaterializer subclasses for reading/writing artifacts.

Source code in zenml/materializers/base_materializer.py
class BaseMaterializerMeta(type):
    """Metaclass responsible for registering different BaseMaterializer
    subclasses for reading/writing artifacts."""

    def __new__(
        mcs, name: str, bases: Tuple[Type[Any], ...], dct: Dict[str, Any]
    ) -> "BaseMaterializerMeta":
        """Creates a Materializer class and registers it at
        the `MaterializerRegistry`.

        Every class except the one named `BaseMaterializer` is validated and
        then registered for each of its `ASSOCIATED_TYPES` with both
        `default_materializer_registry` and `type_registry`.

        Args:
            name: Name of the class being created.
            bases: Base classes of the class being created.
            dct: Namespace of the class being created.

        Returns:
            The newly created class.

        Raises:
            MaterializerInterfaceError: If `ASSOCIATED_TYPES` is empty, if an
                entry of `ASSOCIATED_ARTIFACT_TYPES` is not a `BaseArtifact`
                subclass, or if an entry of `ASSOCIATED_TYPES` is not a
                class.
        """
        cls = cast(
            Type["BaseMaterializer"], super().__new__(mcs, name, bases, dct)
        )
        # The base class itself is neither validated nor registered.
        if name == "BaseMaterializer":
            return cls

        # NOTE(review): imported locally, presumably to avoid a circular
        # import between the artifact and materializer modules - confirm.
        from zenml.artifacts.base_artifact import BaseArtifact

        if not cls.ASSOCIATED_TYPES:
            raise MaterializerInterfaceError(
                f"Invalid materializer class '{name}'. When creating a "
                f"custom materializer, make sure to specify at least one "
                f"type in its ASSOCIATED_TYPES class variable.",
                url="https://docs.zenml.io/guides/index/custom-materializer",
            )

        for artifact_type in cls.ASSOCIATED_ARTIFACT_TYPES:
            if not (
                inspect.isclass(artifact_type)
                and issubclass(artifact_type, BaseArtifact)
            ):
                raise MaterializerInterfaceError(
                    f"Associated artifact type {artifact_type} for "
                    f"materializer {name} is not a `BaseArtifact` "
                    f"subclass.",
                    url="https://docs.zenml.io/guides/index/custom-materializer",
                )

        # Validate all associated types before touching the registries, so
        # that an invalid entry cannot leave earlier entries registered for
        # a class whose creation failed.
        for associated_type in cls.ASSOCIATED_TYPES:
            if not inspect.isclass(associated_type):
                raise MaterializerInterfaceError(
                    f"Associated type {associated_type} for materializer "
                    f"{name} is not a class.",
                    url="https://docs.zenml.io/guides/index/custom-materializer",
                )

        # Without explicit artifact types, fall back to the base artifact.
        artifact_types = cls.ASSOCIATED_ARTIFACT_TYPES or (BaseArtifact,)
        for associated_type in cls.ASSOCIATED_TYPES:
            default_materializer_registry.register_materializer_type(
                associated_type, cls
            )
            type_registry.register_integration(
                associated_type, artifact_types
            )
        return cls
__new__(mcs, name, bases, dct) special staticmethod

Creates a Materializer class and registers it at the MaterializerRegistry.

Source code in zenml/materializers/base_materializer.py
def __new__(
    mcs, name: str, bases: Tuple[Type[Any], ...], dct: Dict[str, Any]
) -> "BaseMaterializerMeta":
    """Creates a Materializer class and registers it at
    the `MaterializerRegistry`.

    Every class except the one named `BaseMaterializer` is validated and
    then registered for each of its `ASSOCIATED_TYPES` with both
    `default_materializer_registry` and `type_registry`.

    Args:
        name: Name of the class being created.
        bases: Base classes of the class being created.
        dct: Namespace of the class being created.

    Returns:
        The newly created class.

    Raises:
        MaterializerInterfaceError: If `ASSOCIATED_TYPES` is empty, if an
            entry of `ASSOCIATED_ARTIFACT_TYPES` is not a `BaseArtifact`
            subclass, or if an entry of `ASSOCIATED_TYPES` is not a class.
    """
    cls = cast(
        Type["BaseMaterializer"], super().__new__(mcs, name, bases, dct)
    )
    # The base class itself is neither validated nor registered.
    if name == "BaseMaterializer":
        return cls

    # NOTE(review): imported locally, presumably to avoid a circular import
    # between the artifact and materializer modules - confirm.
    from zenml.artifacts.base_artifact import BaseArtifact

    if not cls.ASSOCIATED_TYPES:
        raise MaterializerInterfaceError(
            f"Invalid materializer class '{name}'. When creating a "
            f"custom materializer, make sure to specify at least one "
            f"type in its ASSOCIATED_TYPES class variable.",
            url="https://docs.zenml.io/guides/index/custom-materializer",
        )

    for artifact_type in cls.ASSOCIATED_ARTIFACT_TYPES:
        if not (
            inspect.isclass(artifact_type)
            and issubclass(artifact_type, BaseArtifact)
        ):
            raise MaterializerInterfaceError(
                f"Associated artifact type {artifact_type} for "
                f"materializer {name} is not a `BaseArtifact` "
                f"subclass.",
                url="https://docs.zenml.io/guides/index/custom-materializer",
            )

    # Validate all associated types before touching the registries, so that
    # an invalid entry cannot leave earlier entries registered for a class
    # whose creation failed.
    for associated_type in cls.ASSOCIATED_TYPES:
        if not inspect.isclass(associated_type):
            raise MaterializerInterfaceError(
                f"Associated type {associated_type} for materializer "
                f"{name} is not a class.",
                url="https://docs.zenml.io/guides/index/custom-materializer",
            )

    # Without explicit artifact types, fall back to the base artifact.
    artifact_types = cls.ASSOCIATED_ARTIFACT_TYPES or (BaseArtifact,)
    for associated_type in cls.ASSOCIATED_TYPES:
        default_materializer_registry.register_materializer_type(
            associated_type, cls
        )
        type_registry.register_integration(
            associated_type, artifact_types
        )
    return cls

beam_materializer

BeamMaterializer (BaseMaterializer)

Materializer to read data to and from beam.

Source code in zenml/materializers/beam_materializer.py
class BeamMaterializer(BaseMaterializer):
    """Materializer intended to read and write Apache Beam data.

    Both handlers are still placeholders (see the TODO markers below); no
    artifact data is actually read or written yet.
    """

    ASSOCIATED_TYPES = (beam.PCollection,)
    ASSOCIATED_ARTIFACT_TYPES = (DataArtifact,)

    def handle_input(self, data_type: Type[Any]) -> Any:
        """Placeholder for reading the artifact as a beam compatible object.

        Currently this only delegates to the base class and returns nothing.

        Args:
            data_type: What type the input should be materialized as.
        """
        # TODO [ENG-138]: Implement beam reading
        super().handle_input(data_type)

    def handle_return(self, pipeline: beam.Pipeline) -> None:
        """Placeholder for persisting the results of a beam pipeline.

        The intended implementation (commented out at the end) appends a
        beam.io.WriteToParquet targeting the artifact URI and runs the
        pipeline.

        Args:
            pipeline: A beam.pipeline object.
        """
        # TODO [ENG-139]: Implement beam writing
        # NOTE(review): ASSOCIATED_TYPES lists beam.PCollection while this
        # parameter is annotated as beam.Pipeline - confirm that the base
        # class check accepts the objects passed here.
        super().handle_return(pipeline)
        # NOTE(review): ParDo is constructed without any argument - this
        # looks like placeholder code; confirm before relying on it.
        pipeline | beam.ParDo()
        pipeline.run()
        # pipeline | beam.io.WriteToParquet(self.artifact.uri)
        # pipeline.run()
handle_input(self, data_type)

Reads all files inside the artifact directory and materializes them as a beam compatible output.

Source code in zenml/materializers/beam_materializer.py
def handle_input(self, data_type: Type[Any]) -> Any:
    """Placeholder for reading the artifact as a beam compatible object.

    Currently this only delegates to the base class and returns nothing.

    Args:
        data_type: What type the input should be materialized as.
    """
    # TODO [ENG-138]: Implement beam reading
    super().handle_input(data_type)
handle_return(self, pipeline)

Appends a beam.io.WriteToParquet at the end of a beam pipeline and therefore persists the results.

Parameters:

Name Type Description Default
pipeline Pipeline

A beam.pipeline object.

required
Source code in zenml/materializers/beam_materializer.py
def handle_return(self, pipeline: beam.Pipeline) -> None:
    """Placeholder for persisting the results of a beam pipeline.

    The intended implementation (commented out at the end) appends a
    beam.io.WriteToParquet targeting the artifact URI and runs the pipeline.

    Args:
        pipeline: A beam.pipeline object.
    """
    # TODO [ENG-139]: Implement beam writing
    # NOTE(review): the materializer is associated with beam.PCollection
    # while this parameter is annotated as beam.Pipeline - confirm that the
    # base class check accepts the objects passed here.
    super().handle_return(pipeline)
    # NOTE(review): ParDo is constructed without any argument - this looks
    # like placeholder code; confirm before relying on it.
    pipeline | beam.ParDo()
    pipeline.run()
    # pipeline | beam.io.WriteToParquet(self.artifact.uri)
    # pipeline.run()

built_in_materializer

BuiltInMaterializer (BaseMaterializer)

Read/Write JSON files.

Source code in zenml/materializers/built_in_materializer.py
class BuiltInMaterializer(BaseMaterializer):
    """Materializer that stores built-in Python values in a JSON file."""

    # TODO [ENG-322]: consider adding typing.Dict and typing.List
    # since these are the 'correct' way to annotate these types.

    ASSOCIATED_ARTIFACT_TYPES = (DataArtifact, DataAnalysisArtifact)
    ASSOCIATED_TYPES = (int, str, bytes, dict, float, list, tuple, bool)

    def handle_input(self, data_type: Type[Any]) -> Any:
        """Load the stored value from the artifact's JSON file.

        Args:
            data_type: What type the input should be materialized as.

        Returns:
            The value read from the file, returned as-is even when its type
            differs from `data_type` (the mismatch is only logged).
        """
        super().handle_input(data_type)
        contents = yaml_utils.read_json(
            os.path.join(self.artifact.uri, DEFAULT_FILENAME)
        )
        type_mismatch = type(contents) != data_type
        if type_mismatch:
            # TODO [ENG-142]: Raise error or try to coerce
            logger.debug(
                f"Contents {contents} was type {type(contents)} but expected "
                f"{data_type}"
            )
        return contents

    def handle_return(self, data: Any) -> None:
        """Write a built-in value to the artifact's JSON file.

        Args:
            data: The value returned by the step.
        """
        super().handle_return(data)
        yaml_utils.write_json(
            os.path.join(self.artifact.uri, DEFAULT_FILENAME), data
        )
handle_input(self, data_type)

Reads basic primitive types from json.

Source code in zenml/materializers/built_in_materializer.py
def handle_input(self, data_type: Type[Any]) -> Any:
    """Load the stored value from the artifact's JSON file.

    Args:
        data_type: What type the input should be materialized as.

    Returns:
        The value read from the file, returned as-is even when its type
        differs from `data_type` (the mismatch is only logged).
    """
    super().handle_input(data_type)
    contents = yaml_utils.read_json(
        os.path.join(self.artifact.uri, DEFAULT_FILENAME)
    )
    type_mismatch = type(contents) != data_type
    if type_mismatch:
        # TODO [ENG-142]: Raise error or try to coerce
        logger.debug(
            f"Contents {contents} was type {type(contents)} but expected "
            f"{data_type}"
        )
    return contents
handle_return(self, data)

Handles basic built-in types and stores them as json

Source code in zenml/materializers/built_in_materializer.py
def handle_return(self, data: Any) -> None:
    """Write a built-in value to the artifact's JSON file.

    Args:
        data: The value returned by the step.
    """
    super().handle_return(data)
    yaml_utils.write_json(
        os.path.join(self.artifact.uri, DEFAULT_FILENAME), data
    )

default_materializer_registry

MaterializerRegistry

Matches a python type to a default materializer.

Source code in zenml/materializers/default_materializer_registry.py
class MaterializerRegistry:
    """Matches a python type to a default materializer."""

    def __init__(self) -> None:
        """Creates an empty registry."""
        self.materializer_types: Dict[Type[Any], Type["BaseMaterializer"]] = {}

    @staticmethod
    def _is_subclass(key: Any, registered_type: Type[Any]) -> bool:
        """Like `issubclass`, but returns False for keys that are not classes."""
        try:
            return issubclass(key, registered_type)
        except TypeError:
            # Raised for non-class keys such as subscripted typing generics.
            return False

    def register_materializer_type(
        self, key: Type[Any], type_: Type["BaseMaterializer"]
    ) -> None:
        """Registers a new materializer.

        An existing registration for `key` is kept; the new one is skipped.

        Args:
            key: Indicates the type of object.
            type_: A BaseMaterializer subclass.
        """
        if key not in self.materializer_types:
            self.materializer_types[key] = type_
            logger.debug(f"Registered materializer {type_} for {key}")
        else:
            logger.debug(
                f"Found existing materializer class for {key}: "
                f"{self.materializer_types[key]}. Skipping registration of "
                f"{type_}."
            )

    def register_and_overwrite_type(
        self, key: Type[Any], type_: Type["BaseMaterializer"]
    ) -> None:
        """Registers a new materializer and also overwrites a default if set.

        Args:
            key: Indicates the type of object.
            type_: A BaseMaterializer subclass.
        """
        self.materializer_types[key] = type_
        logger.debug(f"Registered materializer {type_} for {key}")

    def __getitem__(self, key: Type[Any]) -> Type["BaseMaterializer"]:
        """Get a single materializer based on the key.

        Args:
            key: Indicates the type of object.

        Returns:
            `BaseMaterializer` subclass that was registered for this key.

        Raises:
            StepInterfaceError: If the key (or any of its superclasses) is not
                registered or the key has more than one superclass with
                different default materializers
        """
        # An exact registration always wins.
        if key in self.materializer_types:
            return self.materializer_types[key]

        # Otherwise collect the materializers of all registered superclasses.
        # Keys that are not classes match nothing and therefore end up in
        # the documented StepInterfaceError below.
        materializers_for_compatible_superclasses = {
            materializer
            for registered_type, materializer in self.materializer_types.items()
            if self._is_subclass(key, registered_type)
        }
        # Make sure that there is only a single materializer
        if len(materializers_for_compatible_superclasses) == 1:
            return materializers_for_compatible_superclasses.pop()
        if len(materializers_for_compatible_superclasses) > 1:
            raise StepInterfaceError(
                f"Type {key} is subclassing more than one type, thus it "
                f"maps to multiple materializers within the materializer "
                f"registry: {materializers_for_compatible_superclasses}. "
                f"Please specify which of these materializers you would "
                f"like to use explicitly in your step.",
                url="https://docs.zenml.io/guides/index/custom-materializer",
            )
        raise StepInterfaceError(
            f"No materializer registered for class {key}. You can register a "
            f"default materializer for specific types by subclassing "
            f"`BaseMaterializer` and setting its `ASSOCIATED_TYPES` class"
            f" variable.",
            url="https://docs.zenml.io/guides/index/custom-materializer",
        )

    def get_materializer_types(
        self,
    ) -> Dict[Type[Any], Type["BaseMaterializer"]]:
        """Get all registered materializer types.

        Returns:
            The registry's own dictionary (not a copy).
        """
        return self.materializer_types

    def is_registered(self, key: Type[Any]) -> bool:
        """Returns whether a materializer class is registered for the given
        type or one of its superclasses.

        Args:
            key: Indicates the type of object.

        Returns:
            True if `key` is a subclass of a registered type, False
            otherwise (including when `key` is not a class).
        """
        return any(self._is_subclass(key, t) for t in self.materializer_types)
__getitem__(self, key) special

Get a single materializer based on the key.

Parameters:

Name Type Description Default
key Type[Any]

Indicates the type of object.

required

Returns:

Type Description
Type[BaseMaterializer]

BaseMaterializer subclass that was registered for this key.

Exceptions:

Type Description
StepInterfaceError

If the key (or any of its superclasses) is not registered or the key has more than one superclass with different default materializers

Source code in zenml/materializers/default_materializer_registry.py
def __getitem__(self, key: Type[Any]) -> Type["BaseMaterializer"]:
    """Get a single materializer based on the key.

    Args:
        key: Indicates the type of object.

    Returns:
        `BaseMaterializer` subclass that was registered for this key.

    Raises:
        StepInterfaceError: If the key (or any of its superclasses) is not
            registered or the key has more than one superclass with
            different default materializers
    """
    # An exact registration always wins.
    if key in self.materializer_types:
        return self.materializer_types[key]

    def _is_subclass_of(registered_type: Type[Any]) -> bool:
        # `issubclass` raises TypeError when `key` is not a class (for
        # example a subscripted typing generic). Such keys match nothing
        # and therefore end up in the documented StepInterfaceError below.
        try:
            return issubclass(key, registered_type)
        except TypeError:
            return False

    # If the type is not registered, check for superclasses
    materializers_for_compatible_superclasses = {
        materializer
        for registered_type, materializer in self.materializer_types.items()
        if _is_subclass_of(registered_type)
    }
    # Make sure that there is only a single materializer
    if len(materializers_for_compatible_superclasses) == 1:
        return materializers_for_compatible_superclasses.pop()
    if len(materializers_for_compatible_superclasses) > 1:
        raise StepInterfaceError(
            f"Type {key} is subclassing more than one type, thus it "
            f"maps to multiple materializers within the materializer "
            f"registry: {materializers_for_compatible_superclasses}. "
            f"Please specify which of these materializers you would "
            f"like to use explicitly in your step.",
            url="https://docs.zenml.io/guides/index/custom-materializer",
        )
    raise StepInterfaceError(
        f"No materializer registered for class {key}. You can register a "
        f"default materializer for specific types by subclassing "
        f"`BaseMaterializer` and setting its `ASSOCIATED_TYPES` class"
        f" variable.",
        url="https://docs.zenml.io/guides/index/custom-materializer",
    )
get_materializer_types(self)

Get all registered materializer types.

Source code in zenml/materializers/default_materializer_registry.py
def get_materializer_types(
    self,
) -> Dict[Type[Any], Type["BaseMaterializer"]]:
    """Return the mapping of registered types to materializer classes.

    Returns:
        The registry's own dictionary (not a copy).
    """
    registered = self.materializer_types
    return registered
is_registered(self, key)

Returns whether a materializer class is registered for the given type.

Source code in zenml/materializers/default_materializer_registry.py
def is_registered(self, key: Type[Any]) -> bool:
    """Returns whether a materializer class is registered for the given
    type or one of its superclasses.

    Args:
        key: Indicates the type of object.

    Returns:
        True if `key` is a subclass of a registered type, False otherwise
        (including when `key` is not a class).
    """
    try:
        return any(issubclass(key, t) for t in self.materializer_types)
    except TypeError:
        # `issubclass` rejects keys that are not classes (for example
        # subscripted typing generics); nothing is registered for those.
        return False
register_and_overwrite_type(self, key, type_)

Registers a new materializer and also overwrites a default if set.

Parameters:

Name Type Description Default
key Type[Any]

Indicates the type of object.

required
type_ Type[BaseMaterializer]

A BaseMaterializer subclass.

required
Source code in zenml/materializers/default_materializer_registry.py
def register_and_overwrite_type(
    self, key: Type[Any], type_: Type["BaseMaterializer"]
) -> None:
    """Registers a new materializer and also overwrites a default if set.

    Unlike `register_materializer_type`, an existing entry for `key` is
    replaced unconditionally.

    Args:
        key: Indicates the type of object.
        type_: A BaseMaterializer subclass.
    """
    self.materializer_types[key] = type_
    logger.debug(f"Registered materializer {type_} for {key}")
register_materializer_type(self, key, type_)

Registers a new materializer.

Parameters:

Name Type Description Default
key Type[Any]

Indicates the type of object.

required
type_ Type[BaseMaterializer]

A BaseMaterializer subclass.

required
Source code in zenml/materializers/default_materializer_registry.py
def register_materializer_type(
    self, key: Type[Any], type_: Type["BaseMaterializer"]
) -> None:
    """Register a materializer for a type unless one is already present.

    Args:
        key: Indicates the type of object.
        type_: A BaseMaterializer subclass.
    """
    # Keep the first registration; later ones are only logged.
    if key in self.materializer_types:
        logger.debug(
            f"Found existing materializer class for {key}: "
            f"{self.materializer_types[key]}. Skipping registration of "
            f"{type_}."
        )
        return

    self.materializer_types[key] = type_
    logger.debug(f"Registered materializer {type_} for {key}")

numpy_materializer

NumpyMaterializer (BaseMaterializer)

Materializer to read and write numpy arrays.

Source code in zenml/materializers/numpy_materializer.py
class NumpyMaterializer(BaseMaterializer):
    """Materializer to read and write numpy arrays.

    The array is stored flattened in a parquet file, with its shape kept in
    a separate JSON file.
    """

    ASSOCIATED_TYPES = (np.ndarray,)
    ASSOCIATED_ARTIFACT_TYPES = (DataArtifact,)

    def handle_input(self, data_type: Type[Any]) -> np.ndarray:
        """Reads numpy array from parquet file.

        Args:
            data_type: What type the input should be materialized as.

        Returns:
            The stored values reshaped to the stored shape.
        """
        super().handle_input(data_type)
        shape_dict = yaml_utils.read_json(
            os.path.join(self.artifact.uri, SHAPE_FILENAME)
        )
        # `handle_return` stores the shape as {"0": dim0, "1": dim1, ...}.
        # Order the dimensions by their numeric axis index instead of relying
        # on the key order that comes back from the JSON file.
        shape_tuple = tuple(
            shape_dict[axis] for axis in sorted(shape_dict, key=int)
        )
        with fileio.open(
            os.path.join(self.artifact.uri, DATA_FILENAME), "rb"
        ) as f:
            input_stream = pa.input_stream(f)
            data = pq.read_table(input_stream)
        vals = getattr(data.to_pandas(), DATA_VAR).values
        return np.reshape(vals, shape_tuple)

    def handle_return(self, arr: np.ndarray) -> None:
        """Writes a np.ndarray to the artifact store as a parquet file.

        Args:
            arr: The numpy array to write.
        """
        super().handle_return(arr)
        # The shape is stored separately because the data is flattened.
        yaml_utils.write_json(
            os.path.join(self.artifact.uri, SHAPE_FILENAME),
            {str(i): x for i, x in enumerate(arr.shape)},
        )
        pa_table = pa.table({DATA_VAR: arr.flatten()})
        with fileio.open(
            os.path.join(self.artifact.uri, DATA_FILENAME), "wb"
        ) as f:
            stream = pa.output_stream(f)
            pq.write_table(pa_table, stream)
handle_input(self, data_type)

Reads numpy array from parquet file.

Source code in zenml/materializers/numpy_materializer.py
def handle_input(self, data_type: Type[Any]) -> np.ndarray:
    """Reads numpy array from parquet file.

    Args:
        data_type: What type the input should be materialized as.

    Returns:
        The stored values reshaped to the stored shape.
    """
    super().handle_input(data_type)
    shape_dict = yaml_utils.read_json(
        os.path.join(self.artifact.uri, SHAPE_FILENAME)
    )
    # The shape file maps the axis index (as a string) to the axis size.
    # Order the dimensions by their numeric axis index instead of relying on
    # the key order that comes back from the JSON file.
    shape_tuple = tuple(
        shape_dict[axis] for axis in sorted(shape_dict, key=int)
    )
    with fileio.open(
        os.path.join(self.artifact.uri, DATA_FILENAME), "rb"
    ) as f:
        input_stream = pa.input_stream(f)
        data = pq.read_table(input_stream)
    vals = getattr(data.to_pandas(), DATA_VAR).values
    return np.reshape(vals, shape_tuple)
handle_return(self, arr)

Writes a np.ndarray to the artifact store as a parquet file.

Parameters:

Name Type Description Default
arr ndarray

The numpy array to write.

required
Source code in zenml/materializers/numpy_materializer.py
def handle_return(self, arr: np.ndarray) -> None:
    """Store a numpy array as a flattened parquet column plus a shape file.

    Args:
        arr: The numpy array to write.
    """
    super().handle_return(arr)

    # The shape is stored separately because the data is flattened.
    shape_path = os.path.join(self.artifact.uri, SHAPE_FILENAME)
    shape_by_axis = {str(axis): size for axis, size in enumerate(arr.shape)}
    yaml_utils.write_json(shape_path, shape_by_axis)

    pa_table = pa.table({DATA_VAR: arr.flatten()})
    data_path = os.path.join(self.artifact.uri, DATA_FILENAME)
    with fileio.open(data_path, "wb") as f:
        pq.write_table(pa_table, pa.output_stream(f))

pandas_materializer

PandasMaterializer (BaseMaterializer)

Materializer to read data to and from pandas.

Source code in zenml/materializers/pandas_materializer.py
class PandasMaterializer(BaseMaterializer):
    """Materializer that stores pandas dataframes as parquet files."""

    ASSOCIATED_TYPES = (pd.DataFrame,)
    ASSOCIATED_ARTIFACT_TYPES = (
        DataArtifact,
        StatisticsArtifact,
        SchemaArtifact,
    )

    def handle_input(self, data_type: Type[Any]) -> pd.DataFrame:
        """Load the dataframe from the artifact's parquet file.

        Args:
            data_type: What type the input should be materialized as.

        Returns:
            The dataframe read from the file.
        """
        super().handle_input(data_type)
        filepath = os.path.join(self.artifact.uri, DEFAULT_FILENAME)
        return pd.read_parquet(filepath)

    def handle_return(self, df: pd.DataFrame) -> None:
        """Write the dataframe to the artifact's parquet file.

        Args:
            df: The pandas dataframe to write.
        """
        super().handle_return(df)
        df.to_parquet(
            os.path.join(self.artifact.uri, DEFAULT_FILENAME),
            compression=COMPRESSION_TYPE,
        )
handle_input(self, data_type)

Reads pd.Dataframe from a parquet file.

Source code in zenml/materializers/pandas_materializer.py
def handle_input(self, data_type: Type[Any]) -> pd.DataFrame:
    """Load the dataframe from the artifact's parquet file.

    Args:
        data_type: What type the input should be materialized as.

    Returns:
        The dataframe read from the file.
    """
    super().handle_input(data_type)
    filepath = os.path.join(self.artifact.uri, DEFAULT_FILENAME)
    return pd.read_parquet(filepath)
handle_return(self, df)

Writes a pandas dataframe to the specified filename.

Parameters:

Name Type Description Default
df DataFrame

The pandas dataframe to write.

required
Source code in zenml/materializers/pandas_materializer.py
def handle_return(self, df: pd.DataFrame) -> None:
    """Write the dataframe to the artifact's parquet file.

    Args:
        df: The pandas dataframe to write.
    """
    super().handle_return(df)
    df.to_parquet(
        os.path.join(self.artifact.uri, DEFAULT_FILENAME),
        compression=COMPRESSION_TYPE,
    )