Materializers
zenml.materializers
special
Materializers are used to convert a ZenML artifact into a specific format. They
are most often used to handle the input or output of ZenML steps, and can be
extended by building on the BaseMaterializer
class.
base_materializer
BaseMaterializer
Base Materializer to realize artifact data.
Source code in zenml/materializers/base_materializer.py
class BaseMaterializer(metaclass=BaseMaterializerMeta):
    """Base class for materializers, which turn stored artifact data into
    Python objects (reads) and Python objects into artifact data (writes)."""

    # Artifact classes this materializer may be attached to.
    ASSOCIATED_ARTIFACT_TYPES: ClassVar[Tuple[Type["BaseArtifact"], ...]] = ()
    # Python types this materializer knows how to read and write.
    ASSOCIATED_TYPES: ClassVar[Tuple[Type[Any], ...]] = ()

    def __init__(self, artifact: "BaseArtifact"):
        """Creates a materializer bound to the given artifact."""
        self.artifact = artifact

    def _can_handle_type(self, data_type: Type[Any]) -> bool:
        """True if `data_type` is (a subclass of) one of the associated types."""
        for supported in self.ASSOCIATED_TYPES:
            if issubclass(data_type, supported):
                return True
        return False

    def handle_input(self, data_type: Type[Any]) -> Any:
        """Write logic here to handle input of the step function.

        Args:
            data_type: What type the input should be materialized as.

        Returns:
            Any object that is to be passed into the relevant artifact in the
            step.

        Raises:
            TypeError: If `data_type` is not an associated type.
        """
        if self._can_handle_type(data_type):
            return None
        raise TypeError(
            f"Unable to handle type {data_type}. {self.__class__.__name__} "
            f"can only read artifacts to the following types: "
            f"{self.ASSOCIATED_TYPES}."
        )

    def handle_return(self, data: Any) -> None:
        """Write logic here to handle return of the step function.

        Args:
            data: Any object that is specified as an input artifact of the step.

        Raises:
            TypeError: If the type of `data` is not an associated type.
        """
        value_type = type(data)
        if self._can_handle_type(value_type):
            return
        raise TypeError(
            f"Unable to write {value_type}. {self.__class__.__name__} "
            f"can only write the following types: {self.ASSOCIATED_TYPES}."
        )
__init__(self, artifact)
special
Initializes a materializer with the given artifact.
Source code in zenml/materializers/base_materializer.py
def __init__(self, artifact: "BaseArtifact"):
    """Initializes a materializer with the given artifact.

    Args:
        artifact: The artifact this materializer reads from / writes to
            (subclasses use its `uri` as the storage location).
    """
    self.artifact = artifact
handle_input(self, data_type)
Write logic here to handle input of the step function.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
data_type |
Type[Any] |
What type the input should be materialized as. |
required |
Returns:
Type | Description |
---|---|
Any |
Any object that is to be passed into the relevant artifact in the step. |
Source code in zenml/materializers/base_materializer.py
def handle_input(self, data_type: Type[Any]) -> Any:
    """Write logic here to handle input of the step function.

    Args:
        data_type: What type the input should be materialized as.

    Returns:
        Any object that is to be passed into the relevant artifact in the
        step.

    Raises:
        TypeError: If `data_type` is not one of the associated types.
    """
    # Guard clause: accept supported types, reject everything else loudly.
    if self._can_handle_type(data_type):
        return None
    raise TypeError(
        f"Unable to handle type {data_type}. {self.__class__.__name__} "
        f"can only read artifacts to the following types: "
        f"{self.ASSOCIATED_TYPES}."
    )
handle_return(self, data)
Write logic here to handle return of the step function.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
data |
Any |
Any object that is specified as an input artifact of the step. |
required |
Source code in zenml/materializers/base_materializer.py
def handle_return(self, data: Any) -> None:
    """Write logic here to handle return of the step function.

    Args:
        data: Any object that is specified as an input artifact of the step.

    Raises:
        TypeError: If the type of `data` is not one of the associated types.
    """
    value_type = type(data)
    # Guard clause: supported values pass silently, others raise.
    if self._can_handle_type(value_type):
        return
    raise TypeError(
        f"Unable to write {value_type}. {self.__class__.__name__} "
        f"can only write the following types: {self.ASSOCIATED_TYPES}."
    )
BaseMaterializerMeta (type)
Metaclass responsible for registering different BaseMaterializer subclasses for reading/writing artifacts.
Source code in zenml/materializers/base_materializer.py
class BaseMaterializerMeta(type):
    """Metaclass responsible for registering different BaseMaterializer
    subclasses for reading/writing artifacts."""

    def __new__(
        mcs, name: str, bases: Tuple[Type[Any], ...], dct: Dict[str, Any]
    ) -> "BaseMaterializerMeta":
        """Creates a Materializer class and registers it at
        the `MaterializerRegistry`."""
        cls = cast(
            Type["BaseMaterializer"], super().__new__(mcs, name, bases, dct)
        )
        # The abstract base class itself is exempt from validation and
        # registration; only concrete subclasses are processed.
        if name != "BaseMaterializer":
            # Imported lazily here, presumably to avoid a circular import at
            # module load time — confirm before moving to the top of the file.
            from zenml.artifacts.base_artifact import BaseArtifact

            # Every concrete materializer must declare at least one
            # handled python type.
            if not cls.ASSOCIATED_TYPES:
                raise MaterializerInterfaceError(
                    f"Invalid materializer class '{name}'. When creating a "
                    f"custom materializer, make sure to specify at least one "
                    f"type in its ASSOCIATED_TYPES class variable.",
                    url="https://docs.zenml.io/guides/index/custom-materializer",
                )
            # Validate that all declared artifact types are real
            # BaseArtifact subclasses.
            for artifact_type in cls.ASSOCIATED_ARTIFACT_TYPES:
                if not (
                    inspect.isclass(artifact_type)
                    and issubclass(artifact_type, BaseArtifact)
                ):
                    raise MaterializerInterfaceError(
                        f"Associated artifact type {artifact_type} for "
                        f"materializer {name} is not a `BaseArtifact` "
                        f"subclass.",
                        url="https://docs.zenml.io/guides/index/custom-materializer",
                    )
            # Fall back to the generic BaseArtifact when no artifact types
            # were declared.
            artifact_types = cls.ASSOCIATED_ARTIFACT_TYPES or (BaseArtifact,)
            # Register the new class for each python type it handles.
            for associated_type in cls.ASSOCIATED_TYPES:
                if not inspect.isclass(associated_type):
                    raise MaterializerInterfaceError(
                        f"Associated type {associated_type} for materializer "
                        f"{name} is not a class.",
                        url="https://docs.zenml.io/guides/index/custom-materializer",
                    )
                default_materializer_registry.register_materializer_type(
                    associated_type, cls
                )
                type_registry.register_integration(
                    associated_type, artifact_types
                )
        return cls
__new__(mcs, name, bases, dct)
special
staticmethod
Creates a Materializer class and registers it at
the MaterializerRegistry
.
Source code in zenml/materializers/base_materializer.py
def __new__(
    mcs, name: str, bases: Tuple[Type[Any], ...], dct: Dict[str, Any]
) -> "BaseMaterializerMeta":
    """Creates a Materializer class and registers it at
    the `MaterializerRegistry`."""
    cls = cast(
        Type["BaseMaterializer"], super().__new__(mcs, name, bases, dct)
    )
    # Only concrete subclasses are validated and registered; the abstract
    # base class is skipped.
    if name != "BaseMaterializer":
        # Imported lazily here, presumably to avoid a circular import at
        # module load time — confirm before moving to the top of the file.
        from zenml.artifacts.base_artifact import BaseArtifact

        # Every concrete materializer must declare at least one handled type.
        if not cls.ASSOCIATED_TYPES:
            raise MaterializerInterfaceError(
                f"Invalid materializer class '{name}'. When creating a "
                f"custom materializer, make sure to specify at least one "
                f"type in its ASSOCIATED_TYPES class variable.",
                url="https://docs.zenml.io/guides/index/custom-materializer",
            )
        # Declared artifact types must be real BaseArtifact subclasses.
        for artifact_type in cls.ASSOCIATED_ARTIFACT_TYPES:
            if not (
                inspect.isclass(artifact_type)
                and issubclass(artifact_type, BaseArtifact)
            ):
                raise MaterializerInterfaceError(
                    f"Associated artifact type {artifact_type} for "
                    f"materializer {name} is not a `BaseArtifact` "
                    f"subclass.",
                    url="https://docs.zenml.io/guides/index/custom-materializer",
                )
        # Default to the generic BaseArtifact when none were declared.
        artifact_types = cls.ASSOCIATED_ARTIFACT_TYPES or (BaseArtifact,)
        # Register the class under each python type it handles.
        for associated_type in cls.ASSOCIATED_TYPES:
            if not inspect.isclass(associated_type):
                raise MaterializerInterfaceError(
                    f"Associated type {associated_type} for materializer "
                    f"{name} is not a class.",
                    url="https://docs.zenml.io/guides/index/custom-materializer",
                )
            default_materializer_registry.register_materializer_type(
                associated_type, cls
            )
            type_registry.register_integration(
                associated_type, artifact_types
            )
    return cls
beam_materializer
BeamMaterializer (BaseMaterializer)
Materializer to read data to and from beam.
Source code in zenml/materializers/beam_materializer.py
class BeamMaterializer(BaseMaterializer):
    """Materializer to read data to and from beam."""

    # Handles Beam PCollections stored as DataArtifacts.
    ASSOCIATED_TYPES = (beam.PCollection,)
    ASSOCIATED_ARTIFACT_TYPES = (DataArtifact,)

    def handle_input(self, data_type: Type[Any]) -> Any:
        """Reads all files inside the artifact directory and materializes them
        as a beam compatible output."""
        # TODO [ENG-138]: Implement beam reading
        # Currently only runs the base-class type check; nothing is returned.
        super().handle_input(data_type)

    def handle_return(self, pipeline: beam.Pipeline) -> None:
        """Appends a beam.io.WriteToParquet at the end of a beam pipeline
        and therefore persists the results.

        Args:
            pipeline: A beam.pipeline object.
        """
        # TODO [ENG-139]: Implement beam writing
        super().handle_return(pipeline)
        # NOTE(review): `beam.ParDo()` is invoked without a DoFn — this looks
        # like placeholder code pending ENG-139; confirm before relying on it.
        pipeline | beam.ParDo()
        pipeline.run()
        # pipeline | beam.io.WriteToParquet(self.artifact.uri)
        # pipeline.run()
handle_input(self, data_type)
Reads all files inside the artifact directory and materializes them as a beam compatible output.
Source code in zenml/materializers/beam_materializer.py
def handle_input(self, data_type: Type[Any]) -> Any:
    """Reads all files inside the artifact directory and materializes them
    as a beam compatible output."""
    # TODO [ENG-138]: Implement beam reading
    # Currently only runs the base-class type check; nothing is returned.
    super().handle_input(data_type)
handle_return(self, pipeline)
Appends a beam.io.WriteToParquet at the end of a beam pipeline and therefore persists the results.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
pipeline |
Pipeline |
A beam.pipeline object. |
required |
Source code in zenml/materializers/beam_materializer.py
def handle_return(self, pipeline: beam.Pipeline) -> None:
    """Appends a beam.io.WriteToParquet at the end of a beam pipeline
    and therefore persists the results.

    Args:
        pipeline: A beam.pipeline object.
    """
    # TODO [ENG-139]: Implement beam writing
    super().handle_return(pipeline)
    # NOTE(review): `beam.ParDo()` is invoked without a DoFn — this looks
    # like placeholder code pending ENG-139; confirm before relying on it.
    pipeline | beam.ParDo()
    pipeline.run()
    # pipeline | beam.io.WriteToParquet(self.artifact.uri)
    # pipeline.run()
built_in_materializer
BuiltInMaterializer (BaseMaterializer)
Read/Write JSON files.
Source code in zenml/materializers/built_in_materializer.py
class BuiltInMaterializer(BaseMaterializer):
    """Read/Write JSON files."""

    # TODO [ENG-322]: consider adding typing.Dict and typing.List
    # since these are the 'correct' way to annotate these types.
    ASSOCIATED_ARTIFACT_TYPES = (
        DataArtifact,
        DataAnalysisArtifact,
    )
    ASSOCIATED_TYPES = (
        int,
        str,
        bytes,
        dict,
        float,
        list,
        tuple,
        bool,
    )

    def handle_input(self, data_type: Type[Any]) -> Any:
        """Reads basic primitive types from json.

        Args:
            data_type: The exact built-in type the caller expects back.

        Returns:
            The deserialized JSON contents of the artifact's default file.
        """
        super().handle_input(data_type)
        filepath = os.path.join(self.artifact.uri, DEFAULT_FILENAME)
        contents = yaml_utils.read_json(filepath)
        # Exact-type check on purpose (identity, per PEP 8): `isinstance`
        # would silently accept subclasses, e.g. bool where int is expected.
        if type(contents) is not data_type:
            # TODO [ENG-142]: Raise error or try to coerce
            logger.debug(
                f"Contents {contents} was type {type(contents)} but expected "
                f"{data_type}"
            )
        return contents

    def handle_return(self, data: Any) -> None:
        """Handles basic built-in types and stores them as json.

        Args:
            data: The built-in value to persist as JSON.
        """
        super().handle_return(data)
        filepath = os.path.join(self.artifact.uri, DEFAULT_FILENAME)
        yaml_utils.write_json(filepath, data)
handle_input(self, data_type)
Reads basic primitive types from json.
Source code in zenml/materializers/built_in_materializer.py
def handle_input(self, data_type: Type[Any]) -> Any:
    """Reads basic primitive types from json.

    Args:
        data_type: The exact built-in type the caller expects back.

    Returns:
        The deserialized JSON contents of the artifact's default file.
    """
    super().handle_input(data_type)
    filepath = os.path.join(self.artifact.uri, DEFAULT_FILENAME)
    contents = yaml_utils.read_json(filepath)
    # Exact-type check on purpose (identity, per PEP 8): `isinstance`
    # would silently accept subclasses, e.g. bool where int is expected.
    if type(contents) is not data_type:
        # TODO [ENG-142]: Raise error or try to coerce
        logger.debug(
            f"Contents {contents} was type {type(contents)} but expected "
            f"{data_type}"
        )
    return contents
handle_return(self, data)
Handles basic built-in types and stores them as json
Source code in zenml/materializers/built_in_materializer.py
def handle_return(self, data: Any) -> None:
    """Persists a basic built-in value as JSON in the artifact directory."""
    super().handle_return(data)
    target_path = os.path.join(self.artifact.uri, DEFAULT_FILENAME)
    yaml_utils.write_json(target_path, data)
default_materializer_registry
MaterializerRegistry
Matches a python type to a default materializer.
Source code in zenml/materializers/default_materializer_registry.py
class MaterializerRegistry:
    """Matches a python type to a default materializer."""

    def __init__(self) -> None:
        # Maps a python type to the materializer class registered for it.
        self.materializer_types: Dict[Type[Any], Type["BaseMaterializer"]] = {}

    def register_materializer_type(
        self, key: Type[Any], type_: Type["BaseMaterializer"]
    ) -> None:
        """Registers a new materializer.

        Args:
            key: Indicates the type of object.
            type_: A BaseMaterializer subclass.
        """
        # First registration wins; later registrations for the same key are
        # ignored (use `register_and_overwrite_type` to replace one).
        if key not in self.materializer_types:
            self.materializer_types[key] = type_
            logger.debug(f"Registered materializer {type_} for {key}")
        else:
            logger.debug(
                f"Found existing materializer class for {key}: "
                f"{self.materializer_types[key]}. Skipping registration of "
                f"{type_}."
            )

    def register_and_overwrite_type(
        self, key: Type[Any], type_: Type["BaseMaterializer"]
    ) -> None:
        """Registers a new materializer and also overwrites a default if set.

        Args:
            key: Indicates the type of object.
            type_: A BaseMaterializer subclass.
        """
        self.materializer_types[key] = type_
        logger.debug(f"Registered materializer {type_} for {key}")

    def __getitem__(self, key: Type[Any]) -> Type["BaseMaterializer"]:
        """Get a single materializer based on the key.

        Args:
            key: Indicates the type of object.

        Returns:
            `BaseMaterializer` subclass that was registered for this key.

        Raises:
            StepInterfaceError: If the key (or any of its superclasses) is not
                registered or the key has more than one superclass with
                different default materializers
        """
        # Check whether the type is registered
        if key in self.materializer_types:
            return self.materializer_types[key]
        else:
            # If the type is not registered, check for superclasses
            materializers_for_compatible_superclasses = {
                materializer
                for registered_type, materializer in self.materializer_types.items()
                if issubclass(key, registered_type)
            }
            # Make sure that there is only a single materializer
            if len(materializers_for_compatible_superclasses) == 1:
                return materializers_for_compatible_superclasses.pop()
            elif len(materializers_for_compatible_superclasses) > 1:
                raise StepInterfaceError(
                    f"Type {key} is subclassing more than one type, thus it "
                    f"maps to multiple materializers within the materializer "
                    f"registry: {materializers_for_compatible_superclasses}. "
                    f"Please specify which of these materializers you would "
                    f"like to use explicitly in your step.",
                    url="https://docs.zenml.io/guides/index/custom-materializer",
                )
        # No exact match and no superclass match: nothing can handle this key.
        raise StepInterfaceError(
            f"No materializer registered for class {key}. You can register a "
            f"default materializer for specific types by subclassing "
            f"`BaseMaterializer` and setting its `ASSOCIATED_TYPES` class"
            f" variable.",
            url="https://docs.zenml.io/guides/index/custom-materializer",
        )

    def get_materializer_types(
        self,
    ) -> Dict[Type[Any], Type["BaseMaterializer"]]:
        """Get all registered materializer types."""
        return self.materializer_types

    def is_registered(self, key: Type[Any]) -> bool:
        """Returns if a materializer class is registered for the given type."""
        # Subclasses count: a materializer registered for a base type also
        # covers types derived from it.
        return any(issubclass(key, t) for t in self.materializer_types)
__getitem__(self, key)
special
Get a single materializer based on the key.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
key |
Type[Any] |
Indicates the type of object. |
required |
Returns:
Type | Description |
---|---|
Type[BaseMaterializer] |
|
Exceptions:
Type | Description |
---|---|
StepInterfaceError |
If the key (or any of its superclasses) is not registered or the key has more than one superclass with different default materializers |
Source code in zenml/materializers/default_materializer_registry.py
def __getitem__(self, key: Type[Any]) -> Type["BaseMaterializer"]:
    """Get a single materializer based on the key.

    Args:
        key: Indicates the type of object.

    Returns:
        `BaseMaterializer` subclass that was registered for this key.

    Raises:
        StepInterfaceError: If the key (or any of its superclasses) is not
            registered or the key has more than one superclass with
            different default materializers
    """
    # Check whether the type is registered
    if key in self.materializer_types:
        return self.materializer_types[key]
    else:
        # If the type is not registered, check for superclasses
        materializers_for_compatible_superclasses = {
            materializer
            for registered_type, materializer in self.materializer_types.items()
            if issubclass(key, registered_type)
        }
        # Make sure that there is only a single materializer
        if len(materializers_for_compatible_superclasses) == 1:
            return materializers_for_compatible_superclasses.pop()
        elif len(materializers_for_compatible_superclasses) > 1:
            raise StepInterfaceError(
                f"Type {key} is subclassing more than one type, thus it "
                f"maps to multiple materializers within the materializer "
                f"registry: {materializers_for_compatible_superclasses}. "
                f"Please specify which of these materializers you would "
                f"like to use explicitly in your step.",
                url="https://docs.zenml.io/guides/index/custom-materializer",
            )
    # No exact match and no superclass match: nothing can handle this key.
    raise StepInterfaceError(
        f"No materializer registered for class {key}. You can register a "
        f"default materializer for specific types by subclassing "
        f"`BaseMaterializer` and setting its `ASSOCIATED_TYPES` class"
        f" variable.",
        url="https://docs.zenml.io/guides/index/custom-materializer",
    )
get_materializer_types(self)
Get all registered materializer types.
Source code in zenml/materializers/default_materializer_registry.py
def get_materializer_types(
    self,
) -> Dict[Type[Any], Type["BaseMaterializer"]]:
    """Returns the full mapping of registered types to materializer classes."""
    registry = self.materializer_types
    return registry
is_registered(self, key)
Returns if a materializer class is registered for the given type.
Source code in zenml/materializers/default_materializer_registry.py
def is_registered(self, key: Type[Any]) -> bool:
    """Returns if a materializer class is registered for the given type."""
    # A registration for a base type also covers all of its subclasses.
    for registered_type in self.materializer_types:
        if issubclass(key, registered_type):
            return True
    return False
register_and_overwrite_type(self, key, type_)
Registers a new materializer and also overwrites a default if set.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
key |
Type[Any] |
Indicates the type of object. |
required |
type_ |
Type[BaseMaterializer] |
A BaseMaterializer subclass. |
required |
Source code in zenml/materializers/default_materializer_registry.py
def register_and_overwrite_type(
    self, key: Type[Any], type_: Type["BaseMaterializer"]
) -> None:
    """Registers `type_` as the materializer for `key`, replacing any
    previously registered default.

    Args:
        key: Indicates the type of object.
        type_: A BaseMaterializer subclass.
    """
    self.materializer_types.update({key: type_})
    logger.debug(f"Registered materializer {type_} for {key}")
register_materializer_type(self, key, type_)
Registers a new materializer.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
key |
Type[Any] |
Indicates the type of object. |
required |
type_ |
Type[BaseMaterializer] |
A BaseMaterializer subclass. |
required |
Source code in zenml/materializers/default_materializer_registry.py
def register_materializer_type(
    self, key: Type[Any], type_: Type["BaseMaterializer"]
) -> None:
    """Registers `type_` for `key` unless a materializer already exists.

    Args:
        key: Indicates the type of object.
        type_: A BaseMaterializer subclass.
    """
    # First registration wins: keep the existing entry if there is one.
    if key in self.materializer_types:
        logger.debug(
            f"Found existing materializer class for {key}: "
            f"{self.materializer_types[key]}. Skipping registration of "
            f"{type_}."
        )
        return
    self.materializer_types[key] = type_
    logger.debug(f"Registered materializer {type_} for {key}")
numpy_materializer
NumpyMaterializer (BaseMaterializer)
Materializer to read and write numpy arrays.
Source code in zenml/materializers/numpy_materializer.py
class NumpyMaterializer(BaseMaterializer):
    """Materializer to read and write numpy arrays."""

    ASSOCIATED_TYPES = (np.ndarray,)
    ASSOCIATED_ARTIFACT_TYPES = (DataArtifact,)

    def handle_input(self, data_type: Type[Any]) -> np.ndarray:
        """Reads numpy array from parquet file."""
        super().handle_input(data_type)
        # The original array shape is stored separately as JSON, because the
        # parquet file only holds the flattened values.
        shape_dict = yaml_utils.read_json(
            os.path.join(self.artifact.uri, SHAPE_FILENAME)
        )
        shape_tuple = tuple(shape_dict.values())
        with fileio.open(
            os.path.join(self.artifact.uri, DATA_FILENAME), "rb"
        ) as f:
            input_stream = pa.input_stream(f)
            data = pq.read_table(input_stream)
        # Pull the flat values column back out and restore the saved shape.
        vals = getattr(data.to_pandas(), DATA_VAR).values
        return np.reshape(vals, shape_tuple)

    def handle_return(self, arr: np.ndarray) -> None:
        """Writes a np.ndarray to the artifact store as a parquet file.

        Args:
            arr: The numpy array to write.
        """
        super().handle_return(arr)
        # Record the shape as JSON so the flattened data can be restored.
        yaml_utils.write_json(
            os.path.join(self.artifact.uri, SHAPE_FILENAME),
            {str(i): x for i, x in enumerate(arr.shape)},
        )
        # Store the values as a single flattened parquet column.
        pa_table = pa.table({DATA_VAR: arr.flatten()})
        with fileio.open(
            os.path.join(self.artifact.uri, DATA_FILENAME), "wb"
        ) as f:
            stream = pa.output_stream(f)
            pq.write_table(pa_table, stream)
handle_input(self, data_type)
Reads numpy array from parquet file.
Source code in zenml/materializers/numpy_materializer.py
def handle_input(self, data_type: Type[Any]) -> np.ndarray:
    """Reads numpy array from parquet file."""
    super().handle_input(data_type)
    # The original array shape is stored separately as JSON, because the
    # parquet file only holds the flattened values.
    shape_dict = yaml_utils.read_json(
        os.path.join(self.artifact.uri, SHAPE_FILENAME)
    )
    shape_tuple = tuple(shape_dict.values())
    with fileio.open(
        os.path.join(self.artifact.uri, DATA_FILENAME), "rb"
    ) as f:
        input_stream = pa.input_stream(f)
        data = pq.read_table(input_stream)
    # Pull the flat values column back out and restore the saved shape.
    vals = getattr(data.to_pandas(), DATA_VAR).values
    return np.reshape(vals, shape_tuple)
handle_return(self, arr)
Writes a np.ndarray to the artifact store as a parquet file.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
arr |
ndarray |
The numpy array to write. |
required |
Source code in zenml/materializers/numpy_materializer.py
def handle_return(self, arr: np.ndarray) -> None:
    """Writes a np.ndarray to the artifact store as a parquet file.

    Args:
        arr: The numpy array to write.
    """
    super().handle_return(arr)
    # Record the shape as JSON so the flattened data can be restored.
    yaml_utils.write_json(
        os.path.join(self.artifact.uri, SHAPE_FILENAME),
        {str(i): x for i, x in enumerate(arr.shape)},
    )
    # Store the values as a single flattened parquet column.
    pa_table = pa.table({DATA_VAR: arr.flatten()})
    with fileio.open(
        os.path.join(self.artifact.uri, DATA_FILENAME), "wb"
    ) as f:
        stream = pa.output_stream(f)
        pq.write_table(pa_table, stream)
pandas_materializer
PandasMaterializer (BaseMaterializer)
Materializer to read data to and from pandas.
Source code in zenml/materializers/pandas_materializer.py
class PandasMaterializer(BaseMaterializer):
    """Materializer that persists pandas DataFrames as parquet files."""

    ASSOCIATED_TYPES = (pd.DataFrame,)
    ASSOCIATED_ARTIFACT_TYPES = (
        DataArtifact,
        StatisticsArtifact,
        SchemaArtifact,
    )

    def handle_input(self, data_type: Type[Any]) -> pd.DataFrame:
        """Loads the stored parquet file back into a pd.DataFrame."""
        super().handle_input(data_type)
        source_path = os.path.join(self.artifact.uri, DEFAULT_FILENAME)
        return pd.read_parquet(source_path)

    def handle_return(self, df: pd.DataFrame) -> None:
        """Writes a pandas dataframe to the artifact store as parquet.

        Args:
            df: The pandas dataframe to write.
        """
        super().handle_return(df)
        df.to_parquet(
            os.path.join(self.artifact.uri, DEFAULT_FILENAME),
            compression=COMPRESSION_TYPE,
        )
handle_input(self, data_type)
Reads pd.Dataframe from a parquet file.
Source code in zenml/materializers/pandas_materializer.py
def handle_input(self, data_type: Type[Any]) -> pd.DataFrame:
    """Loads the stored parquet file back into a pd.DataFrame."""
    super().handle_input(data_type)
    source_path = os.path.join(self.artifact.uri, DEFAULT_FILENAME)
    return pd.read_parquet(source_path)
handle_return(self, df)
Writes a pandas dataframe to the specified filename.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
df |
DataFrame |
The pandas dataframe to write. |
required |
Source code in zenml/materializers/pandas_materializer.py
def handle_return(self, df: pd.DataFrame) -> None:
    """Writes a pandas dataframe to the artifact store as parquet.

    Args:
        df: The pandas dataframe to write.
    """
    super().handle_return(df)
    df.to_parquet(
        os.path.join(self.artifact.uri, DEFAULT_FILENAME),
        compression=COMPRESSION_TYPE,
    )