
Orchestrators

zenml.orchestrators special

An orchestrator is a special kind of backend that manages the running of each step of the pipeline. Orchestrators administer the actual pipeline runs. You can think of it as the 'root' of any pipeline job that you run during your experimentation.

ZenML supports a local orchestrator out of the box which allows you to run your pipelines in a local environment. We also support using Apache Airflow as the orchestrator to handle the steps of your pipeline.
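
For orientation, here is a minimal, hypothetical sketch of a pipeline whose steps end up being executed by whichever orchestrator is configured in the active stack (the step and pipeline definitions are made up; exact decorator imports may vary between ZenML versions):

from zenml.pipelines import pipeline
from zenml.steps import step


@step
def importer() -> int:
    """Hypothetical step that produces a value."""
    return 42


@step
def trainer(value: int) -> None:
    """Hypothetical step that consumes the value."""
    print(f"Training with value {value}")


@pipeline
def my_pipeline(importer, trainer):
    # Connect the outputs of one step to the inputs of the next.
    trainer(importer())


# The orchestrator of the active stack takes over from here and runs each step.
my_pipeline(importer=importer(), trainer=trainer()).run()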

base_orchestrator

BaseOrchestrator (StackComponent, ABC) pydantic-model

Base class for all orchestrators. In order to implement an orchestrator you will need to subclass from this class.

How it works:

The run() method is the entrypoint that is executed when the pipeline's run method is called within the user code (pipeline_instance.run()).

This method will take the ZenML Pipeline instance and prepare it for eventual execution. To do this the following steps are taken:

  • The underlying protobuf pipeline is created.

  • Within the _configure_node_context() method, the pipeline requirements, stack and runtime configuration are added to the step context.

  • The _get_sorted_steps() method then generates a sorted list of steps which will later be used to directly execute these steps in order, or to easily build a DAG.

  • After these initial steps comes the most crucial one. Within the prepare_or_run_pipeline() method each orchestrator will have its own implementation that dictates the pipeline orchestration. In the simplest case this method will iterate through all steps and execute them one by one. In other cases this method will build and deploy an intermediate representation of the pipeline (e.g. an Airflow DAG or a Kubeflow Pipelines YAML) to be executed within the orchestrator's environment.

Building your own:

In order to build your own orchestrator, all you need to do is subclass this class and implement your own prepare_or_run_pipeline() method. Overriding other methods is possible but NOT recommended. See the docstring of the prepare_or_run_pipeline() method for details of what needs to be implemented within it.
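
As a rough sketch (not part of the ZenML codebase), a custom orchestrator that simply runs the sorted steps one after another could look like the following; the class name and flavor string are made up, and BaseOrchestrator is assumed to be importable from zenml.orchestrators:

from typing import Any, ClassVar, List

from zenml.orchestrators import BaseOrchestrator


class SequentialOrchestrator(BaseOrchestrator):
    """Hypothetical orchestrator that executes all steps in order."""

    FLAVOR: ClassVar[str] = "sequential"  # made-up flavor name

    def prepare_or_run_pipeline(
        self,
        sorted_steps: List["BaseStep"],
        pipeline: "BasePipeline",
        pb2_pipeline: "Pb2Pipeline",
        stack: "Stack",
        runtime_configuration: "RuntimeConfiguration",
    ) -> Any:
        # Simple case: execute every step in the current environment,
        # in the order determined by _get_sorted_steps().
        for step in sorted_steps:
            self.run_step(
                step=step,
                run_name=runtime_configuration.run_name,
                pb2_pipeline=pb2_pipeline,
            )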

Source code in zenml/orchestrators/base_orchestrator.py
class BaseOrchestrator(StackComponent, ABC):
    """
    Base class for all orchestrators. In order to implement an
    orchestrator you will need to subclass from this class.

    How it works:
    -------------
    The `run()` method is the entrypoint that is executed when the
    pipeline's run method is called within the user code
    (`pipeline_instance.run()`).

    This method will take the ZenML Pipeline instance and prepare it for
    eventual execution. To do this the following steps are taken:

    * The underlying protobuf pipeline is created.

    * Within the `_configure_node_context()` method the pipeline
    requirements, stack and runtime configuration are added to the step
    context

    * The `_get_sorted_steps()` method then generates a sorted list of
    steps which will later be used to directly execute these steps in order,
    or to easily build a dag

    * After these initial steps comes the most crucial one. Within the
    `prepare_or_run_pipeline()` method each orchestrator will have its own
    implementation that dictates the pipeline orchestration. In the simplest
    case this method will iterate through all steps and execute them one by
    one. In other cases this method will build and deploy an intermediate
    representation of the pipeline (e.g. an Airflow DAG or a Kubeflow
    Pipelines YAML) to be executed within the orchestrator's environment.

    Building your own:
    ------------------
    In order to build your own orchestrator, all you need to do is subclass
    from this class and implement your own `prepare_or_run_pipeline()`
    method. Overwriting other methods is NOT recommended but possible.
    See the docstring of the `prepare_or_run_pipeline()` method to find out
    details of what needs to be implemented within it.
    """

    # Class Configuration
    TYPE: ClassVar[StackComponentType] = StackComponentType.ORCHESTRATOR

    @abstractmethod
    def prepare_or_run_pipeline(
        self,
        sorted_steps: List[BaseStep],
        pipeline: "BasePipeline",
        pb2_pipeline: Pb2Pipeline,
        stack: "Stack",
        runtime_configuration: "RuntimeConfiguration",
    ) -> Any:
        """
         This method needs to be implemented by the respective orchestrator.
         Depending on the type of orchestrator you'll have to perform slightly
         different operations.

         Simple Case:
         ------------
         The Steps are run directly from within the same environment in which
         the orchestrator code is executed. In this case you will need to
         deal with implementation-specific runtime configurations (like the
         schedule) and then iterate through each step and finally call
         `self.run_step()` to execute each step.

         Advanced Case:
         --------------
         Most orchestrators will not run the steps directly. Instead, they
         build some intermediate representation of the pipeline that is then
         used to create and run the pipeline and its steps on the target
         environment. For such orchestrators this method will have to build
         this representation and either deploy it directly or return it.

         Regardless of the implementation details, the orchestrator will need
         a way to trigger each step in the target environment. For this,
         the `run_step()` method should be used.

         In case the orchestrator is using docker containers for orchestration
         of each step, the `zenml.entrypoints.step_entrypoint` module can be
         used as a generalized entrypoint that sets up all the necessary
         prerequisites, parses input parameters and finally executes the step
         using the `run_step()` method.

         If the orchestrator needs to know the upstream steps for a specific
         step to build a DAG, it can use the `get_upstream_step_names()` method
         to get them.

         Args:
             sorted_steps: List of sorted steps
             pipeline: Zenml Pipeline instance
             pb2_pipeline: Protobuf Pipeline instance
             stack: The stack the pipeline was run on
             runtime_configuration: The Runtime configuration of the current run

        Returns:
            The optional return value from this method will be returned by the
            `pipeline_instance.run()` call when someone is running a pipeline.
        """

    def run(
        self,
        pipeline: "BasePipeline",
        stack: "Stack",
        runtime_configuration: "RuntimeConfiguration",
    ) -> Any:
        """Runs a pipeline. To do this, a protobuf pipeline is created, the
        context of the individual steps is expanded to include relevant data,
        the steps are sorted into execution order and the implementation
        specific `prepare_or_run_pipeline()` method is called.

        Args:
            pipeline: The pipeline to run.
            stack: The stack on which the pipeline is run.
            runtime_configuration: Runtime configuration of the pipeline run.

        Returns:
            The result of the call to `prepare_or_run_pipeline()`.
        """

        # Create the protobuf pipeline which will be needed for various reasons
        # in the following steps
        pb2_pipeline: Pb2Pipeline = Compiler().compile(
            create_tfx_pipeline(pipeline, stack=stack)
        )

        self._configure_node_context(
            pipeline=pipeline,
            pb2_pipeline=pb2_pipeline,
            stack=stack,
            runtime_configuration=runtime_configuration,
        )

        sorted_steps = self._get_sorted_steps(
            pipeline=pipeline, pb2_pipeline=pb2_pipeline
        )

        result = self.prepare_or_run_pipeline(
            sorted_steps=sorted_steps,
            pipeline=pipeline,
            pb2_pipeline=pb2_pipeline,
            stack=stack,
            runtime_configuration=runtime_configuration,
        )

        return result

    @staticmethod
    def _get_sorted_steps(
        pipeline: "BasePipeline", pb2_pipeline: Pb2Pipeline
    ) -> List["BaseStep"]:
        """Get steps sorted in the execution order. This simplifies the
        building of a DAG at a later stage as it can be built with one iteration
        over this sorted list of steps.

        Args:
            pipeline: The pipeline
            pb2_pipeline: The protobuf pipeline representation

        Returns:
            List of steps in execution order
        """
        # Create a list of sorted steps
        sorted_steps = []
        for node in pb2_pipeline.nodes:
            pipeline_node: PipelineNode = node.pipeline_node
            sorted_steps.append(
                get_step_for_node(
                    pipeline_node, steps=list(pipeline.steps.values())
                )
            )
        return sorted_steps

    def run_step(
        self,
        step: "BaseStep",
        run_name: str,
        pb2_pipeline: Pb2Pipeline,
    ) -> Optional[data_types.ExecutionInfo]:
        """This sets up a component launcher and executes the given step.

        Args:
            step: The step to be executed
            run_name: The unique run name
            pb2_pipeline: Protobuf Pipeline instance
        """
        # Substitute the runtime parameter to be a concrete run_id, it is
        # important for this to be unique for each run.
        runtime_parameter_utils.substitute_runtime_parameter(
            pb2_pipeline,
            {PIPELINE_RUN_ID_PARAMETER_NAME: run_name},
        )

        # Extract the deployment_configs and use it to access the executor and
        # custom driver spec
        deployment_config = runner_utils.extract_local_deployment_config(
            pb2_pipeline
        )
        executor_spec = runner_utils.extract_executor_spec(
            deployment_config, step.name
        )
        custom_driver_spec = runner_utils.extract_custom_driver_spec(
            deployment_config, step.name
        )

        # At this point the active metadata store is queried for the
        # metadata_connection
        repo = Repository()
        metadata_store = repo.active_stack.metadata_store
        metadata_connection = metadata.Metadata(
            metadata_store.get_tfx_metadata_config()
        )
        custom_executor_operators = {
            executable_spec_pb2.PythonClassExecutableSpec: step.executor_operator
        }

        # The protobuf node for the current step is loaded here.
        pipeline_node = self._get_node_with_step_name(
            step_name=step.name, pb2_pipeline=pb2_pipeline
        )

        # Create the tfx launcher responsible for executing the step.
        component_launcher = launcher.Launcher(
            pipeline_node=pipeline_node,
            mlmd_connection=metadata_connection,
            pipeline_info=pb2_pipeline.pipeline_info,
            pipeline_runtime_spec=pb2_pipeline.runtime_spec,
            executor_spec=executor_spec,
            custom_driver_spec=custom_driver_spec,
            custom_executor_operators=custom_executor_operators,
        )

        # In some stack configurations, some stack components (like experiment
        # trackers) will run some code before and after the actual step run.
        # This is where the step actually gets executed using the
        # component_launcher
        repo.active_stack.prepare_step_run()
        execution_info = self._execute_step(component_launcher)
        repo.active_stack.cleanup_step_run()

        return execution_info

    @staticmethod
    def _execute_step(
        tfx_launcher: launcher.Launcher,
    ) -> Optional[data_types.ExecutionInfo]:
        """Executes a tfx component.

        Args:
            tfx_launcher: A tfx launcher to execute the component.

        Returns:
            Optional execution info returned by the launcher.
        """
        step_name_param = (
            INTERNAL_EXECUTION_PARAMETER_PREFIX + PARAM_PIPELINE_PARAMETER_NAME
        )
        pipeline_step_name = tfx_launcher._pipeline_node.node_info.id
        start_time = time.time()
        logger.info(f"Step `{pipeline_step_name}` has started.")
        try:
            execution_info = tfx_launcher.launch()
            if execution_info and get_cache_status(execution_info):
                if execution_info.exec_properties:
                    step_name = json.loads(
                        execution_info.exec_properties[step_name_param]
                    )
                    logger.info(
                        f"Using cached version of `{pipeline_step_name}` "
                        f"[`{step_name}`].",
                    )
                else:
                    logger.error(
                        f"No execution properties found for step "
                        f"`{pipeline_step_name}`."
                    )
        except RuntimeError as e:
            if "execution has already succeeded" in str(e):
                # Hacky workaround to catch the error that a pipeline run with
                # this name already exists. Raise an error with a more
                # descriptive message instead.
                raise DuplicateRunNameError()
            else:
                raise

        run_duration = time.time() - start_time
        logger.info(
            f"Step `{pipeline_step_name}` has finished in "
            f"{string_utils.get_human_readable_time(run_duration)}."
        )
        return execution_info

    def get_upstream_step_names(
        self, step: "BaseStep", pb2_pipeline: Pb2Pipeline
    ) -> List[str]:
        """Given a step, use the associated pb2 node to find the names of all
        upstream nodes.

        Args:
            step: Instance of a Pipeline Step
            pb2_pipeline: Protobuf Pipeline instance

        Returns:
            List of step names from direct upstream steps
        """
        node = self._get_node_with_step_name(step.name, pb2_pipeline)

        upstream_steps = []
        for upstream_node in node.upstream_nodes:
            upstream_steps.append(upstream_node)

        return upstream_steps

    @staticmethod
    def _get_node_with_step_name(
        step_name: str, pb2_pipeline: Pb2Pipeline
    ) -> PipelineNode:
        """Given the name of a step, return the node with that name from the
        pb2_pipeline.

        Args:
            step_name: Name of the step
            pb2_pipeline: pb2 pipeline containing nodes

        Returns:
            PipelineNode instance
        """
        for node in pb2_pipeline.nodes:
            if (
                node.WhichOneof("node") == "pipeline_node"
                and node.pipeline_node.node_info.id == step_name
            ):
                return node.pipeline_node

        raise KeyError(
            f"Step {step_name} not found in Pipeline "
            f"{pb2_pipeline.pipeline_info.id}"
        )

    @staticmethod
    def _configure_node_context(
        pipeline: "BasePipeline",
        pb2_pipeline: Pb2Pipeline,
        stack: "Stack",
        runtime_configuration: "RuntimeConfiguration",
    ) -> None:
        """Iterates through each node of a pb2_pipeline and attaches important
        contexts to the nodes; namely pipeline.requirements, stack
        information and the runtime configuration.

        Args:
            pipeline: Zenml Pipeline instance
            pb2_pipeline: Protobuf Pipeline instance
            stack: The stack the pipeline was run on
            runtime_configuration: The Runtime configuration of the current run
        """
        for node in pb2_pipeline.nodes:
            pipeline_node: PipelineNode = node.pipeline_node

            # Add pipeline requirements to the step context
            requirements = " ".join(sorted(pipeline.requirements))
            context_utils.add_context_to_node(
                pipeline_node,
                type_=MetadataContextTypes.PIPELINE_REQUIREMENTS.value,
                name=str(hash(requirements)),
                properties={"pipeline_requirements": requirements},
            )

            # Add the zenml stack to the step context
            context_utils.add_context_to_node(
                pipeline_node,
                type_=MetadataContextTypes.STACK.value,
                name=str(hash(json.dumps(stack.dict(), sort_keys=True))),
                properties=stack.dict(),
            )

            # Add all pydantic objects from runtime_configuration to the context
            context_utils.add_runtime_configuration_to_node(
                pipeline_node, runtime_configuration
            )

get_upstream_step_names(self, step, pb2_pipeline)

Given a step, use the associated pb2 node to find the names of all upstream nodes.

Parameters:

    step (BaseStep): Instance of a Pipeline Step. Required.
    pb2_pipeline (Pipeline): Protobuf Pipeline instance. Required.

Returns:

    List[str]: List of step names from direct upstream steps.

Source code in zenml/orchestrators/base_orchestrator.py
def get_upstream_step_names(
    self, step: "BaseStep", pb2_pipeline: Pb2Pipeline
) -> List[str]:
    """Given a step, use the associated pb2 node to find the names of all
    upstream nodes.

    Args:
        step: Instance of a Pipeline Step
        pb2_pipeline: Protobuf Pipeline instance

    Returns:
        List of step names from direct upstream steps
    """
    node = self._get_node_with_step_name(step.name, pb2_pipeline)

    upstream_steps = []
    for upstream_node in node.upstream_nodes:
        upstream_steps.append(upstream_node)

    return upstream_steps

prepare_or_run_pipeline(self, sorted_steps, pipeline, pb2_pipeline, stack, runtime_configuration)

This method needs to be implemented by the respective orchestrator. Depending on the type of orchestrator you'll have to perform slightly different operations.

Simple Case:


The Steps are run directly from within the same environment in which the orchestrator code is executed. In this case you will need to deal with implementation-specific runtime configurations (like the schedule) and then iterate through each step and finally call self.run_step() to execute each step.

Advanced Case:


Most orchestrators will not run the steps directly. Instead, they build some intermediate representation of the pipeline that is then used to create and run the pipeline and its steps on the target environment. For such orchestrators this method will have to build this representation and either deploy it directly or return it.

Regardless of the implementation details, the orchestrator will need a way to trigger each step in the target environment. For this, the run_step() method should be used.

If the orchestrator uses Docker containers for the orchestration of each step, the zenml.entrypoints.step_entrypoint module can be used as a generalized entrypoint that sets up all the necessary prerequisites, parses input parameters and finally executes the step using the run_step() method.

If the orchestrator needs to know the upstream steps for a specific step to build a DAG, it can use the get_upstream_step_names() method to get them.
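
For example, an orchestrator that builds a DAG could map each step to its upstream step names inside prepare_or_run_pipeline(), roughly like this (a sketch using only the arguments and methods described above):

# Inside a prepare_or_run_pipeline() implementation:
dag = {}
for step in sorted_steps:
    dag[step.name] = self.get_upstream_step_names(
        step=step, pb2_pipeline=pb2_pipeline
    )
# `dag` now maps every step name to the names of its direct upstream steps
# and can be translated into the target system's own DAG representation
# (e.g. task dependencies in an Airflow DAG).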

Parameters:

    sorted_steps: List of sorted steps
    pipeline: ZenML Pipeline instance
    pb2_pipeline: Protobuf Pipeline instance
    stack: The stack the pipeline was run on
    runtime_configuration: The runtime configuration of the current run

Returns:

    Any: The optional return value from this method will be returned by the pipeline_instance.run() call when someone is running a pipeline.

Source code in zenml/orchestrators/base_orchestrator.py
@abstractmethod
def prepare_or_run_pipeline(
    self,
    sorted_steps: List[BaseStep],
    pipeline: "BasePipeline",
    pb2_pipeline: Pb2Pipeline,
    stack: "Stack",
    runtime_configuration: "RuntimeConfiguration",
) -> Any:
    """
     This method needs to be implemented by the respective orchestrator.
     Depending on the type of orchestrator you'll have to perform slightly
     different operations.

     Simple Case:
     ------------
     The Steps are run directly from within the same environment in which
     the orchestrator code is executed. In this case you will need to
     deal with implementation-specific runtime configurations (like the
     schedule) and then iterate through each step and finally call
     `self.run_step()` to execute each step.

     Advanced Case:
     --------------
     Most orchestrators will not run the steps directly. Instead, they
     build some intermediate representation of the pipeline that is then
     used to create and run the pipeline and its steps on the target
     environment. For such orchestrators this method will have to build
     this representation and either deploy it directly or return it.

     Regardless of the implementation details, the orchestrator will need
     a way to trigger each step in the target environment. For this,
     the `run_step()` method should be used.

     In case the orchestrator is using docker containers for orchestration
     of each step, the `zenml.entrypoints.step_entrypoint` module can be
     used as a generalized entrypoint that sets up all the necessary
     prerequisites, parses input parameters and finally executes the step
     using the `run_step()` method.

     If the orchestrator needs to know the upstream steps for a specific
     step to build a DAG, it can use the `get_upstream_step_names()` method
     to get them.

     Args:
         sorted_steps: List of sorted steps
         pipeline: Zenml Pipeline instance
         pb2_pipeline: Protobuf Pipeline instance
         stack: The stack the pipeline was run on
         runtime_configuration: The Runtime configuration of the current run

    Returns:
        The optional return value from this method will be returned by the
        `pipeline_instance.run()` call when someone is running a pipeline.
    """

run(self, pipeline, stack, runtime_configuration)

Runs a pipeline. To do this, a protobuf pipeline is created, the context of the individual steps is expanded to include relevant data, the steps are sorted into execution order, and the implementation-specific prepare_or_run_pipeline() method is called.

Parameters:

    pipeline (BasePipeline): The pipeline to run. Required.
    stack (Stack): The stack on which the pipeline is run. Required.
    runtime_configuration (RuntimeConfiguration): Runtime configuration of the pipeline run. Required.

Returns:

    Any: The result of the call to prepare_or_run_pipeline().

Source code in zenml/orchestrators/base_orchestrator.py
def run(
    self,
    pipeline: "BasePipeline",
    stack: "Stack",
    runtime_configuration: "RuntimeConfiguration",
) -> Any:
    """Runs a pipeline. To do this, a protobuf pipeline is created, the
    context of the individual steps is expanded to include relevant data,
    the steps are sorted into execution order and the implementation
    specific `prepare_or_run_pipeline()` method is called.

    Args:
        pipeline: The pipeline to run.
        stack: The stack on which the pipeline is run.
        runtime_configuration: Runtime configuration of the pipeline run.

    Returns:
        The result of the call to `prepare_or_run_pipeline()`.
    """

    # Create the protobuf pipeline which will be needed for various reasons
    # in the following steps
    pb2_pipeline: Pb2Pipeline = Compiler().compile(
        create_tfx_pipeline(pipeline, stack=stack)
    )

    self._configure_node_context(
        pipeline=pipeline,
        pb2_pipeline=pb2_pipeline,
        stack=stack,
        runtime_configuration=runtime_configuration,
    )

    sorted_steps = self._get_sorted_steps(
        pipeline=pipeline, pb2_pipeline=pb2_pipeline
    )

    result = self.prepare_or_run_pipeline(
        sorted_steps=sorted_steps,
        pipeline=pipeline,
        pb2_pipeline=pb2_pipeline,
        stack=stack,
        runtime_configuration=runtime_configuration,
    )

    return result

run_step(self, step, run_name, pb2_pipeline)

This sets up a component launcher and executes the given step.

Parameters:

    step (BaseStep): The step to be executed. Required.
    run_name (str): The unique run name. Required.
    pb2_pipeline (Pipeline): Protobuf Pipeline instance. Required.

Source code in zenml/orchestrators/base_orchestrator.py
def run_step(
    self,
    step: "BaseStep",
    run_name: str,
    pb2_pipeline: Pb2Pipeline,
) -> Optional[data_types.ExecutionInfo]:
    """This sets up a component launcher and executes the given step.

    Args:
        step: The step to be executed
        run_name: The unique run name
        pb2_pipeline: Protobuf Pipeline instance
    """
    # Substitute the runtime parameter to be a concrete run_id, it is
    # important for this to be unique for each run.
    runtime_parameter_utils.substitute_runtime_parameter(
        pb2_pipeline,
        {PIPELINE_RUN_ID_PARAMETER_NAME: run_name},
    )

    # Extract the deployment_configs and use it to access the executor and
    # custom driver spec
    deployment_config = runner_utils.extract_local_deployment_config(
        pb2_pipeline
    )
    executor_spec = runner_utils.extract_executor_spec(
        deployment_config, step.name
    )
    custom_driver_spec = runner_utils.extract_custom_driver_spec(
        deployment_config, step.name
    )

    # At this point the active metadata store is queried for the
    # metadata_connection
    repo = Repository()
    metadata_store = repo.active_stack.metadata_store
    metadata_connection = metadata.Metadata(
        metadata_store.get_tfx_metadata_config()
    )
    custom_executor_operators = {
        executable_spec_pb2.PythonClassExecutableSpec: step.executor_operator
    }

    # The protobuf node for the current step is loaded here.
    pipeline_node = self._get_node_with_step_name(
        step_name=step.name, pb2_pipeline=pb2_pipeline
    )

    # Create the tfx launcher responsible for executing the step.
    component_launcher = launcher.Launcher(
        pipeline_node=pipeline_node,
        mlmd_connection=metadata_connection,
        pipeline_info=pb2_pipeline.pipeline_info,
        pipeline_runtime_spec=pb2_pipeline.runtime_spec,
        executor_spec=executor_spec,
        custom_driver_spec=custom_driver_spec,
        custom_executor_operators=custom_executor_operators,
    )

    # In some stack configurations, some stack components (like experiment
    # trackers) will run some code before and after the actual step run.
    # This is where the step actually gets executed using the
    # component_launcher
    repo.active_stack.prepare_step_run()
    execution_info = self._execute_step(component_launcher)
    repo.active_stack.cleanup_step_run()

    return execution_info

context_utils

add_context_to_node(pipeline_node, type_, name, properties)

Add a new context to a TFX protobuf pipeline node.

Parameters:

    pipeline_node (pipeline_pb2.PipelineNode): A tfx protobuf pipeline node. Required.
    type_ (str): The type name for the context to be added. Required.
    name (str): Unique key for the context. Required.
    properties (Dict[str, str]): Dictionary of strings as properties of the context. Required.

Source code in zenml/orchestrators/context_utils.py
def add_context_to_node(
    pipeline_node: "pipeline_pb2.PipelineNode",
    type_: str,
    name: str,
    properties: Dict[str, str],
) -> None:
    """
    Add a new context to a TFX protobuf pipeline node.

    Args:
        pipeline_node: A tfx protobuf pipeline node
        type_: The type name for the context to be added
        name: Unique key for the context
        properties: dictionary of strings as properties of the context
    """
    # Add a new context to the pipeline
    context: "pipeline_pb2.ContextSpec" = pipeline_node.contexts.contexts.add()
    # Adding the type of context
    context.type.name = type_
    # Setting the name of the context
    context.name.field_value.string_value = name
    # Setting the properties of the context depending on attribute type
    for key, value in properties.items():
        c_property = context.properties[key]
        c_property.field_value.string_value = value
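
As a rough usage sketch (the variable, context type and property values below are hypothetical, and the module is assumed to be importable as zenml.orchestrators.context_utils), an additional context could be attached to a node of a compiled protobuf pipeline like this:

from zenml.orchestrators import context_utils

# `pipeline_node` is assumed to be a pipeline_pb2.PipelineNode obtained while
# iterating over pb2_pipeline.nodes, as done in
# BaseOrchestrator._configure_node_context() above.
context_utils.add_context_to_node(
    pipeline_node,
    type_="custom_metadata",    # made-up context type name
    name="run_metadata_v1",     # made-up unique key
    properties={"owner": "ml-team", "env": "staging"},
)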

add_runtime_configuration_to_node(pipeline_node, runtime_config)

Add the runtime configuration of a pipeline run to a protobuf pipeline node.

Parameters:

    pipeline_node (pipeline_pb2.PipelineNode): A tfx protobuf pipeline node. Required.
    runtime_config (RuntimeConfiguration): A ZenML RuntimeConfiguration. Required.

Source code in zenml/orchestrators/context_utils.py
def add_runtime_configuration_to_node(
    pipeline_node: "pipeline_pb2.PipelineNode",
    runtime_config: RuntimeConfiguration,
) -> None:
    """
    Add the runtime configuration of a pipeline run to a protobuf pipeline node.

    Args:
        pipeline_node: a tfx protobuf pipeline node
        runtime_config: a ZenML RuntimeConfiguration
    """
    skip_errors: bool = runtime_config.get(
        "ignore_unserializable_fields", False
    )

    # Determine the name of the context
    def _name(obj: "BaseModel") -> str:
        """Compute a unique context name for a pydantic BaseModel."""
        try:
            return str(hash(obj.json(sort_keys=True)))
        except TypeError as e:
            class_name = obj.__class__.__name__
            logging.info(
                "Cannot convert %s to json, generating uuid instead. Error: %s",
                class_name,
                e,
            )
            return f"{class_name}_{uuid.uuid1()}"

    # iterate over all attributes of runtime context, serializing all pydantic
    # objects to node context.
    for key, obj in runtime_config.items():
        if isinstance(obj, BaseModel):
            logger.debug("Adding %s to context", key)
            add_context_to_node(
                pipeline_node,
                type_=obj.__repr_name__().lower(),
                name=_name(obj),
                properties=serialize_pydantic_object(
                    obj, skip_errors=skip_errors
                ),
            )

serialize_pydantic_object(obj, *, skip_errors=False)

Convert a pydantic object to a dict of strings

Source code in zenml/orchestrators/context_utils.py
def serialize_pydantic_object(
    obj: BaseModel, *, skip_errors: bool = False
) -> Dict[str, str]:
    """Convert a pydantic object to a dict of strings"""

    class PydanticEncoder(json.JSONEncoder):
        def default(self, o: Any) -> Any:
            try:
                return cast(Callable[[Any], str], obj.__json_encoder__)(o)
            except TypeError:
                return super().default(o)

    def _inner_generator(
        dictionary: Dict[str, Any]
    ) -> Iterator[Tuple[str, str]]:
        """Itemwise serialize each element in a dictionary."""
        for key, item in dictionary.items():
            try:
                yield key, json.dumps(item, cls=PydanticEncoder)
            except TypeError as e:
                if skip_errors:
                    logging.info(
                        "Skipping adding field '%s' to metadata context as "
                        "it cannot be serialized due to %s.",
                        key,
                        e,
                    )
                else:
                    raise TypeError(
                        f"Invalid type {type(item)} for key {key} can not be "
                        "serialized."
                    ) from e

    return {key: value for key, value in _inner_generator(obj.dict())}
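
A small usage sketch (with a throwaway pydantic model that is not part of ZenML) shows the shape of the result: every attribute value is JSON-encoded into a string.

from pydantic import BaseModel

from zenml.orchestrators.context_utils import serialize_pydantic_object


class TrainerConfig(BaseModel):
    """Hypothetical configuration object."""

    epochs: int = 10
    learning_rate: float = 0.001


serialized = serialize_pydantic_object(TrainerConfig())
# serialized == {"epochs": "10", "learning_rate": "0.001"}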

local special

local_orchestrator

LocalOrchestrator (BaseOrchestrator) pydantic-model

Orchestrator responsible for running pipelines locally. This orchestrator does not allow for concurrent execution of steps and also does not support running on a schedule.

Source code in zenml/orchestrators/local/local_orchestrator.py
class LocalOrchestrator(BaseOrchestrator):
    """Orchestrator responsible for running pipelines locally. This orchestrator
    does not allow for concurrent execution of steps and also does not support
    running on a schedule."""

    FLAVOR: ClassVar[str] = "local"

    def prepare_or_run_pipeline(
        self,
        sorted_steps: List[BaseStep],
        pipeline: "BasePipeline",
        pb2_pipeline: Pb2Pipeline,
        stack: "Stack",
        runtime_configuration: "RuntimeConfiguration",
    ) -> Any:
        """This method iterates through all steps and executes them sequentially."""
        if runtime_configuration.schedule:
            logger.warning(
                "Local Orchestrator currently does not support the"
                "use of schedules. The `schedule` will be ignored "
                "and the pipeline will be run immediately."
            )
        assert runtime_configuration.run_name, "Run name must be set"

        # Run each step
        for step in sorted_steps:
            self.run_step(
                step=step,
                run_name=runtime_configuration.run_name,
                pb2_pipeline=pb2_pipeline,
            )

prepare_or_run_pipeline(self, sorted_steps, pipeline, pb2_pipeline, stack, runtime_configuration)

This method iterates through all steps and executes them sequentially.

Source code in zenml/orchestrators/local/local_orchestrator.py
def prepare_or_run_pipeline(
    self,
    sorted_steps: List[BaseStep],
    pipeline: "BasePipeline",
    pb2_pipeline: Pb2Pipeline,
    stack: "Stack",
    runtime_configuration: "RuntimeConfiguration",
) -> Any:
    """This method iterates through all steps and executes them sequentially."""
    if runtime_configuration.schedule:
        logger.warning(
            "Local Orchestrator currently does not support the"
            "use of schedules. The `schedule` will be ignored "
            "and the pipeline will be run immediately."
        )
    assert runtime_configuration.run_name, "Run name must be set"

    # Run each step
    for step in sorted_steps:
        self.run_step(
            step=step,
            run_name=runtime_configuration.run_name,
            pb2_pipeline=pb2_pipeline,
        )

utils

create_tfx_pipeline(zenml_pipeline, stack)

Creates a tfx pipeline from a ZenML pipeline.

Source code in zenml/orchestrators/utils.py
def create_tfx_pipeline(
    zenml_pipeline: "BasePipeline", stack: "Stack"
) -> tfx_pipeline.Pipeline:
    """Creates a tfx pipeline from a ZenML pipeline."""
    # Connect the inputs/outputs of all steps in the pipeline
    zenml_pipeline.connect(**zenml_pipeline.steps)

    tfx_components = [step.component for step in zenml_pipeline.steps.values()]

    artifact_store = stack.artifact_store
    metadata_store = stack.metadata_store

    return tfx_pipeline.Pipeline(
        pipeline_name=zenml_pipeline.name,
        components=tfx_components,  # type: ignore[arg-type]
        pipeline_root=artifact_store.path,
        metadata_connection_config=metadata_store.get_tfx_metadata_config(),
        enable_cache=zenml_pipeline.enable_cache,
    )

get_cache_status(execution_info)

Returns the caching status of a step.

Parameters:

    execution_info (ExecutionInfo): The execution info of a tfx step. Required.

Exceptions:

    AttributeError: If the execution info is None.
    KeyError: If no pipeline info is found in the execution_info.

Returns:

    bool: The caching status of a tfx step as a boolean value.
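
This mirrors how the function is used in BaseOrchestrator._execute_step() above; a condensed sketch (the log message is illustrative):

execution_info = tfx_launcher.launch()
if execution_info and get_cache_status(execution_info):
    logger.info("Step output was served from the cache.")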

Source code in zenml/orchestrators/utils.py
def get_cache_status(
    execution_info: data_types.ExecutionInfo,
) -> bool:
    """Returns the caching status of a step.

    Args:
        execution_info: The execution info of a `tfx` step.

    Raises:
        AttributeError: If the execution info is `None`.
        KeyError: If no pipeline info is found in the `execution_info`.

    Returns:
        The caching status of a `tfx` step as a boolean value.
    """
    if execution_info is None:
        logger.warning("No execution info found when checking cache status.")
        return False

    status = False
    repository = Repository()
    # TODO [ENG-706]: Get the current running stack instead of just the active
    #   stack
    active_stack = repository.active_stack
    if not active_stack:
        raise RuntimeError(
            "No active stack is configured for the repository. Run "
            "`zenml stack set STACK_NAME` to update the active stack."
        )

    metadata_store = active_stack.metadata_store

    step_name_param = (
        INTERNAL_EXECUTION_PARAMETER_PREFIX + PARAM_PIPELINE_PARAMETER_NAME
    )
    step_name = json.loads(execution_info.exec_properties[step_name_param])
    if execution_info.pipeline_info:
        pipeline_name = execution_info.pipeline_info.id
    else:
        raise KeyError(f"No pipeline info found for step `{step_name}`.")
    pipeline_run_name = cast(str, execution_info.pipeline_run_id)
    pipeline = metadata_store.get_pipeline(pipeline_name)
    if pipeline is None:
        logger.error(f"Pipeline {pipeline_name} not found in Metadata Store.")
    else:
        status = (
            pipeline.get_run(pipeline_run_name).get_step(step_name).is_cached
        )
    return status

get_step_for_node(node, steps)

Finds the matching step for a tfx pipeline node.

Source code in zenml/orchestrators/utils.py
def get_step_for_node(node: PipelineNode, steps: List[BaseStep]) -> BaseStep:
    """Finds the matching step for a tfx pipeline node."""
    step_name = node.node_info.id
    try:
        return next(step for step in steps if step.name == step_name)
    except StopIteration:
        raise RuntimeError(f"Unable to find step with name '{step_name}'.")