Tensorflow
zenml.integrations.tensorflow
special
Initialization for TensorFlow integration.
TensorflowIntegration (Integration)
Definition of the TensorFlow integration for ZenML.
Source code in zenml/integrations/tensorflow/__init__.py
class TensorflowIntegration(Integration):
"""Definition of Tensorflow integration for ZenML.

Declares the integration's name and its pinned package requirements, and
provides `activate`, which imports the modules the integration relies on.
"""
# Name of this integration, taken from the TENSORFLOW constant.
NAME = TENSORFLOW
# Exact package versions this integration requires.
REQUIREMENTS = ["tensorflow==2.8.0", "tensorflow_io==0.24.0"]
@classmethod
def activate(cls) -> None:
"""Activates the integration.

Imports `tensorflow_io` and the integration's `materializers` module.
Both imports are performed only for their import-time side effects.
"""
# need to import this explicitly to load the Tensorflow file IO support
# for S3 and other file systems
import tensorflow_io # type: ignore [import]
# NOTE(review): presumably importing the module is what makes the
# materializers known to ZenML -- confirm against the materializer registry.
from zenml.integrations.tensorflow import materializers # noqa
activate()
classmethod
Activates the integration.
Source code in zenml/integrations/tensorflow/__init__.py
@classmethod
def activate(cls) -> None:
"""Activates the integration.

Imports `tensorflow_io` and the integration's `materializers` module.
Both imports are performed only for their import-time side effects.
"""
# need to import this explicitly to load the Tensorflow file IO support
# for S3 and other file systems
import tensorflow_io # type: ignore [import]
# NOTE(review): presumably importing the module is what makes the
# materializers known to ZenML -- confirm against the materializer registry.
from zenml.integrations.tensorflow import materializers # noqa
materializers
special
Initialization for the TensorFlow materializers.
keras_materializer
Implementation of the TensorFlow Keras materializer.
KerasMaterializer (BaseMaterializer)
Materializer to read/write Keras models.
Source code in zenml/integrations/tensorflow/materializers/keras_materializer.py
class KerasMaterializer(BaseMaterializer):
    """Materializer to read/write Keras models.

    Models are staged in a local temporary directory and copied to or from
    the artifact URI with ``io_utils.copy_dir``. The staging directory is
    always removed, including when copying, loading or saving fails.
    """

    ASSOCIATED_TYPES = (keras.Model,)
    ASSOCIATED_ARTIFACT_TYPES = (ModelArtifact,)

    def handle_input(self, data_type: Type[Any]) -> keras.Model:
        """Reads and returns a Keras model after copying it to a temporary path.

        Args:
            data_type: The type of the data to read.

        Returns:
            A tf.keras.Model model.
        """
        super().handle_input(data_type)

        # The context manager deletes the staging directory on every exit
        # path, so a failed copy or load no longer leaves files behind.
        with tempfile.TemporaryDirectory() as temp_dir:
            # Copy from artifact store to temporary directory
            io_utils.copy_dir(self.artifact.uri, temp_dir)
            # Load the model from the temporary directory; the load finishes
            # before the directory is removed on leaving the `with` block.
            return keras.models.load_model(temp_dir)

    def handle_return(self, model: keras.Model) -> None:
        """Writes a keras model to the artifact store.

        Args:
            model: A tf.keras.Model model.
        """
        super().handle_return(model)

        # Save locally first, then copy the saved files to the artifact URI.
        # The context manager removes the staging directory even if saving
        # or copying raises.
        with tempfile.TemporaryDirectory() as temp_dir:
            model.save(temp_dir)
            io_utils.copy_dir(temp_dir, self.artifact.uri)
handle_input(self, data_type)
Reads and returns a Keras model after copying it to temporary path.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
data_type |
Type[Any] |
The type of the data to read. |
required |
Returns:
Type | Description |
---|---|
Model |
A tf.keras.Model model. |
Source code in zenml/integrations/tensorflow/materializers/keras_materializer.py
def handle_input(self, data_type: Type[Any]) -> keras.Model:
    """Reads and returns a Keras model after copying it to a temporary path.

    Args:
        data_type: The type of the data to read.

    Returns:
        A tf.keras.Model model.
    """
    super().handle_input(data_type)

    # The context manager deletes the staging directory on every exit path,
    # so a failed copy or load no longer leaves files behind.
    with tempfile.TemporaryDirectory() as temp_dir:
        # Copy from artifact store to temporary directory
        io_utils.copy_dir(self.artifact.uri, temp_dir)
        # Load the model from the temporary directory; the load finishes
        # before the directory is removed on leaving the `with` block.
        return keras.models.load_model(temp_dir)
handle_return(self, model)
Writes a keras model to the artifact store.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
model |
Model |
A tf.keras.Model model. |
required |
Source code in zenml/integrations/tensorflow/materializers/keras_materializer.py
def handle_return(self, model: keras.Model) -> None:
    """Writes a keras model to the artifact store.

    Args:
        model: A tf.keras.Model model.
    """
    super().handle_return(model)

    # Save locally first, then copy the saved files to the artifact URI.
    # The context manager removes the staging directory even if saving or
    # copying raises.
    with tempfile.TemporaryDirectory() as temp_dir:
        model.save(temp_dir)
        io_utils.copy_dir(temp_dir, self.artifact.uri)
tf_dataset_materializer
Implementation of the TensorFlow dataset materializer.
TensorflowDatasetMaterializer (BaseMaterializer)
Materializer to read and write tf.data.Dataset objects.
Source code in zenml/integrations/tensorflow/materializers/tf_dataset_materializer.py
class TensorflowDatasetMaterializer(BaseMaterializer):
    """Materializer that restores and persists tf.data.Dataset objects."""

    ASSOCIATED_TYPES = (tf.data.Dataset,)
    ASSOCIATED_ARTIFACT_TYPES = (DataArtifact,)

    def handle_input(self, data_type: Type[Any]) -> Any:
        """Loads a tf.data.Dataset from a local copy of the artifact.

        Args:
            data_type: The type of the data to read.

        Returns:
            A tf.data.Dataset object.
        """
        super().handle_input(data_type)

        local_copy = tempfile.mkdtemp()
        io_utils.copy_dir(self.artifact.uri, local_copy)

        # The local copy is intentionally left on disk: the dataset is
        # loaded lazily and still reads from it once the caller uses it.
        return tf.data.experimental.load(
            os.path.join(local_copy, DEFAULT_FILENAME)
        )

    def handle_return(self, dataset: tf.data.Dataset) -> None:
        """Saves a tf.data.Dataset and copies it to the artifact URI.

        Args:
            dataset: The dataset to persist.
        """
        super().handle_return(dataset)

        staging = tempfile.TemporaryDirectory()
        staging_root = staging.name
        target = os.path.join(staging_root, DEFAULT_FILENAME)
        try:
            tf.data.experimental.save(
                dataset,
                target,
                compression=None,
                shard_func=None,
            )
            io_utils.copy_dir(staging_root, self.artifact.uri)
        finally:
            # Remove the staging directory whether or not the save succeeded.
            fileio.rmtree(staging_root)
handle_input(self, data_type)
Reads data into tf.data.Dataset.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
data_type |
Type[Any] |
The type of the data to read. |
required |
Returns:
Type | Description |
---|---|
Any |
A tf.data.Dataset object. |
Source code in zenml/integrations/tensorflow/materializers/tf_dataset_materializer.py
def handle_input(self, data_type: Type[Any]) -> Any:
    """Loads a tf.data.Dataset from a local copy of the artifact.

    Args:
        data_type: The type of the data to read.

    Returns:
        A tf.data.Dataset object.
    """
    super().handle_input(data_type)

    local_copy = tempfile.mkdtemp()
    io_utils.copy_dir(self.artifact.uri, local_copy)

    # The local copy is intentionally left on disk: the dataset is loaded
    # lazily and still reads from it once the caller uses it.
    return tf.data.experimental.load(
        os.path.join(local_copy, DEFAULT_FILENAME)
    )
handle_return(self, dataset)
Persists a tf.data.Dataset object.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
dataset |
DatasetV2 |
The dataset to persist. |
required |
Source code in zenml/integrations/tensorflow/materializers/tf_dataset_materializer.py
def handle_return(self, dataset: tf.data.Dataset) -> None:
    """Saves a tf.data.Dataset and copies it to the artifact URI.

    Args:
        dataset: The dataset to persist.
    """
    super().handle_return(dataset)

    staging = tempfile.TemporaryDirectory()
    staging_root = staging.name
    target = os.path.join(staging_root, DEFAULT_FILENAME)
    try:
        tf.data.experimental.save(
            dataset,
            target,
            compression=None,
            shard_func=None,
        )
        io_utils.copy_dir(staging_root, self.artifact.uri)
    finally:
        # Remove the staging directory whether or not the save succeeded.
        fileio.rmtree(staging_root)