Materializers
zenml.materializers
special
Initialization of ZenML materializers.
Materializers are used to convert a ZenML artifact into a specific format. They
are most often used to handle the input or output of ZenML steps, and can be
extended by subclassing the `BaseMaterializer` class.
base_materializer
Metaclass implementation for registering ZenML BaseMaterializer subclasses.
BaseMaterializer
Base Materializer to realize artifact data.
Source code in zenml/materializers/base_materializer.py
class BaseMaterializer(metaclass=BaseMaterializerMeta):
    """Base Materializer to realize artifact data.

    Subclasses declare which Python types (`ASSOCIATED_TYPES`) and which
    artifact types (`ASSOCIATED_ARTIFACT_TYPES`) they support and override
    `handle_input` / `handle_return` to do the actual reading and writing.
    """

    # Artifact types this materializer works with. When left empty, the
    # metaclass registers the associated types with `BaseArtifact` instead.
    ASSOCIATED_ARTIFACT_TYPES: ClassVar[Tuple[Type["BaseArtifact"], ...]] = ()
    # Python types this materializer can read and write. The metaclass
    # rejects any subclass (other than `BaseMaterializer`) that leaves this
    # empty.
    ASSOCIATED_TYPES: ClassVar[Tuple[Type[Any], ...]] = ()

    def __init__(self, artifact: "BaseArtifact"):
        """Initializes a materializer with the given artifact.

        Args:
            artifact: The artifact to materialize.
        """
        self.artifact = artifact

    def _can_handle_type(self, data_type: Type[Any]) -> bool:
        """Whether the materializer can read/write a certain type.

        Args:
            data_type: The type to check.

        Returns:
            True if `data_type` is a subclass of any of the
            `ASSOCIATED_TYPES`, False otherwise.
        """
        return any(
            issubclass(data_type, associated_type)
            for associated_type in self.ASSOCIATED_TYPES
        )

    def handle_input(self, data_type: Type[Any]) -> Any:
        """Write logic here to handle input of the step function.

        The base implementation only validates `data_type` and returns
        `None`; subclasses call it via `super()` before reading the artifact.

        Args:
            data_type: What type the input should be materialized as.

        Raises:
            TypeError: If the data is not of the correct type.
        """
        if not self._can_handle_type(data_type):
            raise TypeError(
                f"Unable to handle type {data_type}. {self.__class__.__name__} "
                f"can only read artifacts to the following types: "
                f"{self.ASSOCIATED_TYPES}."
            )

    def handle_return(self, data: Any) -> None:
        """Write logic here to handle return of the step function.

        The base implementation only validates the type of `data`;
        subclasses call it via `super()` before writing the artifact.

        Args:
            data: The object returned by the step function, to be stored as
                an output artifact of the step.

        Raises:
            TypeError: If the data is not of the correct type.
        """
        data_type = type(data)
        if not self._can_handle_type(data_type):
            raise TypeError(
                f"Unable to write {data_type}. {self.__class__.__name__} "
                f"can only write the following types: {self.ASSOCIATED_TYPES}."
            )
__init__(self, artifact)
special
Initializes a materializer with the given artifact.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
artifact |
BaseArtifact |
The artifact to materialize. |
required |
Source code in zenml/materializers/base_materializer.py
def __init__(self, artifact: "BaseArtifact"):
    """Store the artifact this materializer reads from and writes to.

    Args:
        artifact: The artifact to materialize.
    """
    # The read/write methods of concrete materializers locate their files
    # through `self.artifact.uri`.
    self.artifact = artifact
handle_input(self, data_type)
Write logic here to handle input of the step function.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
data_type |
Type[Any] |
What type the input should be materialized as. |
required |
Exceptions:
Type | Description |
---|---|
TypeError |
If the data is not of the correct type. |
Source code in zenml/materializers/base_materializer.py
def handle_input(self, data_type: Type[Any]) -> Any:
    """Check that this materializer is able to produce ``data_type``.

    Concrete materializers call this first and then do the actual reading.

    Args:
        data_type: What type the input should be materialized as.

    Raises:
        TypeError: If the data is not of the correct type.
    """
    if self._can_handle_type(data_type):
        return None
    message = (
        f"Unable to handle type {data_type}. {self.__class__.__name__} "
        "can only read artifacts to the following types: "
        f"{self.ASSOCIATED_TYPES}."
    )
    raise TypeError(message)
handle_return(self, data)
Write logic here to handle return of the step function.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
data |
Any |
Any object returned by the step function, to be stored as an output artifact of the step. |
required |
Exceptions:
Type | Description |
---|---|
TypeError |
If the data is not of the correct type. |
Source code in zenml/materializers/base_materializer.py
def handle_return(self, data: Any) -> None:
    """Write logic here to handle return of the step function.

    The base implementation only validates the type of `data`; concrete
    materializers call it via `super()` before writing the artifact.

    Args:
        data: The object returned by the step function, to be stored as an
            output artifact of the step.

    Raises:
        TypeError: If the data is not of the correct type.
    """
    data_type = type(data)
    if not self._can_handle_type(data_type):
        raise TypeError(
            f"Unable to write {data_type}. {self.__class__.__name__} "
            f"can only write the following types: {self.ASSOCIATED_TYPES}."
        )
BaseMaterializerMeta (type)
Metaclass responsible for registering different BaseMaterializer subclasses.
Materializers are used for reading/writing artifacts.
Source code in zenml/materializers/base_materializer.py
class BaseMaterializerMeta(type):
    """Metaclass responsible for registering different BaseMaterializer subclasses.

    Materializers are used for reading/writing artifacts.
    """

    def __new__(
        mcs, name: str, bases: Tuple[Type[Any], ...], dct: Dict[str, Any]
    ) -> "BaseMaterializerMeta":
        """Creates a Materializer class and registers it at the `MaterializerRegistry`.

        Every class other than `BaseMaterializer` itself is fully validated
        before anything is registered. An invalid definition therefore never
        leaves a partial registration behind in the registries.

        Args:
            name: The name of the class.
            bases: The base classes of the class.
            dct: The dictionary of the class.

        Returns:
            The newly created materializer class.

        Raises:
            MaterializerInterfaceError: If the class was improperly defined.
        """
        cls = cast(
            Type["BaseMaterializer"], super().__new__(mcs, name, bases, dct)
        )
        if name != "BaseMaterializer":
            # Imported here rather than at module level, presumably to avoid
            # a circular import -- confirm before moving it.
            from zenml.artifacts.base_artifact import BaseArtifact

            if not cls.ASSOCIATED_TYPES:
                raise MaterializerInterfaceError(
                    f"Invalid materializer class '{name}'. When creating a "
                    f"custom materializer, make sure to specify at least one "
                    f"type in its ASSOCIATED_TYPES class variable.",
                    url="https://docs.zenml.io/developer-guide/materializer",
                )

            for artifact_type in cls.ASSOCIATED_ARTIFACT_TYPES:
                if not (
                    inspect.isclass(artifact_type)
                    and issubclass(artifact_type, BaseArtifact)
                ):
                    raise MaterializerInterfaceError(
                        f"Associated artifact type {artifact_type} for "
                        f"materializer {name} is not a `BaseArtifact` "
                        f"subclass.",
                        url="https://docs.zenml.io/developer-guide/materializer",
                    )

            # Validate every associated type before touching the registries.
            # Otherwise a bad entry late in the tuple would leave the earlier
            # entries registered.
            for associated_type in cls.ASSOCIATED_TYPES:
                if not inspect.isclass(associated_type):
                    raise MaterializerInterfaceError(
                        f"Associated type {associated_type} for materializer "
                        f"{name} is not a class.",
                        url="https://docs.zenml.io/developer-guide/materializer",
                    )

            # Without explicit artifact types, fall back to `BaseArtifact`.
            artifact_types = cls.ASSOCIATED_ARTIFACT_TYPES or (BaseArtifact,)
            for associated_type in cls.ASSOCIATED_TYPES:
                default_materializer_registry.register_materializer_type(
                    associated_type, cls
                )
                type_registry.register_integration(
                    associated_type, artifact_types
                )
        return cls
__new__(mcs, name, bases, dct)
special
staticmethod
Creates a Materializer class and registers it at the `MaterializerRegistry`.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
name |
str |
The name of the class. |
required |
bases |
Tuple[Type[Any], ...] |
The base classes of the class. |
required |
dct |
Dict[str, Any] |
The dictionary of the class. |
required |
Returns:
Type | Description |
---|---|
BaseMaterializerMeta |
The newly created materializer class. |
Exceptions:
Type | Description |
---|---|
MaterializerInterfaceError |
If the class was improperly defined. |
Source code in zenml/materializers/base_materializer.py
def __new__(
    mcs, name: str, bases: Tuple[Type[Any], ...], dct: Dict[str, Any]
) -> "BaseMaterializerMeta":
    """Creates a Materializer class and registers it at the `MaterializerRegistry`.

    Every class other than `BaseMaterializer` itself is fully validated
    before anything is registered. An invalid definition therefore never
    leaves a partial registration behind in the registries.

    Args:
        name: The name of the class.
        bases: The base classes of the class.
        dct: The dictionary of the class.

    Returns:
        The newly created materializer class.

    Raises:
        MaterializerInterfaceError: If the class was improperly defined.
    """
    cls = cast(
        Type["BaseMaterializer"], super().__new__(mcs, name, bases, dct)
    )
    if name != "BaseMaterializer":
        # Imported here rather than at module level, presumably to avoid a
        # circular import -- confirm before moving it.
        from zenml.artifacts.base_artifact import BaseArtifact

        if not cls.ASSOCIATED_TYPES:
            raise MaterializerInterfaceError(
                f"Invalid materializer class '{name}'. When creating a "
                f"custom materializer, make sure to specify at least one "
                f"type in its ASSOCIATED_TYPES class variable.",
                url="https://docs.zenml.io/developer-guide/materializer",
            )

        for artifact_type in cls.ASSOCIATED_ARTIFACT_TYPES:
            if not (
                inspect.isclass(artifact_type)
                and issubclass(artifact_type, BaseArtifact)
            ):
                raise MaterializerInterfaceError(
                    f"Associated artifact type {artifact_type} for "
                    f"materializer {name} is not a `BaseArtifact` "
                    f"subclass.",
                    url="https://docs.zenml.io/developer-guide/materializer",
                )

        # Validate every associated type before touching the registries.
        # Otherwise a bad entry late in the tuple would leave the earlier
        # entries registered.
        for associated_type in cls.ASSOCIATED_TYPES:
            if not inspect.isclass(associated_type):
                raise MaterializerInterfaceError(
                    f"Associated type {associated_type} for materializer "
                    f"{name} is not a class.",
                    url="https://docs.zenml.io/developer-guide/materializer",
                )

        # Without explicit artifact types, fall back to `BaseArtifact`.
        artifact_types = cls.ASSOCIATED_ARTIFACT_TYPES or (BaseArtifact,)
        for associated_type in cls.ASSOCIATED_TYPES:
            default_materializer_registry.register_materializer_type(
                associated_type, cls
            )
            type_registry.register_integration(
                associated_type, artifact_types
            )
    return cls
beam_materializer
Implementation of the ZenML Apache Beam materializer.
BeamMaterializer (BaseMaterializer)
Materializer to read data to and from beam.
Source code in zenml/materializers/beam_materializer.py
class BeamMaterializer(BaseMaterializer):
    """Materializer to read data to and from beam."""

    ASSOCIATED_TYPES = (beam.PCollection,)
    ASSOCIATED_ARTIFACT_TYPES = (DataArtifact,)

    def handle_input(self, data_type: Type[Any]) -> Any:
        """Validates `data_type`; reading Beam data is not implemented yet.

        Intended to read all files inside the artifact directory and
        materialize them as a beam compatible output.

        Args:
            data_type: The type of the data to read.

        Returns:
            Currently always `None`.
        """
        # TODO [ENG-138]: Implement beam reading
        super().handle_input(data_type)

    def handle_return(self, pipeline: beam.Pipeline) -> None:
        """Runs the Beam pipeline behind the returned object.

        Persisting the results (e.g. by appending a `beam.io.WriteToParquet`
        step) is not implemented yet, so nothing is written to the artifact
        URI.

        Args:
            pipeline: The object returned by the step. The base-class check
                only accepts instances of `ASSOCIATED_TYPES`, i.e.
                `beam.PCollection`.

        Raises:
            TypeError: If the object is not of a supported type (raised by
                the base-class check).
        """
        super().handle_return(pipeline)
        # `beam.ParDo` requires a `DoFn`, so no placeholder transform is
        # applied here (a bare `beam.ParDo()` raises TypeError). A Beam
        # `PCollection` keeps a reference to its owning pipeline in
        # `.pipeline`; fall back to the object itself if it is a pipeline.
        beam_pipeline = getattr(pipeline, "pipeline", pipeline)
        beam_pipeline.run()
        # TODO [ENG-139]: Implement beam writing, e.g.
        # pipeline | beam.io.WriteToParquet(self.artifact.uri)
handle_input(self, data_type)
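Intended to read all files inside the artifact directory and materialize them as a beam compatible output; reading is not implemented yet (TODO ENG-138), so this currently only validates the requested type.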
Parameters:
Name | Type | Description | Default |
---|---|---|---|
data_type |
Type[Any] |
The type of the data to read. |
required |
Source code in zenml/materializers/beam_materializer.py
def handle_input(self, data_type: Type[Any]) -> Any:
    """Validate ``data_type``; reading Beam data is not implemented yet.

    Intended to read all files inside the artifact directory and
    materialize them as a beam compatible output.

    Args:
        data_type: The type of the data to read.

    Returns:
        Currently always ``None``.
    """
    # TODO [ENG-138]: Implement beam reading
    super().handle_input(data_type)
    return None
handle_return(self, pipeline)
Intended to append a beam.io.WriteToParquet at the end of a beam pipeline to persist the results; writing is not implemented yet (TODO ENG-139).
Parameters:
Name | Type | Description | Default |
---|---|---|---|
pipeline |
Pipeline |
A beam.pipeline object. |
required |
Source code in zenml/materializers/beam_materializer.py
def handle_return(self, pipeline: beam.Pipeline) -> None:
    """Runs the Beam pipeline behind the returned object.

    Persisting the results (e.g. by appending a `beam.io.WriteToParquet`
    step) is not implemented yet, so nothing is written to the artifact URI.

    Args:
        pipeline: The object returned by the step. The base-class check only
            accepts instances of the materializer's `ASSOCIATED_TYPES`
            (`beam.PCollection`).

    Raises:
        TypeError: If the object is not of a supported type (raised by the
            base-class check).
    """
    super().handle_return(pipeline)
    # `beam.ParDo` requires a `DoFn`, so no placeholder transform is applied
    # here (a bare `beam.ParDo()` raises TypeError). A Beam `PCollection`
    # keeps a reference to its owning pipeline in `.pipeline`; fall back to
    # the object itself if it is a pipeline.
    beam_pipeline = getattr(pipeline, "pipeline", pipeline)
    beam_pipeline.run()
    # TODO [ENG-139]: Implement beam writing, e.g.
    # pipeline | beam.io.WriteToParquet(self.artifact.uri)
built_in_materializer
Implementation of ZenML's builtin materializer.
BuiltInMaterializer (BaseMaterializer)
Read/Write JSON files.
Source code in zenml/materializers/built_in_materializer.py
class BuiltInMaterializer(BaseMaterializer):
    """Read/Write JSON files."""

    # NOTE(review): the comment that stood here was truncated ("... since
    # these are the 'correct' way to annotate these types."). It presumably
    # referred to also supporting `typing` generics such as `typing.List` /
    # `typing.Dict` -- confirm.
    ASSOCIATED_ARTIFACT_TYPES = (DataArtifact, DataAnalysisArtifact)
    # NOTE(review): `bytes` has no native JSON representation; confirm that
    # `yaml_utils.write_json` can serialize it.
    ASSOCIATED_TYPES = (int, str, bytes, dict, float, list, tuple, bool)

    def handle_input(self, data_type: Type[Any]) -> Any:
        """Load a primitive value from the artifact's JSON file.

        Args:
            data_type: The type of the data to read.

        Returns:
            The decoded contents. They are returned unchanged even when their
            type differs from ``data_type``; the mismatch is only logged.
        """
        super().handle_input(data_type)
        json_path = os.path.join(self.artifact.uri, DEFAULT_FILENAME)
        contents = yaml_utils.read_json(json_path)
        actual_type = type(contents)
        if actual_type != data_type:
            # TODO [ENG-142]: Raise error or try to coerce
            logger.debug(
                f"Contents {contents} was type {actual_type} but expected "
                f"{data_type}"
            )
        return contents

    def handle_return(self, data: Any) -> None:
        """Serialize a primitive value into the artifact's JSON file.

        Args:
            data: The data to store.
        """
        super().handle_return(data)
        yaml_utils.write_json(
            os.path.join(self.artifact.uri, DEFAULT_FILENAME), data
        )
handle_input(self, data_type)
Reads basic primitive types from json.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
data_type |
Type[Any] |
The type of the data to read. |
required |
Returns:
Type | Description |
---|---|
Any |
The data read. |
Source code in zenml/materializers/built_in_materializer.py
def handle_input(self, data_type: Type[Any]) -> Any:
    """Load a primitive value from the artifact's JSON file.

    Args:
        data_type: The type of the data to read.

    Returns:
        The decoded contents. They are returned unchanged even when their
        type differs from ``data_type``; the mismatch is only logged.
    """
    super().handle_input(data_type)
    json_path = os.path.join(self.artifact.uri, DEFAULT_FILENAME)
    contents = yaml_utils.read_json(json_path)
    actual_type = type(contents)
    if actual_type != data_type:
        # TODO [ENG-142]: Raise error or try to coerce
        logger.debug(
            f"Contents {contents} was type {actual_type} but expected "
            f"{data_type}"
        )
    return contents
handle_return(self, data)
Handles basic built-in types and stores them as json.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
data |
Any |
The data to store. |
required |
Source code in zenml/materializers/built_in_materializer.py
def handle_return(self, data: Any) -> None:
    """Serialize a primitive value into the artifact's JSON file.

    Args:
        data: The data to store.
    """
    super().handle_return(data)
    yaml_utils.write_json(
        os.path.join(self.artifact.uri, DEFAULT_FILENAME), data
    )
default_materializer_registry
Implementation of a default materializer registry.
MaterializerRegistry
Matches a Python type to a default materializer.
Source code in zenml/materializers/default_materializer_registry.py
class MaterializerRegistry:
    """Matches a Python type to a default materializer."""

    def __init__(self) -> None:
        """Create an empty mapping from Python types to materializer classes."""
        self.materializer_types: Dict[Type[Any], Type["BaseMaterializer"]] = dict()

    def register_materializer_type(
        self, key: Type[Any], type_: Type["BaseMaterializer"]
    ) -> None:
        """Register ``type_`` for ``key`` unless ``key`` already has one.

        The first registration for a key wins; later attempts are only logged.

        Args:
            key: Indicates the type of object.
            type_: A BaseMaterializer subclass.
        """
        if key in self.materializer_types:
            logger.debug(
                f"Found existing materializer class for {key}: "
                f"{self.materializer_types[key]}. Skipping registration of "
                f"{type_}."
            )
            return
        self.materializer_types[key] = type_
        logger.debug(f"Registered materializer {type_} for {key}")

    def register_and_overwrite_type(
        self, key: Type[Any], type_: Type["BaseMaterializer"]
    ) -> None:
        """Register ``type_`` for ``key``, replacing any existing entry.

        Args:
            key: Indicates the type of object.
            type_: A BaseMaterializer subclass.
        """
        registry = self.materializer_types
        registry[key] = type_
        logger.debug(f"Registered materializer {type_} for {key}")

    def __getitem__(self, key: Type[Any]) -> Type["BaseMaterializer"]:
        """Get a single materializer based on the key.

        An exact registration for ``key`` wins. Otherwise the materializers of
        all registered superclasses of ``key`` are collected, and exactly one
        distinct materializer must remain.

        Args:
            key: Indicates the type of object.

        Returns:
            `BaseMaterializer` subclass that was registered for this key.

        Raises:
            StepInterfaceError: If the key (or any of its superclasses) is not
                registered or the key has more than one superclass with
                different default materializers
        """
        registered = self.materializer_types
        if key in registered:
            return registered[key]

        # A set, so superclasses sharing one materializer don't conflict.
        candidates = {
            materializer
            for base_type, materializer in registered.items()
            if issubclass(key, base_type)
        }
        if len(candidates) > 1:
            raise StepInterfaceError(
                f"Type {key} is subclassing more than one type, thus it "
                f"maps to multiple materializers within the materializer "
                f"registry: {candidates}. "
                f"Please specify which of these materializers you would "
                f"like to use explicitly in your step.",
                url="https://docs.zenml.io/developer-guide/materializer",
            )
        if candidates:
            (only_match,) = candidates
            return only_match
        raise StepInterfaceError(
            f"No materializer registered for class {key}. You can register a "
            f"default materializer for specific types by subclassing "
            f"`BaseMaterializer` and setting its `ASSOCIATED_TYPES` class"
            f" variable.",
            url="https://docs.zenml.io/developer-guide/materializer",
        )

    def get_materializer_types(
        self,
    ) -> Dict[Type[Any], Type["BaseMaterializer"]]:
        """Return the registry's own mapping of types to materializers.

        The live dictionary is returned, not a copy.

        Returns:
            A dictionary of registered materializer types.
        """
        mapping = self.materializer_types
        return mapping

    def is_registered(self, key: Type[Any]) -> bool:
        """Tell whether ``key`` or one of its superclasses has a materializer.

        Args:
            key: Indicates the type of object.

        Returns:
            True if a materializer is registered for the given type, False
            otherwise.
        """
        for registered_type in self.materializer_types:
            if issubclass(key, registered_type):
                return True
        return False
__getitem__(self, key)
special
Get a single materializer based on the key.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
key |
Type[Any] |
Indicates the type of object. |
required |
Returns:
Type | Description |
---|---|
Type[BaseMaterializer] |
`BaseMaterializer` subclass that was registered for this key. |
Exceptions:
Type | Description |
---|---|
StepInterfaceError |
If the key (or any of its superclasses) is not registered or the key has more than one superclass with different default materializers |
Source code in zenml/materializers/default_materializer_registry.py
def __getitem__(self, key: Type[Any]) -> Type["BaseMaterializer"]:
    """Get a single materializer based on the key.

    An exact registration for ``key`` wins. Otherwise the materializers of
    all registered superclasses of ``key`` are collected, and exactly one
    distinct materializer must remain.

    Args:
        key: Indicates the type of object.

    Returns:
        `BaseMaterializer` subclass that was registered for this key.

    Raises:
        StepInterfaceError: If the key (or any of its superclasses) is not
            registered or the key has more than one superclass with
            different default materializers
    """
    registered = self.materializer_types
    if key in registered:
        return registered[key]

    # A set, so superclasses sharing one materializer don't conflict.
    candidates = {
        materializer
        for base_type, materializer in registered.items()
        if issubclass(key, base_type)
    }
    if len(candidates) > 1:
        raise StepInterfaceError(
            f"Type {key} is subclassing more than one type, thus it "
            f"maps to multiple materializers within the materializer "
            f"registry: {candidates}. "
            f"Please specify which of these materializers you would "
            f"like to use explicitly in your step.",
            url="https://docs.zenml.io/developer-guide/materializer",
        )
    if candidates:
        (only_match,) = candidates
        return only_match
    raise StepInterfaceError(
        f"No materializer registered for class {key}. You can register a "
        f"default materializer for specific types by subclassing "
        f"`BaseMaterializer` and setting its `ASSOCIATED_TYPES` class"
        f" variable.",
        url="https://docs.zenml.io/developer-guide/materializer",
    )
__init__(self)
special
Initialize the materializer registry.
Source code in zenml/materializers/default_materializer_registry.py
def __init__(self) -> None:
    """Create an empty mapping from Python types to materializer classes."""
    registry: Dict[Type[Any], Type["BaseMaterializer"]] = dict()
    self.materializer_types = registry
get_materializer_types(self)
Get all registered materializer types.
Returns:
Type | Description |
---|---|
Dict[Type[Any], Type[BaseMaterializer]] |
A dictionary of registered materializer types. |
Source code in zenml/materializers/default_materializer_registry.py
def get_materializer_types(
    self,
) -> Dict[Type[Any], Type["BaseMaterializer"]]:
    """Return the registry's own mapping of types to materializers.

    The live dictionary is returned, not a copy.

    Returns:
        A dictionary of registered materializer types.
    """
    mapping = self.materializer_types
    return mapping
is_registered(self, key)
Returns if a materializer class is registered for the given type.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
key |
Type[Any] |
Indicates the type of object. |
required |
Returns:
Type | Description |
---|---|
bool |
True if a materializer is registered for the given type, False otherwise. |
Source code in zenml/materializers/default_materializer_registry.py
def is_registered(self, key: Type[Any]) -> bool:
    """Tell whether ``key`` or one of its superclasses has a materializer.

    Args:
        key: Indicates the type of object.

    Returns:
        True if a materializer is registered for the given type, False
        otherwise.
    """
    for registered_type in self.materializer_types:
        if issubclass(key, registered_type):
            return True
    return False
register_and_overwrite_type(self, key, type_)
Registers a new materializer and also overwrites a default if set.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
key |
Type[Any] |
Indicates the type of object. |
required |
type_ |
Type[BaseMaterializer] |
A BaseMaterializer subclass. |
required |
Source code in zenml/materializers/default_materializer_registry.py
def register_and_overwrite_type(
    self, key: Type[Any], type_: Type["BaseMaterializer"]
) -> None:
    """Register ``type_`` for ``key``, replacing any existing entry.

    Args:
        key: Indicates the type of object.
        type_: A BaseMaterializer subclass.
    """
    registry = self.materializer_types
    registry[key] = type_
    logger.debug(f"Registered materializer {type_} for {key}")
register_materializer_type(self, key, type_)
Registers a new materializer.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
key |
Type[Any] |
Indicates the type of object. |
required |
type_ |
Type[BaseMaterializer] |
A BaseMaterializer subclass. |
required |
Source code in zenml/materializers/default_materializer_registry.py
def register_materializer_type(
    self, key: Type[Any], type_: Type["BaseMaterializer"]
) -> None:
    """Register ``type_`` for ``key`` unless ``key`` already has one.

    The first registration for a key wins; later attempts are only logged.

    Args:
        key: Indicates the type of object.
        type_: A BaseMaterializer subclass.
    """
    if key in self.materializer_types:
        logger.debug(
            f"Found existing materializer class for {key}: "
            f"{self.materializer_types[key]}. Skipping registration of "
            f"{type_}."
        )
        return
    self.materializer_types[key] = type_
    logger.debug(f"Registered materializer {type_} for {key}")
numpy_materializer
Implementation of the ZenML NumPy materializer.
NumpyMaterializer (BaseMaterializer)
Materializer to read data to and from NumPy arrays.
Source code in zenml/materializers/numpy_materializer.py
class NumpyMaterializer(BaseMaterializer):
    """Materializer to read data to and from NumPy arrays.

    An array is stored as two files inside the artifact URI:
    - a JSON file (`SHAPE_FILENAME`) mapping each axis index (as a string) to
      its length;
    - a parquet file (`DATA_FILENAME`) holding the flattened values in a
      single column named `DATA_VAR`.
    """

    ASSOCIATED_TYPES = (np.ndarray,)
    ASSOCIATED_ARTIFACT_TYPES = (DataArtifact,)

    def handle_input(self, data_type: Type[Any]) -> "NDArray[Any]":
        """Reads numpy array from parquet file.

        Args:
            data_type: The type of the data to read.

        Returns:
            The numpy array, reshaped to the shape stored by `handle_return`.
        """
        super().handle_input(data_type)
        shape_dict = yaml_utils.read_json(
            os.path.join(self.artifact.uri, SHAPE_FILENAME)
        )
        # The values, in the order they were written, are the axis lengths.
        shape_tuple = tuple(shape_dict.values())
        with fileio.open(
            os.path.join(self.artifact.uri, DATA_FILENAME), "rb"
        ) as f:
            input_stream = pa.input_stream(f)
            data = pq.read_table(input_stream)
            vals = getattr(data.to_pandas(), DATA_VAR).values
            return np.reshape(vals, shape_tuple)

    def handle_return(self, arr: "NDArray[Any]") -> None:
        """Writes a np.ndarray to the artifact store as a parquet file.

        Args:
            arr: The numpy array to write.
        """
        super().handle_return(arr)
        # Parquet stores a flat column, so the shape is saved separately.
        yaml_utils.write_json(
            os.path.join(self.artifact.uri, SHAPE_FILENAME),
            {str(i): x for i, x in enumerate(arr.shape)},
        )
        pa_table = pa.table({DATA_VAR: arr.flatten()})
        with fileio.open(
            os.path.join(self.artifact.uri, DATA_FILENAME), "wb"
        ) as f:
            stream = pa.output_stream(f)
            pq.write_table(pa_table, stream)
handle_input(self, data_type)
Reads numpy array from parquet file.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
data_type |
Type[Any] |
The type of the data to read. |
required |
Returns:
Type | Description |
---|---|
NDArray[Any] |
The numpy array. |
Source code in zenml/materializers/numpy_materializer.py
def handle_input(self, data_type: Type[Any]) -> "NDArray[Any]":
    """Rebuild a numpy array from its stored shape and flattened values.

    Args:
        data_type: The type of the data to read.

    Returns:
        The numpy array.
    """
    super().handle_input(data_type)
    uri = self.artifact.uri
    stored_shape = yaml_utils.read_json(os.path.join(uri, SHAPE_FILENAME))
    # Axis lengths, in the order the axes were written.
    target_shape = tuple(stored_shape[axis] for axis in stored_shape)
    with fileio.open(os.path.join(uri, DATA_FILENAME), "rb") as data_file:
        table = pq.read_table(pa.input_stream(data_file))
        flat_values = getattr(table.to_pandas(), DATA_VAR).values
        return np.reshape(flat_values, target_shape)
handle_return(self, arr)
Writes a np.ndarray to the artifact store as a parquet file.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
arr |
NDArray[Any] |
The numpy array to write. |
required |
Source code in zenml/materializers/numpy_materializer.py
def handle_return(self, arr: "NDArray[Any]") -> None:
    """Persist a numpy array as a shape JSON file plus a flat parquet column.

    Args:
        arr: The numpy array to write.
    """
    super().handle_return(arr)
    uri = self.artifact.uri
    shape_by_axis = {str(axis): length for axis, length in enumerate(arr.shape)}
    yaml_utils.write_json(os.path.join(uri, SHAPE_FILENAME), shape_by_axis)
    table = pa.table({DATA_VAR: arr.flatten()})
    with fileio.open(os.path.join(uri, DATA_FILENAME), "wb") as data_file:
        pq.write_table(table, pa.output_stream(data_file))
pandas_materializer
Materializer for Pandas.
PandasMaterializer (BaseMaterializer)
Materializer to read data to and from pandas.
Source code in zenml/materializers/pandas_materializer.py
class PandasMaterializer(BaseMaterializer):
    """Materializer to read data to and from pandas."""

    ASSOCIATED_TYPES = (pd.DataFrame, pd.Series)
    ASSOCIATED_ARTIFACT_TYPES = (
        DataArtifact,
        StatisticsArtifact,
        SchemaArtifact,
    )

    def handle_input(
        self, data_type: Type[Any]
    ) -> Union[pd.DataFrame, pd.Series]:
        """Reads pd.DataFrame or pd.Series from a parquet file.

        The parquet file is first copied from the artifact store into a
        local temporary directory. That directory is removed again even if
        the copy or the read fails.

        Args:
            data_type: The type of the data to read.

        Returns:
            The pandas dataframe or series.

        Raises:
            ValueError: If a `pd.Series` is requested but the stored frame
                does not have exactly one column.
        """
        super().handle_input(data_type)
        filepath = os.path.join(self.artifact.uri, DEFAULT_FILENAME)

        # Stage the file locally before handing it to pandas; presumably the
        # artifact store is not always a local filesystem.
        temp_dir = tempfile.mkdtemp(prefix="zenml-temp-")
        try:
            temp_file = os.path.join(temp_dir, DEFAULT_FILENAME)
            fileio.copy(filepath, temp_file)
            df = pd.read_parquet(temp_file)
        finally:
            fileio.rmtree(temp_dir)

        if issubclass(data_type, pd.Series):
            # `handle_return` stores a series as a single-column frame, so
            # unwrap that column. An explicit check is used instead of
            # `assert`, which is stripped under `python -O`.
            if len(df.columns) != 1:
                raise ValueError(
                    f"Expected exactly one column when reading a pd.Series, "
                    f"but found {len(df.columns)}: {list(df.columns)}."
                )
            df = df[df.columns[0]]
        return df

    def handle_return(self, df: Union[pd.DataFrame, pd.Series]) -> None:
        """Writes a pandas dataframe or series to the specified filename.

        A series is converted to a single-column frame (column `"series"`)
        before being written.

        Args:
            df: The pandas dataframe or series to write.
        """
        super().handle_return(df)
        filepath = os.path.join(self.artifact.uri, DEFAULT_FILENAME)
        if isinstance(df, pd.Series):
            df = df.to_frame(name="series")

        # Reserve a local scratch path and close the handle right away:
        # reopening a still-open NamedTemporaryFile by name is not supported
        # on every platform (e.g. Windows).
        with tempfile.NamedTemporaryFile(
            mode="w", suffix=".gzip", delete=False
        ) as f:
            temp_path = f.name
        try:
            df.to_parquet(temp_path, compression=COMPRESSION_TYPE)
            fileio.copy(temp_path, filepath)
        finally:
            # Remove the scratch file even if writing or copying fails.
            fileio.remove(temp_path)
handle_input(self, data_type)
Reads pd.DataFrame or pd.Series from a parquet file.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
data_type |
Type[Any] |
The type of the data to read. |
required |
Returns:
Type | Description |
---|---|
Union[pandas.core.frame.DataFrame, pandas.core.series.Series] |
The pandas dataframe or series. |
Source code in zenml/materializers/pandas_materializer.py
def handle_input(
    self, data_type: Type[Any]
) -> Union[pd.DataFrame, pd.Series]:
    """Reads pd.DataFrame or pd.Series from a parquet file.

    The parquet file is first copied from the artifact store into a local
    temporary directory. That directory is removed again even if the copy or
    the read fails.

    Args:
        data_type: The type of the data to read.

    Returns:
        The pandas dataframe or series.

    Raises:
        ValueError: If a `pd.Series` is requested but the stored frame does
            not have exactly one column.
    """
    super().handle_input(data_type)
    filepath = os.path.join(self.artifact.uri, DEFAULT_FILENAME)

    # Stage the file locally before handing it to pandas; presumably the
    # artifact store is not always a local filesystem.
    temp_dir = tempfile.mkdtemp(prefix="zenml-temp-")
    try:
        temp_file = os.path.join(temp_dir, DEFAULT_FILENAME)
        fileio.copy(filepath, temp_file)
        df = pd.read_parquet(temp_file)
    finally:
        fileio.rmtree(temp_dir)

    if issubclass(data_type, pd.Series):
        # A series is stored as a single-column frame, so unwrap that column.
        # An explicit check is used instead of `assert`, which is stripped
        # under `python -O`.
        if len(df.columns) != 1:
            raise ValueError(
                f"Expected exactly one column when reading a pd.Series, "
                f"but found {len(df.columns)}: {list(df.columns)}."
            )
        df = df[df.columns[0]]
    return df
handle_return(self, df)
Writes a pandas dataframe or series to the specified filename.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
df |
Union[pandas.core.frame.DataFrame, pandas.core.series.Series] |
The pandas dataframe or series to write. |
required |
Source code in zenml/materializers/pandas_materializer.py
def handle_return(self, df: Union[pd.DataFrame, pd.Series]) -> None:
    """Writes a pandas dataframe or series to the specified filename.

    A series is converted to a single-column frame (column `"series"`)
    before being written.

    Args:
        df: The pandas dataframe or series to write.
    """
    super().handle_return(df)
    filepath = os.path.join(self.artifact.uri, DEFAULT_FILENAME)
    if isinstance(df, pd.Series):
        df = df.to_frame(name="series")

    # Reserve a local scratch path and close the handle right away:
    # reopening a still-open NamedTemporaryFile by name is not supported on
    # every platform (e.g. Windows).
    with tempfile.NamedTemporaryFile(
        mode="w", suffix=".gzip", delete=False
    ) as f:
        temp_path = f.name
    try:
        df.to_parquet(temp_path, compression=COMPRESSION_TYPE)
        fileio.copy(temp_path, filepath)
    finally:
        # Remove the scratch file even if writing or copying fails.
        fileio.remove(temp_path)
service_materializer
Implementation of a materializer to read and write ZenML service instances.
ServiceMaterializer (BaseMaterializer)
Materializer to read/write service instances.
Source code in zenml/materializers/service_materializer.py
class ServiceMaterializer(BaseMaterializer):
    """Materializer to read/write service instances."""

    ASSOCIATED_TYPES = (BaseService,)
    ASSOCIATED_ARTIFACT_TYPES = (ServiceArtifact,)

    def handle_input(self, data_type: Type[Any]) -> BaseService:
        """Rebuild a service from its serialized JSON configuration.

        The artifact holds the service configuration together with the last
        known status information; `ServiceRegistry` turns it back into a
        service instance.

        Args:
            data_type: The type of the data to read.

        Returns:
            A ZenML service instance.
        """
        super().handle_input(data_type)
        config_path = os.path.join(self.artifact.uri, SERVICE_CONFIG_FILENAME)
        with fileio.open(config_path, "r") as config_file:
            serialized = config_file.read()
            restored = ServiceRegistry().load_service_from_json(serialized)
        return restored

    def handle_return(self, service: BaseService) -> None:
        """Serialize a service's configuration and status into the artifact.

        Args:
            service: A ZenML service instance.
        """
        super().handle_return(service)
        config_path = os.path.join(self.artifact.uri, SERVICE_CONFIG_FILENAME)
        with fileio.open(config_path, "w") as config_file:
            serialized = service.json(indent=4)
            config_file.write(serialized)
handle_input(self, data_type)
Creates and returns a service.
This service is instantiated from the serialized service configuration and last known status information saved as artifact.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
data_type |
Type[Any] |
The type of the data to read. |
required |
Returns:
Type | Description |
---|---|
BaseService |
A ZenML service instance. |
Source code in zenml/materializers/service_materializer.py
def handle_input(self, data_type: Type[Any]) -> BaseService:
    """Rebuild a service from its serialized JSON configuration.

    The artifact holds the service configuration together with the last
    known status information; `ServiceRegistry` turns it back into a service
    instance.

    Args:
        data_type: The type of the data to read.

    Returns:
        A ZenML service instance.
    """
    super().handle_input(data_type)
    config_path = os.path.join(self.artifact.uri, SERVICE_CONFIG_FILENAME)
    with fileio.open(config_path, "r") as config_file:
        serialized = config_file.read()
        restored = ServiceRegistry().load_service_from_json(serialized)
    return restored
handle_return(self, service)
Writes a ZenML service.
The configuration and last known status of the input service instance are serialized and saved as an artifact.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
service |
BaseService |
A ZenML service instance. |
required |
Source code in zenml/materializers/service_materializer.py
def handle_return(self, service: BaseService) -> None:
    """Serialize a service's configuration and status into the artifact.

    Args:
        service: A ZenML service instance.
    """
    super().handle_return(service)
    config_path = os.path.join(self.artifact.uri, SERVICE_CONFIG_FILENAME)
    with fileio.open(config_path, "w") as config_file:
        serialized = service.json(indent=4)
        config_file.write(serialized)