Skip to content

Materializers

zenml.materializers special

Initialization of ZenML materializers.

Materializers are used to convert a ZenML artifact into a specific format. They are most often used to handle the input or output of ZenML steps, and can be extended by building on the BaseMaterializer class.

base_materializer

Metaclass implementation for registering ZenML BaseMaterializer subclasses.

BaseMaterializer

Base Materializer to realize artifact data.

Source code in zenml/materializers/base_materializer.py
class BaseMaterializer(metaclass=BaseMaterializerMeta):
    """Base Materializer to realize artifact data.

    Subclasses list the Python types they can read/write in
    `ASSOCIATED_TYPES` and the artifact types they produce in
    `ASSOCIATED_ARTIFACT_TYPES`. They override `handle_input` /
    `handle_return` and call the base implementation first, which only
    performs the type check.
    """

    ASSOCIATED_ARTIFACT_TYPES: ClassVar[Tuple[Type["BaseArtifact"], ...]] = ()
    ASSOCIATED_TYPES: ClassVar[Tuple[Type[Any], ...]] = ()

    def __init__(self, artifact: "BaseArtifact"):
        """Initializes a materializer with the given artifact.

        Args:
            artifact: The artifact to materialize.
        """
        self.artifact = artifact

    def _can_handle_type(self, data_type: Type[Any]) -> bool:
        """Whether the materializer can read/write a certain type.

        Args:
            data_type: The type to check.

        Returns:
            True if `data_type` is one of `ASSOCIATED_TYPES` or a subclass
            of one of them, False otherwise.
        """
        return any(
            issubclass(data_type, associated_type)
            for associated_type in self.ASSOCIATED_TYPES
        )

    def handle_input(self, data_type: Type[Any]) -> Any:
        """Write logic here to handle input of the step function.

        The base implementation only validates `data_type`; subclasses
        extend it to load and return the artifact data.

        Args:
            data_type: What type the input should be materialized as.

        Raises:
            TypeError: If `data_type` is not one of `ASSOCIATED_TYPES` (or
                a subclass of one).
        """
        if not self._can_handle_type(data_type):
            raise TypeError(
                f"Unable to handle type {data_type}. {self.__class__.__name__} "
                f"can only read artifacts to the following types: "
                f"{self.ASSOCIATED_TYPES}."
            )

    def handle_return(self, data: Any) -> None:
        """Write logic here to handle return of the step function.

        The base implementation only validates the runtime type of `data`;
        subclasses extend it to write the data to the artifact.

        Args:
            data: The object returned by the step function, to be written
                to the output artifact.

        Raises:
            TypeError: If the type of `data` is not one of
                `ASSOCIATED_TYPES` (or a subclass of one).
        """
        data_type = type(data)
        if not self._can_handle_type(data_type):
            raise TypeError(
                f"Unable to write {data_type}. {self.__class__.__name__} "
                f"can only write the following types: {self.ASSOCIATED_TYPES}."
            )
__init__(self, artifact) special

Initializes a materializer with the given artifact.

Parameters:

Name Type Description Default
artifact BaseArtifact

The artifact to materialize.

required
Source code in zenml/materializers/base_materializer.py
def __init__(self, artifact: "BaseArtifact"):
    """Bind this materializer to the artifact it reads from and writes to.

    Args:
        artifact: The artifact to materialize.
    """
    # Kept on the instance so the handle_* methods can reach `artifact.uri`.
    self.artifact = artifact
handle_input(self, data_type)

Write logic here to handle input of the step function.

Parameters:

Name Type Description Default
data_type Type[Any]

What type the input should be materialized as.

required

Exceptions:

Type Description
TypeError

If the data is not of the correct type.

Source code in zenml/materializers/base_materializer.py
def handle_input(self, data_type: Type[Any]) -> Any:
    """Check that this materializer is able to load data as `data_type`.

    Only the type check lives here; concrete materializers override this
    method, call it first, and then read the artifact.

    Args:
        data_type: What type the input should be materialized as.

    Raises:
        TypeError: If `data_type` is not one of `ASSOCIATED_TYPES` (or a
            subclass of one).
    """
    if self._can_handle_type(data_type):
        return
    raise TypeError(
        f"Unable to handle type {data_type}. {self.__class__.__name__} "
        f"can only read artifacts to the following types: "
        f"{self.ASSOCIATED_TYPES}."
    )
handle_return(self, data)

Write logic here to handle return of the step function.

Parameters:

Name Type Description Default
data Any

The object returned by the step function, to be written to the output artifact.

required

Exceptions:

Type Description
TypeError

If the data is not of the correct type.

Source code in zenml/materializers/base_materializer.py
def handle_return(self, data: Any) -> None:
    """Write logic here to handle return of the step function.

    The base implementation only validates the runtime type of `data`;
    subclasses extend it to write the data to the artifact.

    Args:
        data: The object returned by the step function, to be written to
            the output artifact.

    Raises:
        TypeError: If the type of `data` is not one of `ASSOCIATED_TYPES`
            (or a subclass of one).
    """
    data_type = type(data)
    if not self._can_handle_type(data_type):
        raise TypeError(
            f"Unable to write {data_type}. {self.__class__.__name__} "
            f"can only write the following types: {self.ASSOCIATED_TYPES}."
        )

BaseMaterializerMeta (type)

Metaclass responsible for registering different BaseMaterializer subclasses.

Materializers are used for reading/writing artifacts.

Source code in zenml/materializers/base_materializer.py
class BaseMaterializerMeta(type):
    """Metaclass responsible for registering different BaseMaterializer subclasses.

    Materializers are used for reading/writing artifacts.
    """

    def __new__(
        mcs, name: str, bases: Tuple[Type[Any], ...], dct: Dict[str, Any]
    ) -> "BaseMaterializerMeta":
        """Creates a Materializer class and registers it at the `MaterializerRegistry`.

        All class-level declarations are validated before anything is
        registered, so an improperly defined materializer never leaves a
        partial registration behind.

        Args:
            name: The name of the class.
            bases: The base classes of the class.
            dct: The dictionary of the class.

        Returns:
            The newly created materializer class.

        Raises:
            MaterializerInterfaceError: If the class was improperly defined.
        """
        cls = cast(
            Type["BaseMaterializer"], super().__new__(mcs, name, bases, dct)
        )
        # The base class itself declares no types and is never registered.
        if name == "BaseMaterializer":
            return cls

        # Imported lazily, presumably to avoid an import cycle.
        from zenml.artifacts.base_artifact import BaseArtifact

        if not cls.ASSOCIATED_TYPES:
            raise MaterializerInterfaceError(
                f"Invalid materializer class '{name}'. When creating a "
                f"custom materializer, make sure to specify at least one "
                f"type in its ASSOCIATED_TYPES class variable.",
                url="https://docs.zenml.io/developer-guide/materializer",
            )

        for artifact_type in cls.ASSOCIATED_ARTIFACT_TYPES:
            if not (
                inspect.isclass(artifact_type)
                and issubclass(artifact_type, BaseArtifact)
            ):
                raise MaterializerInterfaceError(
                    f"Associated artifact type {artifact_type} for "
                    f"materializer {name} is not a `BaseArtifact` "
                    f"subclass.",
                    url="https://docs.zenml.io/developer-guide/materializer",
                )

        # Validate every associated type before registering any of them.
        for associated_type in cls.ASSOCIATED_TYPES:
            if not inspect.isclass(associated_type):
                raise MaterializerInterfaceError(
                    f"Associated type {associated_type} for materializer "
                    f"{name} is not a class.",
                    url="https://docs.zenml.io/developer-guide/materializer",
                )

        # Materializers without explicit artifact types fall back to the
        # generic BaseArtifact.
        artifact_types = cls.ASSOCIATED_ARTIFACT_TYPES or (BaseArtifact,)
        for associated_type in cls.ASSOCIATED_TYPES:
            default_materializer_registry.register_materializer_type(
                associated_type, cls
            )
            type_registry.register_integration(
                associated_type, artifact_types
            )
        return cls
__new__(mcs, name, bases, dct) special staticmethod

Creates a Materializer class and registers it at the MaterializerRegistry.

Parameters:

Name Type Description Default
name str

The name of the class.

required
bases Tuple[Type[Any], ...]

The base classes of the class.

required
dct Dict[str, Any]

The dictionary of the class.

required

Returns:

Type Description
BaseMaterializerMeta

The newly created materializer class.

Exceptions:

Type Description
MaterializerInterfaceError

If the class was improperly defined.

Source code in zenml/materializers/base_materializer.py
def __new__(
    mcs, name: str, bases: Tuple[Type[Any], ...], dct: Dict[str, Any]
) -> "BaseMaterializerMeta":
    """Creates a Materializer class and registers it at the `MaterializerRegistry`.

    All class-level declarations are validated before anything is
    registered, so an improperly defined materializer never leaves a
    partial registration behind.

    Args:
        name: The name of the class.
        bases: The base classes of the class.
        dct: The dictionary of the class.

    Returns:
        The newly created materializer class.

    Raises:
        MaterializerInterfaceError: If the class was improperly defined.
    """
    cls = cast(
        Type["BaseMaterializer"], super().__new__(mcs, name, bases, dct)
    )
    # The base class itself declares no types and is never registered.
    if name == "BaseMaterializer":
        return cls

    # Imported lazily, presumably to avoid an import cycle.
    from zenml.artifacts.base_artifact import BaseArtifact

    if not cls.ASSOCIATED_TYPES:
        raise MaterializerInterfaceError(
            f"Invalid materializer class '{name}'. When creating a "
            f"custom materializer, make sure to specify at least one "
            f"type in its ASSOCIATED_TYPES class variable.",
            url="https://docs.zenml.io/developer-guide/materializer",
        )

    for artifact_type in cls.ASSOCIATED_ARTIFACT_TYPES:
        if not (
            inspect.isclass(artifact_type)
            and issubclass(artifact_type, BaseArtifact)
        ):
            raise MaterializerInterfaceError(
                f"Associated artifact type {artifact_type} for "
                f"materializer {name} is not a `BaseArtifact` "
                f"subclass.",
                url="https://docs.zenml.io/developer-guide/materializer",
            )

    # Validate every associated type before registering any of them.
    for associated_type in cls.ASSOCIATED_TYPES:
        if not inspect.isclass(associated_type):
            raise MaterializerInterfaceError(
                f"Associated type {associated_type} for materializer "
                f"{name} is not a class.",
                url="https://docs.zenml.io/developer-guide/materializer",
            )

    # Materializers without explicit artifact types fall back to the
    # generic BaseArtifact.
    artifact_types = cls.ASSOCIATED_ARTIFACT_TYPES or (BaseArtifact,)
    for associated_type in cls.ASSOCIATED_TYPES:
        default_materializer_registry.register_materializer_type(
            associated_type, cls
        )
        type_registry.register_integration(
            associated_type, artifact_types
        )
    return cls

beam_materializer

Implementation of the ZenML Apache Beam materializer.

BeamMaterializer (BaseMaterializer)

Materializer to read data to and from beam.

Source code in zenml/materializers/beam_materializer.py
class BeamMaterializer(BaseMaterializer):
    """Materializer to read data to and from beam.

    NOTE(review): both handlers below are unfinished placeholders (see the
    TODOs). Neither one reads from or writes to `self.artifact.uri` yet.
    """

    ASSOCIATED_TYPES = (beam.PCollection,)
    ASSOCIATED_ARTIFACT_TYPES = (DataArtifact,)

    def handle_input(self, data_type: Type[Any]) -> Any:
        """Intended to read all files inside the artifact directory.

        It is meant to materialize them as a beam compatible output. For now
        it only runs the base-class type check and returns None.

        Args:
            data_type: The type of the data to read.
        """
        # TODO [ENG-138]: Implement beam reading
        super().handle_input(data_type)

    def handle_return(self, pipeline: beam.Pipeline) -> None:
        """Intended to append a beam.io.WriteToParquet at the end of a beam pipeline.

        That would persist the results. The WriteToParquet step is still
        commented out, so nothing is written to the artifact yet.

        NOTE(review): the base-class check tests `type(pipeline)` against
        `ASSOCIATED_TYPES` (`beam.PCollection`). An actual `beam.Pipeline`
        object may therefore be rejected with a TypeError; confirm the
        intended input type.

        Args:
            pipeline: A beam.pipeline object.
        """
        # TODO [ENG-139]: Implement beam writing
        super().handle_return(pipeline)
        # NOTE(review): `beam.ParDo()` is constructed without a DoFn. Beam's
        # ParDo normally requires one, so this line likely raises; confirm.
        pipeline | beam.ParDo()
        pipeline.run()
        # pipeline | beam.io.WriteToParquet(self.artifact.uri)
        # pipeline.run()
handle_input(self, data_type)

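Intended to read all files inside the artifact directory.

It is meant to materialize them as a beam compatible output. Reading is not implemented yet (TODO ENG-138), so currently only the base-class type check runs.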

Parameters:

Name Type Description Default
data_type Type[Any]

The type of the data to read.

required
Source code in zenml/materializers/beam_materializer.py
def handle_input(self, data_type: Type[Any]) -> Any:
    """Intended to read all files inside the artifact directory.

    It is meant to materialize them as a beam compatible output. For now it
    only runs the base-class type check and returns None.

    Args:
        data_type: The type of the data to read.
    """
    # TODO [ENG-138]: Implement beam reading
    super().handle_input(data_type)
handle_return(self, pipeline)

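Intended to append a beam.io.WriteToParquet at the end of a beam pipeline.

That would persist the results. Writing is not implemented yet (TODO ENG-139); the WriteToParquet step is still commented out.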

Parameters:

Name Type Description Default
pipeline Pipeline

A beam.pipeline object.

required
Source code in zenml/materializers/beam_materializer.py
def handle_return(self, pipeline: beam.Pipeline) -> None:
    """Intended to append a beam.io.WriteToParquet at the end of a beam pipeline.

    That would persist the results. The WriteToParquet step is still
    commented out, so nothing is written to the artifact yet.

    NOTE(review): the base-class type check uses `type(pipeline)`. If the
    materializer's `ASSOCIATED_TYPES` is `(beam.PCollection,)`, a
    `beam.Pipeline` object may be rejected with a TypeError; confirm.

    Args:
        pipeline: A beam.pipeline object.
    """
    # TODO [ENG-139]: Implement beam writing
    super().handle_return(pipeline)
    # NOTE(review): `beam.ParDo()` is constructed without a DoFn. Beam's ParDo
    # normally requires one, so this line likely raises; confirm.
    pipeline | beam.ParDo()
    pipeline.run()
    # pipeline | beam.io.WriteToParquet(self.artifact.uri)
    # pipeline.run()

built_in_materializer

Implementation of ZenML's builtin materializer.

BuiltInMaterializer (BaseMaterializer)

Read/Write JSON files.

Source code in zenml/materializers/built_in_materializer.py
class BuiltInMaterializer(BaseMaterializer):
    """Read/Write JSON files."""

    # NOTE(review): the original comment here was a fragment ("since these
    # are the 'correct' way to annotate these types"). It presumably referred
    # to also supporting `typing` aliases such as `typing.Dict`/`typing.List`.

    ASSOCIATED_ARTIFACT_TYPES = (
        DataArtifact,
        DataAnalysisArtifact,
    )
    ASSOCIATED_TYPES = (
        int,
        str,
        bytes,
        dict,
        float,
        list,
        tuple,
        bool,
    )

    def handle_input(self, data_type: Type[Any]) -> Any:
        """Reads basic primitive types from json.

        JSON has no tuple type, so a tuple stored by `handle_return` is read
        back as a list. It is converted back to a tuple when `data_type` is
        `tuple`. Any other type mismatch is only logged.

        Args:
            data_type: The type of the data to read.

        Returns:
            The data read.
        """
        super().handle_input(data_type)
        filepath = os.path.join(self.artifact.uri, DEFAULT_FILENAME)
        contents = yaml_utils.read_json(filepath)
        if data_type is tuple and isinstance(contents, list):
            # Restore the requested container type lost in JSON encoding.
            contents = tuple(contents)
        if type(contents) is not data_type:
            # TODO [ENG-142]: Raise error or try to coerce
            logger.debug(
                f"Contents {contents} was type {type(contents)} but expected "
                f"{data_type}"
            )
        return contents

    def handle_return(self, data: Any) -> None:
        """Handles basic built-in types and stores them as json.

        Args:
            data: The data to store.
        """
        # NOTE(review): `bytes` is an associated type, but standard JSON
        # cannot encode bytes; confirm `yaml_utils.write_json` handles it.
        super().handle_return(data)
        filepath = os.path.join(self.artifact.uri, DEFAULT_FILENAME)
        yaml_utils.write_json(filepath, data)
handle_input(self, data_type)

Reads basic primitive types from json.

Parameters:

Name Type Description Default
data_type Type[Any]

The type of the data to read.

required

Returns:

Type Description
Any

The data read.

Source code in zenml/materializers/built_in_materializer.py
def handle_input(self, data_type: Type[Any]) -> Any:
    """Reads basic primitive types from json.

    JSON has no tuple type, so a tuple stored as JSON is read back as a
    list. It is converted back to a tuple when `data_type` is `tuple`. Any
    other type mismatch is only logged.

    Args:
        data_type: The type of the data to read.

    Returns:
        The data read.
    """
    super().handle_input(data_type)
    filepath = os.path.join(self.artifact.uri, DEFAULT_FILENAME)
    contents = yaml_utils.read_json(filepath)
    if data_type is tuple and isinstance(contents, list):
        # Restore the requested container type lost in JSON encoding.
        contents = tuple(contents)
    if type(contents) is not data_type:
        # TODO [ENG-142]: Raise error or try to coerce
        logger.debug(
            f"Contents {contents} was type {type(contents)} but expected "
            f"{data_type}"
        )
    return contents
handle_return(self, data)

Handles basic built-in types and stores them as json.

Parameters:

Name Type Description Default
data Any

The data to store.

required
Source code in zenml/materializers/built_in_materializer.py
def handle_return(self, data: Any) -> None:
    """Validate `data` and write it as JSON into the artifact directory.

    Args:
        data: The data to store.
    """
    super().handle_return(data)
    yaml_utils.write_json(
        os.path.join(self.artifact.uri, DEFAULT_FILENAME), data
    )

default_materializer_registry

Implementation of a default materializer registry.

MaterializerRegistry

Matches a Python type to a default materializer.

Source code in zenml/materializers/default_materializer_registry.py
class MaterializerRegistry:
    """Matches a Python type to a default materializer."""

    def __init__(self) -> None:
        """Initialize the materializer registry."""
        self.materializer_types: Dict[Type[Any], Type["BaseMaterializer"]] = {}

    def register_materializer_type(
        self, key: Type[Any], type_: Type["BaseMaterializer"]
    ) -> None:
        """Registers a new materializer.

        The first registration for a key wins; later ones are logged and
        skipped.

        Args:
            key: Indicates the type of object.
            type_: A BaseMaterializer subclass.
        """
        if key not in self.materializer_types:
            self.materializer_types[key] = type_
            logger.debug(f"Registered materializer {type_} for {key}")
        else:
            logger.debug(
                f"Found existing materializer class for {key}: "
                f"{self.materializer_types[key]}. Skipping registration of "
                f"{type_}."
            )

    def register_and_overwrite_type(
        self, key: Type[Any], type_: Type["BaseMaterializer"]
    ) -> None:
        """Registers a new materializer and also overwrites a default if set.

        Args:
            key: Indicates the type of object.
            type_: A BaseMaterializer subclass.
        """
        self.materializer_types[key] = type_
        logger.debug(f"Registered materializer {type_} for {key}")

    def __getitem__(self, key: Type[Any]) -> Type["BaseMaterializer"]:
        """Get a single materializer based on the key.

        An exact registration for `key` wins. Otherwise the registered
        superclasses of `key` are considered. Only the most specific of them
        are kept: a registered type that is a base class of another matching
        registered type is discarded. As a result, a registration for a
        subclass overrides one for its parent.

        Args:
            key: Indicates the type of object.

        Returns:
            `BaseMaterializer` subclass that was registered for this key.

        Raises:
            StepInterfaceError: If neither the key nor any of its
                superclasses is registered, or if the most specific
                registered superclasses map to different materializers.
        """
        # Check whether the type is registered
        if key in self.materializer_types:
            return self.materializer_types[key]

        # If the type is not registered, check for superclasses
        compatible_types = [
            registered_type
            for registered_type in self.materializer_types
            if issubclass(key, registered_type)
        ]
        # Keep only the most specific matches. Falling back to all matches
        # guards against pathological ABC subclass hooks that make two
        # types subclasses of each other.
        most_specific_types = [
            candidate
            for candidate in compatible_types
            if not any(
                other is not candidate and issubclass(other, candidate)
                for other in compatible_types
            )
        ] or compatible_types
        materializers_for_compatible_superclasses = {
            self.materializer_types[registered_type]
            for registered_type in most_specific_types
        }
        # Make sure that there is only a single materializer
        if len(materializers_for_compatible_superclasses) == 1:
            return materializers_for_compatible_superclasses.pop()
        if len(materializers_for_compatible_superclasses) > 1:
            raise StepInterfaceError(
                f"Type {key} is subclassing more than one type, thus it "
                f"maps to multiple materializers within the materializer "
                f"registry: {materializers_for_compatible_superclasses}. "
                f"Please specify which of these materializers you would "
                f"like to use explicitly in your step.",
                url="https://docs.zenml.io/developer-guide/materializer",
            )
        raise StepInterfaceError(
            f"No materializer registered for class {key}. You can register a "
            f"default materializer for specific types by subclassing "
            f"`BaseMaterializer` and setting its `ASSOCIATED_TYPES` class"
            f" variable.",
            url="https://docs.zenml.io/developer-guide/materializer",
        )

    def get_materializer_types(
        self,
    ) -> Dict[Type[Any], Type["BaseMaterializer"]]:
        """Get all registered materializer types.

        Returns:
            A dictionary of registered materializer types (the registry's
            own dict, not a copy).
        """
        return self.materializer_types

    def is_registered(self, key: Type[Any]) -> bool:
        """Returns if a materializer class is registered for the given type.

        Args:
            key: Indicates the type of object.

        Returns:
            True if a materializer is registered for the given type or one
            of its superclasses, False otherwise.
        """
        return any(issubclass(key, t) for t in self.materializer_types)
__getitem__(self, key) special

Get a single materializer based on the key.

Parameters:

Name Type Description Default
key Type[Any]

Indicates the type of object.

required

Returns:

Type Description
Type[BaseMaterializer]

BaseMaterializer subclass that was registered for this key.

Exceptions:

Type Description
StepInterfaceError

If the key (or any of its superclasses) is not registered or the key has more than one superclass with different default materializers

Source code in zenml/materializers/default_materializer_registry.py
def __getitem__(self, key: Type[Any]) -> Type["BaseMaterializer"]:
    """Get a single materializer based on the key.

    An exact registration for `key` wins. Otherwise the registered
    superclasses of `key` are considered. Only the most specific of them are
    kept: a registered type that is a base class of another matching
    registered type is discarded. As a result, a registration for a subclass
    overrides one for its parent.

    Args:
        key: Indicates the type of object.

    Returns:
        `BaseMaterializer` subclass that was registered for this key.

    Raises:
        StepInterfaceError: If neither the key nor any of its superclasses
            is registered, or if the most specific registered superclasses
            map to different materializers.
    """
    # Check whether the type is registered
    if key in self.materializer_types:
        return self.materializer_types[key]

    # If the type is not registered, check for superclasses
    compatible_types = [
        registered_type
        for registered_type in self.materializer_types
        if issubclass(key, registered_type)
    ]
    # Keep only the most specific matches. Falling back to all matches guards
    # against pathological ABC subclass hooks that make two types subclasses
    # of each other.
    most_specific_types = [
        candidate
        for candidate in compatible_types
        if not any(
            other is not candidate and issubclass(other, candidate)
            for other in compatible_types
        )
    ] or compatible_types
    materializers_for_compatible_superclasses = {
        self.materializer_types[registered_type]
        for registered_type in most_specific_types
    }
    # Make sure that there is only a single materializer
    if len(materializers_for_compatible_superclasses) == 1:
        return materializers_for_compatible_superclasses.pop()
    if len(materializers_for_compatible_superclasses) > 1:
        raise StepInterfaceError(
            f"Type {key} is subclassing more than one type, thus it "
            f"maps to multiple materializers within the materializer "
            f"registry: {materializers_for_compatible_superclasses}. "
            f"Please specify which of these materializers you would "
            f"like to use explicitly in your step.",
            url="https://docs.zenml.io/developer-guide/materializer",
        )
    raise StepInterfaceError(
        f"No materializer registered for class {key}. You can register a "
        f"default materializer for specific types by subclassing "
        f"`BaseMaterializer` and setting its `ASSOCIATED_TYPES` class"
        f" variable.",
        url="https://docs.zenml.io/developer-guide/materializer",
    )
__init__(self) special

Initialize the materializer registry.

Source code in zenml/materializers/default_materializer_registry.py
def __init__(self) -> None:
    """Create an empty mapping from Python types to materializer classes."""
    registry: Dict[Type[Any], Type["BaseMaterializer"]] = dict()
    self.materializer_types = registry
get_materializer_types(self)

Get all registered materializer types.

Returns:

Type Description
Dict[Type[Any], Type[BaseMaterializer]]

A dictionary of registered materializer types.

Source code in zenml/materializers/default_materializer_registry.py
def get_materializer_types(
    self,
) -> Dict[Type[Any], Type["BaseMaterializer"]]:
    """Expose the registry's type-to-materializer mapping.

    Returns:
        A dictionary of registered materializer types. This is the live
        mapping held by the registry, not a copy.
    """
    registered = self.materializer_types
    return registered
is_registered(self, key)

Returns if a materializer class is registered for the given type.

Parameters:

Name Type Description Default
key Type[Any]

Indicates the type of object.

required

Returns:

Type Description
bool

True if a materializer is registered for the given type, False otherwise.

Source code in zenml/materializers/default_materializer_registry.py
def is_registered(self, key: Type[Any]) -> bool:
    """Tell whether `key` or one of its superclasses has a materializer.

    Args:
        key: Indicates the type of object.

    Returns:
        True if a materializer is registered for the given type, False
        otherwise.
    """
    for registered_type in self.materializer_types:
        if issubclass(key, registered_type):
            return True
    return False
register_and_overwrite_type(self, key, type_)

Registers a new materializer and also overwrites a default if set.

Parameters:

Name Type Description Default
key Type[Any]

Indicates the type of object.

required
type_ Type[BaseMaterializer]

A BaseMaterializer subclass.

required
Source code in zenml/materializers/default_materializer_registry.py
def register_and_overwrite_type(
    self, key: Type[Any], type_: Type["BaseMaterializer"]
) -> None:
    """Map `key` to `type_`, replacing whatever was registered before.

    Unlike `register_materializer_type`, an existing entry for `key` does
    not prevent the registration.

    Args:
        key: Indicates the type of object.
        type_: A BaseMaterializer subclass.
    """
    self.materializer_types.update({key: type_})
    logger.debug(f"Registered materializer {type_} for {key}")
register_materializer_type(self, key, type_)

Registers a new materializer.

Parameters:

Name Type Description Default
key Type[Any]

Indicates the type of object.

required
type_ Type[BaseMaterializer]

A BaseMaterializer subclass.

required
Source code in zenml/materializers/default_materializer_registry.py
def register_materializer_type(
    self, key: Type[Any], type_: Type["BaseMaterializer"]
) -> None:
    """Register `type_` for `key` unless a materializer already exists for it.

    The first registration for a key wins; later attempts are only logged.

    Args:
        key: Indicates the type of object.
        type_: A BaseMaterializer subclass.
    """
    if key in self.materializer_types:
        logger.debug(
            f"Found existing materializer class for {key}: "
            f"{self.materializer_types[key]}. Skipping registration of "
            f"{type_}."
        )
        return
    self.materializer_types[key] = type_
    logger.debug(f"Registered materializer {type_} for {key}")

numpy_materializer

Implementation of the ZenML NumPy materializer.

NumpyMaterializer (BaseMaterializer)

Materializer to read data to and from numpy.

Source code in zenml/materializers/numpy_materializer.py
class NumpyMaterializer(BaseMaterializer):
    """Materializer to read data to and from numpy."""

    ASSOCIATED_TYPES = (np.ndarray,)
    ASSOCIATED_ARTIFACT_TYPES = (DataArtifact,)

    def handle_input(self, data_type: Type[Any]) -> "NDArray[Any]":
        """Reads numpy array from parquet file.

        The array is stored flattened in a single parquet column, with its
        shape saved separately as JSON. Both files are read and the values
        are reshaped.

        Args:
            data_type: The type of the data to read.

        Returns:
            The numpy array.
        """
        super().handle_input(data_type)
        shape_dict = yaml_utils.read_json(
            os.path.join(self.artifact.uri, SHAPE_FILENAME)
        )
        # Values are in axis order as written by `handle_return` (keys "0",
        # "1", ...); relies on read_json preserving key order.
        shape_tuple = tuple(shape_dict.values())
        with fileio.open(
            os.path.join(self.artifact.uri, DATA_FILENAME), "rb"
        ) as f:
            input_stream = pa.input_stream(f)
            data = pq.read_table(input_stream)
        # Index the column explicitly. Attribute access would return a
        # DataFrame attribute instead if DATA_VAR ever collided with one.
        vals = data.to_pandas()[DATA_VAR].values
        return np.reshape(vals, shape_tuple)

    def handle_return(self, arr: "NDArray[Any]") -> None:
        """Writes a np.ndarray to the artifact store as a parquet file.

        The shape is stored as JSON (axis index -> size) and the data is
        stored flattened in a single column.

        Args:
            arr: The numpy array to write.
        """
        super().handle_return(arr)
        yaml_utils.write_json(
            os.path.join(self.artifact.uri, SHAPE_FILENAME),
            {str(i): x for i, x in enumerate(arr.shape)},
        )
        pa_table = pa.table({DATA_VAR: arr.flatten()})
        with fileio.open(
            os.path.join(self.artifact.uri, DATA_FILENAME), "wb"
        ) as f:
            stream = pa.output_stream(f)
            pq.write_table(pa_table, stream)
handle_input(self, data_type)

Reads numpy array from parquet file.

Parameters:

Name Type Description Default
data_type Type[Any]

The type of the data to read.

required

Returns:

Type Description
NDArray[Any]

The numpy array.

Source code in zenml/materializers/numpy_materializer.py
def handle_input(self, data_type: Type[Any]) -> "NDArray[Any]":
    """Reads numpy array from parquet file.

    The array is stored flattened in a single parquet column, with its shape
    saved separately as JSON. Both files are read and the values are
    reshaped.

    Args:
        data_type: The type of the data to read.

    Returns:
        The numpy array.
    """
    super().handle_input(data_type)
    shape_dict = yaml_utils.read_json(
        os.path.join(self.artifact.uri, SHAPE_FILENAME)
    )
    # Values are taken in dict order; presumably keys "0", "1", ... written
    # in axis order by the matching writer. Relies on read_json preserving
    # key order.
    shape_tuple = tuple(shape_dict.values())
    with fileio.open(
        os.path.join(self.artifact.uri, DATA_FILENAME), "rb"
    ) as f:
        input_stream = pa.input_stream(f)
        data = pq.read_table(input_stream)
    # Index the column explicitly. Attribute access would return a DataFrame
    # attribute instead if DATA_VAR ever collided with one.
    vals = data.to_pandas()[DATA_VAR].values
    return np.reshape(vals, shape_tuple)
handle_return(self, arr)

Writes a np.ndarray to the artifact store as a parquet file.

Parameters:

Name Type Description Default
arr NDArray[Any]

The numpy array to write.

required
Source code in zenml/materializers/numpy_materializer.py
def handle_return(self, arr: "NDArray[Any]") -> None:
    """Persist a numpy array as a JSON shape file plus a flattened parquet column.

    Args:
        arr: The numpy array to write.
    """
    super().handle_return(arr)
    shape_path = os.path.join(self.artifact.uri, SHAPE_FILENAME)
    axis_sizes = {str(axis): size for axis, size in enumerate(arr.shape)}
    yaml_utils.write_json(shape_path, axis_sizes)

    table = pa.table({DATA_VAR: arr.flatten()})
    data_path = os.path.join(self.artifact.uri, DATA_FILENAME)
    with fileio.open(data_path, "wb") as handle:
        pq.write_table(table, pa.output_stream(handle))

pandas_materializer

Materializer for Pandas.

PandasMaterializer (BaseMaterializer)

Materializer to read data to and from pandas.

Source code in zenml/materializers/pandas_materializer.py
class PandasMaterializer(BaseMaterializer):
    """Materializer to read data to and from pandas."""

    ASSOCIATED_TYPES = (pd.DataFrame, pd.Series)
    ASSOCIATED_ARTIFACT_TYPES = (
        DataArtifact,
        StatisticsArtifact,
        SchemaArtifact,
    )

    def handle_input(
        self, data_type: Type[Any]
    ) -> Union[pd.DataFrame, pd.Series]:
        """Reads pd.DataFrame or pd.Series from a parquet file.

        The parquet file is first copied from the artifact store into a local
        temporary folder. That folder is always removed, even if copying or
        reading fails.

        Args:
            data_type: The type of the data to read.

        Returns:
            The pandas dataframe or series.
        """
        super().handle_input(data_type)
        filepath = os.path.join(self.artifact.uri, DEFAULT_FILENAME)

        temp_dir = tempfile.mkdtemp(prefix="zenml-temp-")
        try:
            temp_file = os.path.join(str(temp_dir), DEFAULT_FILENAME)
            # Copy from artifact store to temporary file
            fileio.copy(filepath, temp_file)
            df = pd.read_parquet(temp_file)
        finally:
            fileio.rmtree(temp_dir)

        if issubclass(data_type, pd.Series):
            # Series are stored as a single-column frame (see handle_return),
            # so take that column back out.
            assert len(df.columns) == 1
            df = df[df.columns[0]]

        return df

    def handle_return(self, df: Union[pd.DataFrame, pd.Series]) -> None:
        """Writes a pandas dataframe or series to the specified filename.

        A series is stored as a single-column frame named "series". The data
        is written to a local temporary file, which is copied to the artifact
        store and always removed afterwards.

        Args:
            df: The pandas dataframe or series to write.
        """
        super().handle_return(df)
        filepath = os.path.join(self.artifact.uri, DEFAULT_FILENAME)

        if isinstance(df, pd.Series):
            df = df.to_frame(name="series")

        # Only reserve a temporary path here and close the handle right away.
        # Parquet writes to the path itself, and reopening a still-open named
        # temporary file is not supported on every platform (e.g. Windows).
        with tempfile.NamedTemporaryFile(suffix=".gzip", delete=False) as f:
            temp_path = f.name
        try:
            df.to_parquet(temp_path, compression=COMPRESSION_TYPE)
            fileio.copy(temp_path, filepath)
        finally:
            fileio.remove(temp_path)
handle_input(self, data_type)

Reads pd.DataFrame or pd.Series from a parquet file.

Parameters:

Name Type Description Default
data_type Type[Any]

The type of the data to read.

required

Returns:

Type Description
Union[pandas.core.frame.DataFrame, pandas.core.series.Series]

The pandas dataframe or series.

Source code in zenml/materializers/pandas_materializer.py
def handle_input(
    self, data_type: Type[Any]
) -> Union[pd.DataFrame, pd.Series]:
    """Reads pd.DataFrame or pd.Series from a parquet file.

    Args:
        data_type: The type of the data to read.

    Returns:
        The pandas dataframe or series.

    Raises:
        ValueError: If a pd.Series is requested but the stored parquet file
            does not contain exactly one column.
    """
    super().handle_input(data_type)
    filepath = os.path.join(self.artifact.uri, DEFAULT_FILENAME)

    # Copy the artifact to a local temporary directory and load it from
    # there. The directory is removed when the context exits, even if the
    # copy or the parquet read raises.
    with tempfile.TemporaryDirectory(prefix="zenml-temp-") as temp_dir:
        temp_file = os.path.join(temp_dir, DEFAULT_FILENAME)
        fileio.copy(filepath, temp_file)
        df = pd.read_parquet(temp_file)

    if issubclass(data_type, pd.Series):
        # A series is stored as a single-column dataframe, so exactly one
        # column is expected. Raise explicitly rather than `assert`: asserts
        # are stripped under `python -O`, which would silently drop columns.
        if len(df.columns) != 1:
            raise ValueError(
                f"Expected a single-column parquet file to read as a pandas "
                f"Series, but found {len(df.columns)} columns: "
                f"{list(df.columns)}."
            )
        df = df[df.columns[0]]

    return df
handle_return(self, df)

Writes a pandas dataframe or series to the specified filename.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| df | Union[pandas.core.frame.DataFrame, pandas.core.series.Series] | The pandas dataframe or series to write. | required |

Source code in zenml/materializers/pandas_materializer.py
def handle_return(self, df: Union[pd.DataFrame, pd.Series]) -> None:
    """Writes a pandas dataframe or series to the specified filename.

    A series is first converted into a single-column dataframe (column
    name ``"series"``) so that both types are stored as parquet.

    Args:
        df: The pandas dataframe or series to write.
    """
    super().handle_return(df)
    filepath = os.path.join(self.artifact.uri, DEFAULT_FILENAME)

    if isinstance(df, pd.Series):
        df = df.to_frame(name="series")

    # Write the parquet file to a local temporary directory, then copy it to
    # the artifact URI. The directory is removed when the context exits,
    # including when writing or copying raises, so no temporary files are
    # leaked.
    with tempfile.TemporaryDirectory(prefix="zenml-temp-") as temp_dir:
        temp_file = os.path.join(temp_dir, DEFAULT_FILENAME)
        df.to_parquet(temp_file, compression=COMPRESSION_TYPE)
        fileio.copy(temp_file, filepath)

service_materializer

Implementation of a materializer to read and write ZenML service instances.

ServiceMaterializer (BaseMaterializer)

Materializer to read/write service instances.

Source code in zenml/materializers/service_materializer.py
class ServiceMaterializer(BaseMaterializer):
    """Persists ZenML service instances as JSON artifacts and restores them."""

    ASSOCIATED_TYPES = (BaseService,)
    ASSOCIATED_ARTIFACT_TYPES = (ServiceArtifact,)

    def handle_input(self, data_type: Type[Any]) -> BaseService:
        """Rebuilds a service from the JSON stored in the artifact.

        The artifact holds the serialized service configuration together
        with its last known status; `ServiceRegistry` turns that JSON back
        into a service instance.

        Args:
            data_type: The type of the data to read.

        Returns:
            A ZenML service instance.
        """
        super().handle_input(data_type)
        config_path = os.path.join(self.artifact.uri, SERVICE_CONFIG_FILENAME)
        with fileio.open(config_path, "r") as config_file:
            serialized_service = config_file.read()
        return ServiceRegistry().load_service_from_json(serialized_service)

    def handle_return(self, service: BaseService) -> None:
        """Serializes a service into the artifact as indented JSON.

        Both the configuration and the last known status of the service
        end up in the written file.

        Args:
            service: A ZenML service instance.
        """
        super().handle_return(service)
        config_path = os.path.join(self.artifact.uri, SERVICE_CONFIG_FILENAME)
        with fileio.open(config_path, "w") as config_file:
            config_file.write(service.json(indent=4))
handle_input(self, data_type)

Creates and returns a service.

This service is instantiated from the serialized service configuration and the last known status information saved as an artifact.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| data_type | Type[Any] | The type of the data to read. | required |

Returns:

| Type | Description |
|------|-------------|
| BaseService | A ZenML service instance. |

Source code in zenml/materializers/service_materializer.py
def handle_input(self, data_type: Type[Any]) -> BaseService:
    """Rebuilds a service from the JSON stored in the artifact.

    The artifact holds the serialized service configuration together with
    its last known status; `ServiceRegistry` turns that JSON back into a
    service instance.

    Args:
        data_type: The type of the data to read.

    Returns:
        A ZenML service instance.
    """
    super().handle_input(data_type)
    config_path = os.path.join(self.artifact.uri, SERVICE_CONFIG_FILENAME)
    with fileio.open(config_path, "r") as config_file:
        serialized_service = config_file.read()
    return ServiceRegistry().load_service_from_json(serialized_service)
handle_return(self, service)

Writes a ZenML service.

The configuration and last known status of the input service instance are serialized and saved as an artifact.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| service | BaseService | A ZenML service instance. | required |

Source code in zenml/materializers/service_materializer.py
def handle_return(self, service: BaseService) -> None:
    """Serializes a service into the artifact as indented JSON.

    Both the configuration and the last known status of the service end up
    in the written file.

    Args:
        service: A ZenML service instance.
    """
    super().handle_return(service)
    config_path = os.path.join(self.artifact.uri, SERVICE_CONFIG_FILENAME)
    with fileio.open(config_path, "w") as config_file:
        config_file.write(service.json(indent=4))