Orchestrators
zenml.orchestrators
Initialization for ZenML orchestrators.
An orchestrator is a special kind of backend that manages the running of each step of the pipeline. Orchestrators administer the actual pipeline runs. You can think of the orchestrator as the 'root' of any pipeline job that you run during your experimentation.
ZenML supports a local orchestrator out of the box which allows you to run your pipelines in a local environment. We also support using Apache Airflow as the orchestrator to handle the steps of your pipeline.
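As a quick orientation, here is a minimal sketch of what a pipeline run with the local orchestrator looks like from user code. It assumes the `@step` and `@pipeline` decorators from `zenml.steps` and `zenml.pipelines`; the step and pipeline names are purely illustrative.

```python
from zenml.pipelines import pipeline
from zenml.steps import step


@step
def importer() -> int:
    """Illustrative step that produces a single integer."""
    return 42


@step
def doubler(value: int) -> int:
    """Illustrative step that doubles its input."""
    return value * 2


@pipeline
def example_pipeline(importer, doubler):
    # Wire the output of the first step into the second one.
    doubler(importer())


# Calling `run()` hands the pipeline over to the orchestrator of the active
# stack; with the default stack this is the local orchestrator.
example_pipeline(importer=importer(), doubler=doubler()).run()
```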
base_orchestrator
Base orchestrator class.
BaseOrchestrator (StackComponent, ABC)
pydantic-model
Base class for all orchestrators.
In order to implement an orchestrator you will need to subclass from this class.
How it works:
The `run()` method is the entrypoint that is executed when the pipeline's run method is called within the user code (`pipeline_instance.run()`).
This method takes the ZenML Pipeline instance and prepares it for eventual execution. To do this, the following steps are taken:
- The underlying protobuf pipeline is created.
- Within the `_configure_node_context()` method, the pipeline requirements, stack and runtime configuration are added to the step context.
- The `_get_sorted_steps()` method then generates a sorted list of steps which is later used to execute the steps directly in order, or to easily build a DAG.
- After these initial steps comes the most crucial one: within the `prepare_or_run_pipeline()` method, each orchestrator has its own implementation that dictates the pipeline orchestration. In the simplest case this method iterates through all steps and executes them one by one. In other cases this method builds and deploys an intermediate representation of the pipeline (e.g. an Airflow DAG or a Kubeflow Pipelines YAML) to be executed within the orchestrator's environment.
Building your own:
In order to build your own orchestrator, all you need to do is subclass from this class and implement your own `prepare_or_run_pipeline()` method. Overwriting other methods is NOT recommended but possible. See the docstring of the `prepare_or_run_pipeline()` method to find out details of what needs to be implemented within it.
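As a starting point, the sketch below shows what a minimal custom orchestrator for the simple (run-everything-in-process) case could look like. The class name and flavor string are made up, and the exact import paths are assumptions based on the modules referenced on this page.

```python
from typing import Any, ClassVar, List

from tfx.proto.orchestration.pipeline_pb2 import Pipeline as Pb2Pipeline

from zenml.orchestrators.base_orchestrator import BaseOrchestrator
from zenml.pipelines import BasePipeline
from zenml.runtime_configuration import RuntimeConfiguration
from zenml.stack import Stack
from zenml.steps import BaseStep


class SequentialOrchestrator(BaseOrchestrator):
    """Illustrative orchestrator that runs every step in order, in-process."""

    FLAVOR: ClassVar[str] = "sequential_example"

    def prepare_or_run_pipeline(
        self,
        sorted_steps: List[BaseStep],
        pipeline: BasePipeline,
        pb2_pipeline: Pb2Pipeline,
        stack: Stack,
        runtime_configuration: RuntimeConfiguration,
    ) -> Any:
        # Simple case: execute each step in the current environment, one by one.
        for step in sorted_steps:
            self.run_step(
                step=step,
                run_name=runtime_configuration.run_name,
                pb2_pipeline=pb2_pipeline,
            )
```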
Source code in zenml/orchestrators/base_orchestrator.py
class BaseOrchestrator(StackComponent, ABC):
"""Base class for all orchestrators.
In order to implement an orchestrator you will need to subclass from this
class.
How it works:
-------------
The `run()` method is the entrypoint that is executed when the
pipeline's run method is called within the user code
(`pipeline_instance.run()`).
This method will take the ZenML Pipeline instance and prepare it for
eventual execution. To do this the following steps are taken:
* The underlying protobuf pipeline is created.
* Within the `_configure_node_context()` method the pipeline
requirements, stack and runtime configuration is added to the step
context
* The `_get_sorted_steps()` method then generates a sorted list of
steps which will later be used to directly execute these steps in order,
or to easily build a dag
* After these initial steps comes the most crucial one. Within the
`prepare_or_run_pipeline()` method each orchestrator will have its own
implementation that dictates the pipeline orchestration. In the simplest
case this method will iterate through all steps and execute them one by
one. In other cases this method will build and deploy an intermediate
representation of the pipeline (e.g an airflow dag or a kubeflow
pipelines yaml) to be executed within the orchestrators environment.
Building your own:
------------------
In order to build your own orchestrator, all you need to do is subclass
from this class and implement your own `prepare_or_run_pipeline()`
method. Overwriting other methods is NOT recommended but possible.
See the docstring of the `prepare_or_run_pipeline()` method to find out
details of what needs to be implemented within it.
"""
# Class Configuration
TYPE: ClassVar[StackComponentType] = StackComponentType.ORCHESTRATOR
@abstractmethod
def prepare_or_run_pipeline(
self,
sorted_steps: List[BaseStep],
pipeline: "BasePipeline",
pb2_pipeline: Pb2Pipeline,
stack: "Stack",
runtime_configuration: "RuntimeConfiguration",
) -> Any:
"""This method needs to be implemented by the respective orchestrator.
Depending on the type of orchestrator you'll have to perform slightly
different operations.
Simple Case:
------------
The Steps are run directly from within the same environment in which
the orchestrator code is executed. In this case you will need to
deal with implementation-specific runtime configurations (like the
schedule) and then iterate through each step and finally call
`self.run_step()` to execute each step.
Advanced Case:
--------------
Most orchestrators will not run the steps directly. Instead, they
build some intermediate representation of the pipeline that is then
used to create and run the pipeline and its steps on the target
environment. For such orchestrators this method will have to build
this representation and either deploy it directly or return it.
Regardless of the implementation details, the orchestrator will need
a way to trigger each step in the target environment. For this
the `run_step()` method should be used.
In case the orchestrator is using docker containers for orchestration
of each step, the `zenml.entrypoints.step_entrypoint` module can be
used as a generalized entrypoint that sets up all the necessary
prerequisites, parses input parameters and finally executes the step
using the `run_step()` method.
If the orchestrator needs to know the upstream steps for a specific
step to build a DAG, it can use the `get_upstream_step_names()` method
to get them.
Args:
sorted_steps: List of sorted steps.
pipeline: Zenml Pipeline instance.
pb2_pipeline: Protobuf Pipeline instance.
stack: The stack the pipeline was run on.
runtime_configuration: The Runtime configuration of the current run.
Returns:
The optional return value from this method will be returned by the
`pipeline_instance.run()` call when someone is running a pipeline.
"""
def run(
self,
pipeline: "BasePipeline",
stack: "Stack",
runtime_configuration: "RuntimeConfiguration",
) -> Any:
"""Runs a pipeline.
To do this, a protobuf pipeline is created, the context of the
individual steps is expanded to include relevant data, the steps are
sorted into execution order and the implementation specific
`prepare_or_run_pipeline()` method is called.
Args:
pipeline: The pipeline to run.
stack: The stack on which the pipeline is run.
runtime_configuration: Runtime configuration of the pipeline run.
Returns:
The result of the call to `prepare_or_run_pipeline()`.
"""
# Create the protobuf pipeline which will be needed for various reasons
# in the following steps
pb2_pipeline: Pb2Pipeline = Compiler().compile(
create_tfx_pipeline(pipeline, stack=stack)
)
self._configure_node_context(
pipeline=pipeline,
pb2_pipeline=pb2_pipeline,
stack=stack,
runtime_configuration=runtime_configuration,
)
sorted_steps = self._get_sorted_steps(
pipeline=pipeline, pb2_pipeline=pb2_pipeline
)
result = self.prepare_or_run_pipeline(
sorted_steps=sorted_steps,
pipeline=pipeline,
pb2_pipeline=pb2_pipeline,
stack=stack,
runtime_configuration=runtime_configuration,
)
return result
@staticmethod
def _get_sorted_steps(
pipeline: "BasePipeline", pb2_pipeline: Pb2Pipeline
) -> List["BaseStep"]:
"""Get steps sorted in the execution order.
This simplifies the building of a DAG at a later stage as it can be
built with one iteration over this sorted list of steps.
Args:
pipeline: The pipeline
pb2_pipeline: The protobuf pipeline representation
Returns:
List of steps in execution order
"""
# Create a list of sorted steps
sorted_steps = []
for node in pb2_pipeline.nodes:
pipeline_node: PipelineNode = node.pipeline_node
sorted_steps.append(
get_step_for_node(
pipeline_node, steps=list(pipeline.steps.values())
)
)
return sorted_steps
def run_step(
self,
step: "BaseStep",
run_name: str,
pb2_pipeline: Pb2Pipeline,
) -> Optional[data_types.ExecutionInfo]:
"""This sets up a component launcher and executes the given step.
Args:
step: The step to be executed
run_name: The unique run name
pb2_pipeline: Protobuf Pipeline instance
Returns:
The execution info of the step.
"""
# Substitute the runtime parameter to be a concrete run_id, it is
# important for this to be unique for each run.
runtime_parameter_utils.substitute_runtime_parameter(
pb2_pipeline,
{PIPELINE_RUN_ID_PARAMETER_NAME: run_name},
)
# Extract the deployment_configs and use it to access the executor and
# custom driver spec
deployment_config = runner_utils.extract_local_deployment_config(
pb2_pipeline
)
executor_spec = runner_utils.extract_executor_spec(
deployment_config, step.name
)
custom_driver_spec = runner_utils.extract_custom_driver_spec(
deployment_config, step.name
)
# At this point the active metadata store is queried for the
# metadata_connection
repo = Repository()
metadata_store = repo.active_stack.metadata_store
metadata_connection = metadata.Metadata(
metadata_store.get_tfx_metadata_config()
)
custom_executor_operators = {
executable_spec_pb2.PythonClassExecutableSpec: step.executor_operator
}
# The protobuf node for the current step is loaded here.
pipeline_node = self._get_node_with_step_name(
step_name=step.name, pb2_pipeline=pb2_pipeline
)
# Create the tfx launcher responsible for executing the step.
component_launcher = launcher.Launcher(
pipeline_node=pipeline_node,
mlmd_connection=metadata_connection,
pipeline_info=pb2_pipeline.pipeline_info,
pipeline_runtime_spec=pb2_pipeline.runtime_spec,
executor_spec=executor_spec,
custom_driver_spec=custom_driver_spec,
custom_executor_operators=custom_executor_operators,
)
# In some stack configurations, some stack components (like experiment
# trackers) will run some code before and after the actual step run.
# This is where the step actually gets executed using the
# component_launcher
repo.active_stack.prepare_step_run()
execution_info = self._execute_step(component_launcher)
repo.active_stack.cleanup_step_run()
return execution_info
@staticmethod
def _execute_step(
tfx_launcher: launcher.Launcher,
) -> Optional[data_types.ExecutionInfo]:
"""Executes a tfx component.
Args:
tfx_launcher: A tfx launcher to execute the component.
Returns:
Optional execution info returned by the launcher.
Raises:
DuplicateRunNameError: If the run name is already in use.
"""
pipeline_step_name = tfx_launcher._pipeline_node.node_info.id
start_time = time.time()
logger.info(f"Step `{pipeline_step_name}` has started.")
try:
execution_info = tfx_launcher.launch()
if execution_info and get_cache_status(execution_info):
logger.info(f"Using cached version of `{pipeline_step_name}`.")
except RuntimeError as e:
if "execution has already succeeded" in str(e):
# Hacky workaround to catch the error that a pipeline run with
# this name already exists. Raise an error with a more
# descriptive
# message instead.
raise DuplicateRunNameError()
else:
raise
run_duration = time.time() - start_time
logger.info(
f"Step `{pipeline_step_name}` has finished in "
f"{string_utils.get_human_readable_time(run_duration)}."
)
return execution_info
def get_upstream_step_names(
self, step: "BaseStep", pb2_pipeline: Pb2Pipeline
) -> List[str]:
"""Given a step, use the associated pb2 node to find the names of all upstream nodes.
Args:
step: Instance of a Pipeline Step
pb2_pipeline: Protobuf Pipeline instance
Returns:
List of step names from direct upstream steps
"""
node = self._get_node_with_step_name(step.name, pb2_pipeline)
upstream_steps = []
for upstream_node in node.upstream_nodes:
upstream_steps.append(upstream_node)
return upstream_steps
@staticmethod
def _get_node_with_step_name(
step_name: str, pb2_pipeline: Pb2Pipeline
) -> PipelineNode:
"""Given the name of a step, return the node with that name from the pb2_pipeline.
Args:
step_name: Name of the step
pb2_pipeline: pb2 pipeline containing nodes
Returns:
PipelineNode instance
Raises:
KeyError: If the step name is not found in the pipeline.
"""
for node in pb2_pipeline.nodes:
if (
node.WhichOneof("node") == "pipeline_node"
and node.pipeline_node.node_info.id == step_name
):
return node.pipeline_node
raise KeyError(
f"Step {step_name} not found in Pipeline "
f"{pb2_pipeline.pipeline_info.id}"
)
@staticmethod
def _configure_node_context(
pipeline: "BasePipeline",
pb2_pipeline: Pb2Pipeline,
stack: "Stack",
runtime_configuration: "RuntimeConfiguration",
) -> None:
"""Iterates through each node of a pb2_pipeline.
This attaches important contexts to the nodes; namely
pipeline.requirements, stack information and the runtime configuration.
Args:
pipeline: Zenml Pipeline instance
pb2_pipeline: Protobuf Pipeline instance
stack: The stack the pipeline was run on
runtime_configuration: The Runtime configuration of the current run
"""
for node in pb2_pipeline.nodes:
pipeline_node: PipelineNode = node.pipeline_node
# Add pipeline requirements to the step context
requirements = " ".join(sorted(pipeline.requirements))
context_utils.add_context_to_node(
pipeline_node,
type_=MetadataContextTypes.PIPELINE_REQUIREMENTS.value,
name=str(hash(requirements)),
properties={"pipeline_requirements": requirements},
)
# Add the zenml stack to the step context
context_utils.add_context_to_node(
pipeline_node,
type_=MetadataContextTypes.STACK.value,
name=str(hash(json.dumps(stack.dict(), sort_keys=True))),
properties=stack.dict(),
)
# Add all Pydantic objects from runtime_configuration to the context
context_utils.add_runtime_configuration_to_node(
pipeline_node, runtime_configuration
)
get_upstream_step_names(self, step, pb2_pipeline)
Given a step, use the associated pb2 node to find the names of all upstream nodes.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `step` | `BaseStep` | Instance of a Pipeline Step | required |
| `pb2_pipeline` | `Pipeline` | Protobuf Pipeline instance | required |

Returns:

| Type | Description |
|---|---|
| `List[str]` | List of step names from direct upstream steps |
Source code in zenml/orchestrators/base_orchestrator.py
def get_upstream_step_names(
self, step: "BaseStep", pb2_pipeline: Pb2Pipeline
) -> List[str]:
"""Given a step, use the associated pb2 node to find the names of all upstream nodes.
Args:
step: Instance of a Pipeline Step
pb2_pipeline: Protobuf Pipeline instance
Returns:
List of step names from direct upstream steps
"""
node = self._get_node_with_step_name(step.name, pb2_pipeline)
upstream_steps = []
for upstream_node in node.upstream_nodes:
upstream_steps.append(upstream_node)
return upstream_steps
prepare_or_run_pipeline(self, sorted_steps, pipeline, pb2_pipeline, stack, runtime_configuration)
This method needs to be implemented by the respective orchestrator.
Depending on the type of orchestrator you'll have to perform slightly different operations.
Simple Case:
The steps are run directly from within the same environment in which the orchestrator code is executed. In this case you will need to deal with implementation-specific runtime configurations (like the schedule) and then iterate through each step and finally call `self.run_step()` to execute each step.
Advanced Case:
Most orchestrators will not run the steps directly. Instead, they build some intermediate representation of the pipeline that is then used to create and run the pipeline and its steps on the target environment. For such orchestrators this method will have to build this representation and either deploy it directly or return it.
Regardless of the implementation details, the orchestrator will need a way to trigger each step in the target environment. For this the `run_step()` method should be used.
In case the orchestrator is using Docker containers for orchestration of each step, the `zenml.entrypoints.step_entrypoint` module can be used as a generalized entrypoint that sets up all the necessary prerequisites, parses input parameters and finally executes the step using the `run_step()` method.
If the orchestrator needs to know the upstream steps for a specific step to build a DAG, it can use the `get_upstream_step_names()` method to get them.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `sorted_steps` | `List[zenml.steps.base_step.BaseStep]` | List of sorted steps. | required |
| `pipeline` | `BasePipeline` | Zenml Pipeline instance. | required |
| `pb2_pipeline` | `Pipeline` | Protobuf Pipeline instance. | required |
| `stack` | `Stack` | The stack the pipeline was run on. | required |
| `runtime_configuration` | `RuntimeConfiguration` | The Runtime configuration of the current run. | required |
Returns:

| Type | Description |
|---|---|
| `Any` | The optional return value from this method will be returned by the `pipeline_instance.run()` call when someone is running a pipeline. |
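For the advanced case, a helper along the lines of the hypothetical sketch below could be called from within `prepare_or_run_pipeline()` to derive the step dependencies that an intermediate representation such as an Airflow DAG needs. The helper itself and the import paths are assumptions based on the API documented on this page.

```python
from typing import Dict, List

from tfx.proto.orchestration.pipeline_pb2 import Pipeline as Pb2Pipeline

from zenml.orchestrators.base_orchestrator import BaseOrchestrator
from zenml.steps import BaseStep


def build_step_dag(
    orchestrator: BaseOrchestrator,
    sorted_steps: List[BaseStep],
    pb2_pipeline: Pb2Pipeline,
) -> Dict[str, List[str]]:
    """Map each step name to the names of its direct upstream steps."""
    dag: Dict[str, List[str]] = {}
    for step in sorted_steps:
        dag[step.name] = orchestrator.get_upstream_step_names(
            step=step, pb2_pipeline=pb2_pipeline
        )
    return dag
```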
Source code in zenml/orchestrators/base_orchestrator.py
@abstractmethod
def prepare_or_run_pipeline(
self,
sorted_steps: List[BaseStep],
pipeline: "BasePipeline",
pb2_pipeline: Pb2Pipeline,
stack: "Stack",
runtime_configuration: "RuntimeConfiguration",
) -> Any:
"""This method needs to be implemented by the respective orchestrator.
Depending on the type of orchestrator you'll have to perform slightly
different operations.
Simple Case:
------------
The Steps are run directly from within the same environment in which
the orchestrator code is executed. In this case you will need to
deal with implementation-specific runtime configurations (like the
schedule) and then iterate through each step and finally call
`self.run_step()` to execute each step.
Advanced Case:
--------------
Most orchestrators will not run the steps directly. Instead, they
build some intermediate representation of the pipeline that is then
used to create and run the pipeline and its steps on the target
environment. For such orchestrators this method will have to build
this representation and either deploy it directly or return it.
Regardless of the implementation details, the orchestrator will need
a way to trigger each step in the target environment. For this
the `run_step()` method should be used.
In case the orchestrator is using docker containers for orchestration
of each step, the `zenml.entrypoints.step_entrypoint` module can be
used as a generalized entrypoint that sets up all the necessary
prerequisites, parses input parameters and finally executes the step
using the `run_step()` method.
If the orchestrator needs to know the upstream steps for a specific
step to build a DAG, it can use the `get_upstream_step_names()` method
to get them.
Args:
sorted_steps: List of sorted steps.
pipeline: Zenml Pipeline instance.
pb2_pipeline: Protobuf Pipeline instance.
stack: The stack the pipeline was run on.
runtime_configuration: The Runtime configuration of the current run.
Returns:
The optional return value from this method will be returned by the
`pipeline_instance.run()` call when someone is running a pipeline.
"""
run(self, pipeline, stack, runtime_configuration)
Runs a pipeline.
To do this, a protobuf pipeline is created, the context of the individual steps is expanded to include relevant data, the steps are sorted into execution order and the implementation-specific `prepare_or_run_pipeline()` method is called.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `pipeline` | `BasePipeline` | The pipeline to run. | required |
| `stack` | `Stack` | The stack on which the pipeline is run. | required |
| `runtime_configuration` | `RuntimeConfiguration` | Runtime configuration of the pipeline run. | required |
Returns:

| Type | Description |
|---|---|
| `Any` | The result of the call to `prepare_or_run_pipeline()`. |
Source code in zenml/orchestrators/base_orchestrator.py
def run(
self,
pipeline: "BasePipeline",
stack: "Stack",
runtime_configuration: "RuntimeConfiguration",
) -> Any:
"""Runs a pipeline.
To do this, a protobuf pipeline is created, the context of the
individual steps is expanded to include relevant data, the steps are
sorted into execution order and the implementation specific
`prepare_or_run_pipeline()` method is called.
Args:
pipeline: The pipeline to run.
stack: The stack on which the pipeline is run.
runtime_configuration: Runtime configuration of the pipeline run.
Returns:
The result of the call to `prepare_or_run_pipeline()`.
"""
# Create the protobuf pipeline which will be needed for various reasons
# in the following steps
pb2_pipeline: Pb2Pipeline = Compiler().compile(
create_tfx_pipeline(pipeline, stack=stack)
)
self._configure_node_context(
pipeline=pipeline,
pb2_pipeline=pb2_pipeline,
stack=stack,
runtime_configuration=runtime_configuration,
)
sorted_steps = self._get_sorted_steps(
pipeline=pipeline, pb2_pipeline=pb2_pipeline
)
result = self.prepare_or_run_pipeline(
sorted_steps=sorted_steps,
pipeline=pipeline,
pb2_pipeline=pb2_pipeline,
stack=stack,
runtime_configuration=runtime_configuration,
)
return result
run_step(self, step, run_name, pb2_pipeline)
This sets up a component launcher and executes the given step.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `step` | `BaseStep` | The step to be executed | required |
| `run_name` | `str` | The unique run name | required |
| `pb2_pipeline` | `Pipeline` | Protobuf Pipeline instance | required |

Returns:

| Type | Description |
|---|---|
| `Optional[tfx.orchestration.portable.data_types.ExecutionInfo]` | The execution info of the step. |
Source code in zenml/orchestrators/base_orchestrator.py
def run_step(
self,
step: "BaseStep",
run_name: str,
pb2_pipeline: Pb2Pipeline,
) -> Optional[data_types.ExecutionInfo]:
"""This sets up a component launcher and executes the given step.
Args:
step: The step to be executed
run_name: The unique run name
pb2_pipeline: Protobuf Pipeline instance
Returns:
The execution info of the step.
"""
# Substitute the runtime parameter to be a concrete run_id, it is
# important for this to be unique for each run.
runtime_parameter_utils.substitute_runtime_parameter(
pb2_pipeline,
{PIPELINE_RUN_ID_PARAMETER_NAME: run_name},
)
# Extract the deployment_configs and use it to access the executor and
# custom driver spec
deployment_config = runner_utils.extract_local_deployment_config(
pb2_pipeline
)
executor_spec = runner_utils.extract_executor_spec(
deployment_config, step.name
)
custom_driver_spec = runner_utils.extract_custom_driver_spec(
deployment_config, step.name
)
# At this point the active metadata store is queried for the
# metadata_connection
repo = Repository()
metadata_store = repo.active_stack.metadata_store
metadata_connection = metadata.Metadata(
metadata_store.get_tfx_metadata_config()
)
custom_executor_operators = {
executable_spec_pb2.PythonClassExecutableSpec: step.executor_operator
}
# The protobuf node for the current step is loaded here.
pipeline_node = self._get_node_with_step_name(
step_name=step.name, pb2_pipeline=pb2_pipeline
)
# Create the tfx launcher responsible for executing the step.
component_launcher = launcher.Launcher(
pipeline_node=pipeline_node,
mlmd_connection=metadata_connection,
pipeline_info=pb2_pipeline.pipeline_info,
pipeline_runtime_spec=pb2_pipeline.runtime_spec,
executor_spec=executor_spec,
custom_driver_spec=custom_driver_spec,
custom_executor_operators=custom_executor_operators,
)
# In some stack configurations, some stack components (like experiment
# trackers) will run some code before and after the actual step run.
# This is where the step actually gets executed using the
# component_launcher
repo.active_stack.prepare_step_run()
execution_info = self._execute_step(component_launcher)
repo.active_stack.cleanup_step_run()
return execution_info
context_utils
Utilities for the local orchestrator to help with contexts.
add_context_to_node(pipeline_node, type_, name, properties)
Adds a new context to a TFX protobuf pipeline node.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `pipeline_node` | `pipeline_pb2.PipelineNode` | A tfx protobuf pipeline node | required |
| `type_` | `str` | The type name for the context to be added | required |
| `name` | `str` | Unique key for the context | required |
| `properties` | `Dict[str, str]` | dictionary of strings as properties of the context | required |
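A minimal, hypothetical usage sketch; the context type, name and properties below are made up for illustration.

```python
from tfx.proto.orchestration import pipeline_pb2

from zenml.orchestrators.context_utils import add_context_to_node

# Attach a custom context to a freshly created protobuf pipeline node.
node = pipeline_pb2.PipelineNode()
add_context_to_node(
    node,
    type_="example_context",          # type name of the context
    name="run_1234",                  # unique key for the context
    properties={"owner": "ml-team"},  # string-valued properties
)

print(node.contexts.contexts[0].type.name)  # -> "example_context"
```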
Source code in zenml/orchestrators/context_utils.py
def add_context_to_node(
pipeline_node: "pipeline_pb2.PipelineNode",
type_: str,
name: str,
properties: Dict[str, str],
) -> None:
"""Adds a new context to a TFX protobuf pipeline node.
Args:
pipeline_node: A tfx protobuf pipeline node
type_: The type name for the context to be added
name: Unique key for the context
properties: dictionary of strings as properties of the context
"""
# Add a new context to the pipeline
context: "pipeline_pb2.ContextSpec" = pipeline_node.contexts.contexts.add()
# Adding the type of context
context.type.name = type_
# Setting the name of the context
context.name.field_value.string_value = name
# Setting the properties of the context depending on attribute type
for key, value in properties.items():
c_property = context.properties[key]
c_property.field_value.string_value = value
add_runtime_configuration_to_node(pipeline_node, runtime_config)
Add the runtime configuration of a pipeline run to a protobuf pipeline node.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `pipeline_node` | `pipeline_pb2.PipelineNode` | a tfx protobuf pipeline node | required |
| `runtime_config` | `RuntimeConfiguration` | a ZenML RuntimeConfiguration | required |
Source code in zenml/orchestrators/context_utils.py
def add_runtime_configuration_to_node(
pipeline_node: "pipeline_pb2.PipelineNode",
runtime_config: RuntimeConfiguration,
) -> None:
"""Add the runtime configuration of a pipeline run to a protobuf pipeline node.
Args:
pipeline_node: a tfx protobuf pipeline node
runtime_config: a ZenML RuntimeConfiguration
"""
skip_errors: bool = runtime_config.get(
"ignore_unserializable_fields", False
)
# Determine the name of the context
def _name(obj: "BaseModel") -> str:
"""Compute a unique context name for a pydantic BaseModel.
Args:
obj: a pydantic BaseModel
Returns:
a unique context name
"""
try:
return str(hash(obj.json(sort_keys=True)))
except TypeError as e:
class_name = obj.__class__.__name__
logging.info(
"Cannot convert %s to json, generating uuid instead. Error: %s",
class_name,
e,
)
return f"{class_name}_{uuid.uuid1()}"
# iterate over all attributes of runtime context, serializing all pydantic
# objects to node context.
for key, obj in runtime_config.items():
if isinstance(obj, BaseModel):
logger.debug("Adding %s to context", key)
add_context_to_node(
pipeline_node,
type_=obj.__repr_name__().lower(),
name=_name(obj),
properties=serialize_pydantic_object(
obj, skip_errors=skip_errors
),
)
serialize_pydantic_object(obj, *, skip_errors=False)
Convert a pydantic object to a dict of strings.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `obj` | `BaseModel` | a pydantic object. | required |
| `skip_errors` | `bool` | if True, ignore errors when serializing the object. | False |

Returns:

| Type | Description |
|---|---|
| `Dict[str, str]` | a dictionary of strings. |
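A small, hypothetical example of what the serialization produces; the model and its fields are made up. Each attribute is JSON-encoded individually, so every value in the result is a string.

```python
from pydantic import BaseModel

from zenml.orchestrators.context_utils import serialize_pydantic_object


class ExampleConfig(BaseModel):
    """Illustrative pydantic model."""

    learning_rate: float = 0.001
    epochs: int = 10
    tags: list = ["baseline"]


# Each field is JSON-encoded separately, e.g.
# {'learning_rate': '0.001', 'epochs': '10', 'tags': '["baseline"]'}
print(serialize_pydantic_object(ExampleConfig()))
```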
Source code in zenml/orchestrators/context_utils.py
def serialize_pydantic_object(
obj: BaseModel, *, skip_errors: bool = False
) -> Dict[str, str]:
"""Convert a pydantic object to a dict of strings.
Args:
obj: a pydantic object.
skip_errors: if True, ignore errors when serializing the object.
Returns:
a dictionary of strings.
"""
class PydanticEncoder(json.JSONEncoder):
def default(self, o: Any) -> Any:
"""Default encoding for pydantic objects.
Args:
o: the object to encode.
Returns:
the encoded object.
"""
try:
return cast(Callable[[Any], str], obj.__json_encoder__)(o)
except TypeError:
return super().default(o)
def _inner_generator(
dictionary: Dict[str, Any]
) -> Iterator[Tuple[str, str]]:
"""Itemwise serialize each element in a dictionary.
Args:
dictionary: a dictionary.
Yields:
a tuple of (key, value).
Raises:
TypeError: if the value is not JSON serializable
"""
for key, item in dictionary.items():
try:
yield key, json.dumps(item, cls=PydanticEncoder)
except TypeError as e:
if skip_errors:
logging.info(
"Skipping adding field '%s' to metadata context as "
"it cannot be serialized due to %s.",
key,
e,
)
else:
raise TypeError(
f"Invalid type {type(item)} for key {key} can not be "
"serialized."
) from e
return {key: value for key, value in _inner_generator(obj.dict())}
local
Initialization for the local orchestrator.
local_orchestrator
Implementation of the ZenML local orchestrator.
LocalOrchestrator (BaseOrchestrator)
pydantic-model
Orchestrator responsible for running pipelines locally.
This orchestrator does not allow for concurrent execution of steps and also does not support running on a schedule.
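For illustration, the flavor can also be instantiated directly, assuming the stack component only requires a name; in practice it is usually registered as part of a stack and picked up automatically when a pipeline runs. The component name below is made up.

```python
from zenml.orchestrators.local.local_orchestrator import LocalOrchestrator

# Hypothetical direct instantiation; normally the orchestrator is part of a
# registered stack rather than created by hand.
orchestrator = LocalOrchestrator(name="local_orchestrator")
print(orchestrator.FLAVOR)  # -> "local"
```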
Source code in zenml/orchestrators/local/local_orchestrator.py
class LocalOrchestrator(BaseOrchestrator):
"""Orchestrator responsible for running pipelines locally.
This orchestrator does not allow for concurrent execution of steps and also
does not support running on a schedule.
"""
FLAVOR: ClassVar[str] = "local"
def prepare_or_run_pipeline(
self,
sorted_steps: List[BaseStep],
pipeline: "BasePipeline",
pb2_pipeline: Pb2Pipeline,
stack: "Stack",
runtime_configuration: "RuntimeConfiguration",
) -> Any:
"""This method iterates through all steps and executes them sequentially.
Args:
sorted_steps: A list of steps in the pipeline.
pipeline: The pipeline object.
pb2_pipeline: The pipeline object in protobuf format.
stack: The stack object.
runtime_configuration: The runtime configuration object.
"""
if runtime_configuration.schedule:
logger.warning(
"Local Orchestrator currently does not support the"
"use of schedules. The `schedule` will be ignored "
"and the pipeline will be run immediately."
)
assert runtime_configuration.run_name, "Run name must be set"
# Run each step
for step in sorted_steps:
self.run_step(
step=step,
run_name=runtime_configuration.run_name,
pb2_pipeline=pb2_pipeline,
)
prepare_or_run_pipeline(self, sorted_steps, pipeline, pb2_pipeline, stack, runtime_configuration)
This method iterates through all steps and executes them sequentially.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `sorted_steps` | `List[zenml.steps.base_step.BaseStep]` | A list of steps in the pipeline. | required |
| `pipeline` | `BasePipeline` | The pipeline object. | required |
| `pb2_pipeline` | `Pipeline` | The pipeline object in protobuf format. | required |
| `stack` | `Stack` | The stack object. | required |
| `runtime_configuration` | `RuntimeConfiguration` | The runtime configuration object. | required |
Source code in zenml/orchestrators/local/local_orchestrator.py
def prepare_or_run_pipeline(
self,
sorted_steps: List[BaseStep],
pipeline: "BasePipeline",
pb2_pipeline: Pb2Pipeline,
stack: "Stack",
runtime_configuration: "RuntimeConfiguration",
) -> Any:
"""This method iterates through all steps and executes them sequentially.
Args:
sorted_steps: A list of steps in the pipeline.
pipeline: The pipeline object.
pb2_pipeline: The pipeline object in protobuf format.
stack: The stack object.
runtime_configuration: The runtime configuration object.
"""
if runtime_configuration.schedule:
logger.warning(
"Local Orchestrator currently does not support the"
"use of schedules. The `schedule` will be ignored "
"and the pipeline will be run immediately."
)
assert runtime_configuration.run_name, "Run name must be set"
# Run each step
for step in sorted_steps:
self.run_step(
step=step,
run_name=runtime_configuration.run_name,
pb2_pipeline=pb2_pipeline,
)
utils
Utility functions for the orchestrator.
create_tfx_pipeline(zenml_pipeline, stack)
Creates a tfx pipeline from a ZenML pipeline.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `zenml_pipeline` | `BasePipeline` | The ZenML pipeline. | required |
| `stack` | `Stack` | The stack. | required |

Returns:

| Type | Description |
|---|---|
| `Pipeline` | The tfx pipeline. |
Source code in zenml/orchestrators/utils.py
def create_tfx_pipeline(
zenml_pipeline: "BasePipeline", stack: "Stack"
) -> tfx_pipeline.Pipeline:
"""Creates a tfx pipeline from a ZenML pipeline.
Args:
zenml_pipeline: The ZenML pipeline.
stack: The stack.
Returns:
The tfx pipeline.
"""
# Connect the inputs/outputs of all steps in the pipeline
zenml_pipeline.connect(**zenml_pipeline.steps)
tfx_components = [step.component for step in zenml_pipeline.steps.values()]
artifact_store = stack.artifact_store
# We do not pass the metadata connection config here as it might not be
# accessible. Instead it is queried from the active stack right before a
# step is executed (see `BaseOrchestrator.run_step(...)`)
return tfx_pipeline.Pipeline(
pipeline_name=zenml_pipeline.name,
components=tfx_components, # type: ignore[arg-type]
pipeline_root=artifact_store.path,
enable_cache=zenml_pipeline.enable_cache,
)
get_cache_status(execution_info)
Returns whether a cached execution was used or not.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `execution_info` | `Optional[tfx.orchestration.portable.data_types.ExecutionInfo]` | The execution info. | required |
Returns:

| Type | Description |
|---|---|
| `bool` | `True` if the execution was cached, `False` otherwise. |
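A small illustration of the behaviour, assuming `ExecutionInfo` can be constructed with keyword arguments as in recent TFX versions; the output URI below is made up.

```python
from tfx.orchestration.portable import data_types

from zenml.orchestrators.utils import get_cache_status

# A cached execution has no output URI, so it is reported as cached.
cached = data_types.ExecutionInfo(execution_output_uri=None)
executed = data_types.ExecutionInfo(execution_output_uri="/tmp/step_outputs")

print(get_cache_status(cached))    # -> True
print(get_cache_status(executed))  # -> False
```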
Source code in zenml/orchestrators/utils.py
def get_cache_status(
execution_info: Optional[data_types.ExecutionInfo],
) -> bool:
"""Returns whether a cached execution was used or not.
Args:
execution_info: The execution info.
Returns:
`True` if the execution was cached, `False` otherwise.
"""
# An execution output URI is only provided if the step needs to be
# executed (= is not cached)
if execution_info and execution_info.execution_output_uri is None:
return True
else:
return False
get_step_for_node(node, steps)
Finds the matching step for a tfx pipeline node.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `node` | `PipelineNode` | The tfx pipeline node. | required |
| `steps` | `List[zenml.steps.base_step.BaseStep]` | The list of steps. | required |

Returns:

| Type | Description |
|---|---|
| `BaseStep` | The matching step. |

Exceptions:

| Type | Description |
|---|---|
| `RuntimeError` | If no matching step is found. |
Source code in zenml/orchestrators/utils.py
def get_step_for_node(node: PipelineNode, steps: List[BaseStep]) -> BaseStep:
"""Finds the matching step for a tfx pipeline node.
Args:
node: The tfx pipeline node.
steps: The list of steps.
Returns:
The matching step.
Raises:
RuntimeError: If no matching step is found.
"""
step_name = node.node_info.id
try:
return next(step for step in steps if step.name == step_name)
except StopIteration:
raise RuntimeError(f"Unable to find step with name '{step_name}'.")