
Orchestrators

zenml.orchestrators special

Initialization for ZenML orchestrators.

An orchestrator is a special kind of backend that manages the running of each step of the pipeline and administers the actual pipeline runs. You can think of it as the 'root' of any pipeline job that you run during your experimentation.

ZenML supports a local orchestrator out of the box, which allows you to run your pipelines in a local environment. We also support using Apache Airflow as the orchestrator to handle the steps of your pipeline.

base_orchestrator

Base orchestrator class.

BaseOrchestrator (StackComponent, ABC)

Base class for all orchestrators.

In order to implement an orchestrator you will need to subclass from this class.

How it works:

The run(...) method is the entrypoint that is executed when the pipeline's run method is called within the user code (pipeline_instance.run(...)).

This method will do some internal preparation and then call the prepare_or_run_pipeline(...) method. BaseOrchestrator subclasses must implement this method and either run the pipeline steps directly or deploy the pipeline to some remote infrastructure.
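The run(...) / prepare_or_run_pipeline(...) split is a template-method pattern: the base class owns preparation and cleanup, and subclasses supply only the hook. The sketch below mirrors that flow with hypothetical stand-in classes (SketchOrchestrator, EchoOrchestrator) and plain dicts instead of ZenML's PipelineDeployment and Stack, so it runs with the standard library only. Unlike the listing below, it wraps the hook in try/finally so the active state is also cleared when the hook raises.

```python
from abc import ABC, abstractmethod
from typing import Any, Dict, List, Optional


class SketchOrchestrator(ABC):
    """Hypothetical stand-in that mirrors the control flow of BaseOrchestrator.run."""

    def __init__(self) -> None:
        self._active_deployment: Optional[Dict[str, Any]] = None

    def run(self, deployment: Dict[str, Any], stack: str) -> Any:
        # Internal preparation: remember which deployment is active.
        self._active_deployment = deployment
        try:
            # Delegate the real work to the subclass hook.
            return self.prepare_or_run_pipeline(deployment=deployment, stack=stack)
        finally:
            # Clear the active run state, even if the hook raised.
            self._active_deployment = None

    @abstractmethod
    def prepare_or_run_pipeline(self, deployment: Dict[str, Any], stack: str) -> Any:
        """Run the steps directly or deploy them to remote infrastructure."""


class EchoOrchestrator(SketchOrchestrator):
    """Toy subclass: 'runs' each step by returning a label for it."""

    def prepare_or_run_pipeline(self, deployment: Dict[str, Any], stack: str) -> List[str]:
        return [f"{stack}:{name}" for name in deployment["steps"]]


result = EchoOrchestrator().run({"steps": ["load", "train"]}, stack="default")
print(result)  # ['default:load', 'default:train']
```

Whatever the hook returns is passed straight back to the caller, which is how an orchestrator-specific value reaches pipeline_instance.run().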

Source code in zenml/orchestrators/base_orchestrator.py
class BaseOrchestrator(StackComponent, ABC):
    """Base class for all orchestrators.

    In order to implement an orchestrator you will need to subclass from this
    class.

    How it works:
    -------------
    The `run(...)` method is the entrypoint that is executed when the
    pipeline's run method is called within the user code
    (`pipeline_instance.run(...)`).

    This method will do some internal preparation and then call the
    `prepare_or_run_pipeline(...)` method. BaseOrchestrator subclasses must
    implement this method and either run the pipeline steps directly or deploy
    the pipeline to some remote infrastructure.
    """

    # Class Configuration
    TYPE: ClassVar[StackComponentType] = StackComponentType.ORCHESTRATOR
    _active_deployment: Optional["PipelineDeployment"] = None
    _active_pb2_pipeline: Optional[Pb2Pipeline] = None

    @property
    def config(self) -> BaseOrchestratorConfig:
        """Returns the `BaseOrchestratorConfig` config.

        Returns:
            The configuration.
        """
        return cast(BaseOrchestratorConfig, self._config)

    @abstractmethod
    def get_orchestrator_run_id(self) -> str:
        """Returns the run id of the active orchestrator run.

        Important: This needs to be a unique ID and return the same value for
        all steps of a pipeline run.

        Returns:
            The orchestrator run id.
        """

    @abstractmethod
    def prepare_or_run_pipeline(
        self,
        deployment: "PipelineDeployment",
        stack: "Stack",
    ) -> Any:
        """This method needs to be implemented by the respective orchestrator.

        Depending on the type of orchestrator you'll have to perform slightly
        different operations.

        Simple Case:
        ------------
        The Steps are run directly from within the same environment in which
        the orchestrator code is executed. In this case you will need to
        deal with implementation-specific runtime configurations (like the
        schedule) and then iterate through the steps and finally call
        `self.run_step(...)` to execute each step.

        Advanced Case:
        --------------
        Most orchestrators will not run the steps directly. Instead, they
        build some intermediate representation of the pipeline that is then
        used to create and run the pipeline and its steps on the target
        environment. For such orchestrators this method will have to build
        this representation and deploy it.

        Regardless of the implementation details, the orchestrator will need
        to run each step in the target environment. For this the
        `self.run_step(...)` method should be used.

        The easiest way to make this work is by using an entrypoint
        configuration to run single steps (`zenml.entrypoints.step_entrypoint_configuration.StepEntrypointConfiguration`)
        or entire pipelines (`zenml.entrypoints.pipeline_entrypoint_configuration.PipelineEntrypointConfiguration`).

        Args:
            deployment: The pipeline deployment to prepare or run.
            stack: The stack the pipeline will run on.

        Returns:
            The optional return value from this method will be returned by the
            `pipeline_instance.run()` call when someone is running a pipeline.
        """

    def run(self, deployment: "PipelineDeployment", stack: "Stack") -> Any:
        """Runs a pipeline on a stack.

        Args:
            deployment: The pipeline deployment.
            stack: The stack on which to run the pipeline.

        Returns:
            Orchestrator-specific return value.
        """
        self._prepare_run(deployment=deployment)

        result = self.prepare_or_run_pipeline(
            deployment=deployment, stack=stack
        )

        self._cleanup_run()

        return result

    def run_step(self, step: "Step") -> Optional[data_types.ExecutionInfo]:
        """This sets up a component launcher and executes the given step.

        Args:
            step: The step to be executed

        Returns:
            The execution info of the step.
        """
        assert self._active_deployment
        assert self._active_pb2_pipeline

        self._ensure_artifact_classes_loaded(step.config)

        step_name = step.config.name
        pb2_pipeline = self._active_pb2_pipeline

        run_model = self._create_or_reuse_run()

        # Substitute the runtime parameter to be a concrete run_id, it is
        # important for this to be unique for each run.
        runtime_parameter_utils.substitute_runtime_parameter(
            pb2_pipeline,
            {PIPELINE_RUN_ID_PARAMETER_NAME: run_model.name},
        )

        # Extract the deployment_configs and use it to access the executor and
        # custom driver spec
        deployment_config = runner_utils.extract_local_deployment_config(
            pb2_pipeline
        )
        executor_spec = runner_utils.extract_executor_spec(
            deployment_config, step_name
        )
        custom_driver_spec = runner_utils.extract_custom_driver_spec(
            deployment_config, step_name
        )

        metadata_connection_cfg = Client().zen_store.get_metadata_config()

        executor_operator = self._get_executor_operator(
            step_operator=step.config.step_operator
        )
        custom_executor_operators = {
            executable_spec_pb2.PythonClassExecutableSpec: executor_operator
        }

        step_run_info = StepRunInfo(
            config=step.config,
            pipeline=self._active_deployment.pipeline,
            run_name=run_model.name,
        )

        # The protobuf node for the current step is loaded here.
        pipeline_node = self._get_node_with_step_name(step_name)

        stack = Client().active_stack
        proto_utils.add_mlmd_contexts(
            pipeline_node=pipeline_node,
            step=step,
            deployment=self._active_deployment,
            stack=stack,
        )

        component_launcher = launcher.Launcher(
            pipeline_node=pipeline_node,
            mlmd_connection=metadata.Metadata(metadata_connection_cfg),
            pipeline_info=pb2_pipeline.pipeline_info,
            pipeline_runtime_spec=pb2_pipeline.runtime_spec,
            executor_spec=executor_spec,
            custom_driver_spec=custom_driver_spec,
            custom_executor_operators=custom_executor_operators,
        )

        # If a step operator is used, the current environment will not be the
        # one executing the step function code and therefore we don't need to
        # run any preparation
        if step.config.step_operator:
            execution_info = self._execute_step(component_launcher)
        else:
            stack.prepare_step_run(info=step_run_info)
            step_failed = False
            try:
                execution_info = self._execute_step(component_launcher)
            except:  # noqa: E722
                self._publish_failed_run(run_name_or_id=run_model.id)
                step_failed = True
                raise
            finally:
                stack.cleanup_step_run(
                    info=step_run_info, step_failed=step_failed
                )

        return execution_info

    @staticmethod
    def requires_resources_in_orchestration_environment(
        step: "Step",
    ) -> bool:
        """Checks if the orchestrator should run this step on special resources.

        Args:
            step: The step that will be checked.

        Returns:
            True if the step requires special resources in the orchestration
            environment, False otherwise.
        """
        # If the step requires custom resources and doesn't run with a step
        # operator, it would need these requirements in the orchestrator
        # environment
        if step.config.step_operator:
            return False

        return not step.config.resource_settings.empty

    def _prepare_run(self, deployment: "PipelineDeployment") -> None:
        """Prepares a run.

        Args:
            deployment: The deployment to prepare.
        """
        self._active_deployment = deployment

        pb2_pipeline = Pb2Pipeline()
        pb2_pipeline_json = string_utils.b64_decode(
            self._active_deployment.proto_pipeline
        )
        json_format.Parse(pb2_pipeline_json, pb2_pipeline)
        self._active_pb2_pipeline = pb2_pipeline

    def _cleanup_run(self) -> None:
        """Cleans up the active run."""
        self._active_deployment = None
        self._active_pb2_pipeline = None

    def get_run_id_for_orchestrator_run_id(
        self, orchestrator_run_id: str
    ) -> UUID:
        """Generates a run ID from an orchestrator run id.

        Args:
            orchestrator_run_id: The orchestrator run id.

        Returns:
            The run id generated from the orchestrator run id.
        """
        run_id_seed = f"{self.id}-{orchestrator_run_id}"
        return uuid_utils.generate_uuid_from_string(run_id_seed)

    def _create_or_reuse_run(self) -> PipelineRunResponseModel:
        """Creates a run or reuses an existing one.

        Returns:
            The created or existing run.
        """
        assert self._active_deployment
        orchestrator_run_id = self.get_orchestrator_run_id()

        run_id = self.get_run_id_for_orchestrator_run_id(orchestrator_run_id)

        date = datetime.now().strftime("%Y_%m_%d")
        time = datetime.now().strftime("%H_%M_%S_%f")
        run_name = self._active_deployment.run_name.format(date=date, time=time)

        logger.debug("Creating run with ID: %s, name: %s", run_id, run_name)

        client = Client()
        run_model = PipelineRunRequestModel(
            id=run_id,
            name=run_name,
            orchestrator_run_id=orchestrator_run_id,
            user=client.active_user.id,
            project=client.active_project.id,
            stack=self._active_deployment.stack_id,
            pipeline=self._active_deployment.pipeline_id,
            status=ExecutionStatus.RUNNING,
            pipeline_configuration=self._active_deployment.pipeline.dict(),
            num_steps=len(self._active_deployment.steps),
        )

        return client.zen_store.get_or_create_run(run_model)

    @staticmethod
    def _publish_failed_run(run_name_or_id: Union[str, UUID]) -> None:
        """Set run status to failed.

        Args:
            run_name_or_id: The name or ID of the run that failed.
        """
        client = Client()
        run = client.zen_store.get_run(run_name_or_id)
        run.status = ExecutionStatus.FAILED
        client.zen_store.update_run(
            run_id=run.id,
            run_update=PipelineRunUpdateModel(status=ExecutionStatus.FAILED),
        )

    @staticmethod
    def _ensure_artifact_classes_loaded(
        step_configuration: "StepConfiguration",
    ) -> None:
        """Ensures that all artifact classes for a step are loaded.

        Args:
            step_configuration: A step configuration.
        """
        artifact_class_sources = set(
            input_.artifact_source
            for input_ in step_configuration.inputs.values()
        ) | set(
            output.artifact_source
            for output in step_configuration.outputs.values()
        )

        for source in artifact_class_sources:
            # Tfx depends on these classes being loaded so it can detect the
            # correct artifact class
            source_utils.validate_source_class(
                source, expected_class=BaseArtifact
            )

    @staticmethod
    def _execute_step(
        tfx_launcher: launcher.Launcher,
    ) -> Optional[data_types.ExecutionInfo]:
        """Executes a tfx component.

        Args:
            tfx_launcher: A tfx launcher to execute the component.

        Returns:
            Optional execution info returned by the launcher.

        Raises:
            RuntimeError: If the execution failed during preparation.
        """
        pipeline_step_name = tfx_launcher._pipeline_node.node_info.id
        start_time = time.time()
        logger.info(f"Step `{pipeline_step_name}` has started.")

        # There is no way to differentiate between a cached and a failed
        # execution based on the execution info returned by the TFX launcher.
        # We patch the _publish_failed_execution method in order to check
        # if an execution failed.
        execution_failed = False
        original_publish_failed_execution = (
            tfx_launcher._publish_failed_execution
        )

        def _new_publish_failed_execution(
            self: launcher.Launcher, *args: Any, **kwargs: Any
        ) -> None:
            original_publish_failed_execution(*args, **kwargs)
            nonlocal execution_failed
            execution_failed = True

        setattr(
            tfx_launcher,
            "_publish_failed_execution",
            types.MethodType(_new_publish_failed_execution, tfx_launcher),
        )
        execution_info = tfx_launcher.launch()
        if execution_failed:
            raise RuntimeError(
                "Failed to execute step. This is probably because some input "
                f"artifacts for the step {pipeline_step_name} could not be "
                "found in the database."
            )

        if execution_info and get_cache_status(execution_info):
            logger.info(f"Using cached version of `{pipeline_step_name}`.")

        run_duration = time.time() - start_time
        logger.info(
            f"Step `{pipeline_step_name}` has finished in "
            f"{string_utils.get_human_readable_time(run_duration)}."
        )
        return execution_info

    @staticmethod
    def _get_executor_operator(
        step_operator: Optional[str],
    ) -> Type[BaseExecutorOperator]:
        """Gets the TFX executor operator for the given step operator.

        Args:
            step_operator: The optional step operator used to run a step.

        Returns:
            The executor operator for the given step operator.
        """
        if step_operator:
            from zenml.step_operators.step_executor_operator import (
                StepExecutorOperator,
            )

            return StepExecutorOperator
        else:
            return PythonExecutorOperator

    def _get_node_with_step_name(self, step_name: str) -> PipelineNode:
        """Given the name of a step, return the node with that name from the pb2_pipeline.

        Args:
            step_name: Name of the step

        Returns:
            PipelineNode instance

        Raises:
            KeyError: If the step name is not found in the pipeline.
        """
        assert self._active_pb2_pipeline

        for node in self._active_pb2_pipeline.nodes:
            if (
                node.WhichOneof("node") == "pipeline_node"
                and node.pipeline_node.node_info.id == step_name
            ):
                return node.pipeline_node

        raise KeyError(
            f"Step {step_name} not found in Pipeline "
            f"{self._active_pb2_pipeline.pipeline_info.id}"
        )
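The _execute_step(...) method above works around the fact that the TFX launcher's return value does not distinguish a cached execution from a failed one: it replaces the launcher instance's _publish_failed_execution method with a wrapper that calls the original and then sets a flag. The sketch below reproduces only that patching technique; SketchLauncher and launch_and_detect_failure are hypothetical stand-ins, not TFX or ZenML code, so it runs with the standard library alone.

```python
import types
from typing import Any, Optional


class SketchLauncher:
    """Hypothetical launcher whose launch() result cannot signal failure."""

    def __init__(self, fail: bool) -> None:
        self.fail = fail
        self.last_failure: Optional[str] = None

    def _publish_failed_execution(self, reason: str) -> None:
        # A real launcher would record the failure in its metadata store.
        self.last_failure = reason

    def launch(self) -> Optional[str]:
        if self.fail:
            self._publish_failed_execution("missing input artifacts")
        return None  # identical for cached, failed and empty executions


def launch_and_detect_failure(launcher: SketchLauncher) -> Optional[str]:
    execution_failed = False
    original = launcher._publish_failed_execution  # already bound to launcher

    def _patched(self: SketchLauncher, *args: Any, **kwargs: Any) -> None:
        nonlocal execution_failed
        original(*args, **kwargs)
        execution_failed = True

    # Shadow the class method on this one instance only.
    setattr(launcher, "_publish_failed_execution", types.MethodType(_patched, launcher))
    result = launcher.launch()
    if execution_failed:
        raise RuntimeError("Failed to execute step.")
    return result


print(launch_and_detect_failure(SketchLauncher(fail=False)))  # None
try:
    launch_and_detect_failure(SketchLauncher(fail=True))
except RuntimeError as err:
    print(err)  # Failed to execute step.
```

Because the patched function is stored as an instance attribute, it takes precedence over the class method only for that launcher; other instances are unaffected.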
config: BaseOrchestratorConfig property readonly

Returns the BaseOrchestratorConfig config.

Returns:

BaseOrchestratorConfig: The configuration.

get_orchestrator_run_id(self)

Returns the run id of the active orchestrator run.

Important: This needs to be a unique ID and return the same value for all steps of a pipeline run.

Returns:

str: The orchestrator run id.
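For orchestrators that launch each step in a separate process or container, one way to satisfy the "same value for all steps" requirement is to generate the id once at submission time and hand it to every step through its environment. The sketch below illustrates that idea; the variable name SKETCH_ORCHESTRATOR_RUN_ID and both functions are hypothetical, and each real orchestrator has its own mechanism for sharing such an identifier.

```python
import os
import uuid
from typing import List, Mapping

# Hypothetical variable name; each real orchestrator has its own mechanism
# for handing a shared run identifier to every step container.
RUN_ID_ENV_VAR = "SKETCH_ORCHESTRATOR_RUN_ID"


def get_orchestrator_run_id(environ: Mapping[str, str] = os.environ) -> str:
    """Return the run id shared by every step of the active run."""
    run_id = environ.get(RUN_ID_ENV_VAR)
    if not run_id:
        raise RuntimeError("No orchestrator run id is set.")
    return run_id


def launch_pipeline(step_names: List[str]) -> List[str]:
    """Pretend to start each step in a separate process sharing one environment."""
    # One id per pipeline run, generated once when the run is submitted.
    step_env = {RUN_ID_ENV_VAR: str(uuid.uuid4())}
    # Each "process" reads the id independently from its own copy of the env.
    return [get_orchestrator_run_id(dict(step_env)) for _ in step_names]


ids = launch_pipeline(["load", "train", "evaluate"])
print(len(set(ids)))  # 1
```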

get_run_id_for_orchestrator_run_id(self, orchestrator_run_id)

Generates a run ID from an orchestrator run id.

Parameters:

orchestrator_run_id (str, required): The orchestrator run id.

Returns:

UUID: The run id generated from the orchestrator run id.
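Because the run ID is derived deterministically from the seed f"{self.id}-{orchestrator_run_id}", every step of a run computes the same ZenML run ID on its own, which is why _create_or_reuse_run can use get_or_create_run. The sketch below shows the idea with the standard library's uuid.uuid5 as a swapped-in hashing scheme; ZenML's uuid_utils.generate_uuid_from_string may hash differently, so the concrete UUIDs will not match ZenML's.

```python
import uuid

# Stand-in namespace for uuid5; not what ZenML's helper necessarily uses.
_SKETCH_NAMESPACE = uuid.NAMESPACE_URL


def run_id_for_orchestrator_run_id(component_id: uuid.UUID, orchestrator_run_id: str) -> uuid.UUID:
    # Same seed format as the listing: "<orchestrator component id>-<orchestrator run id>".
    run_id_seed = f"{component_id}-{orchestrator_run_id}"
    return uuid.uuid5(_SKETCH_NAMESPACE, run_id_seed)


orchestrator_id = uuid.UUID("12345678-1234-5678-1234-567812345678")
a = run_id_for_orchestrator_run_id(orchestrator_id, "run-42")
b = run_id_for_orchestrator_run_id(orchestrator_id, "run-42")
c = run_id_for_orchestrator_run_id(orchestrator_id, "run-43")
print(a == b, a == c)  # True False
```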

prepare_or_run_pipeline(self, deployment, stack)

This method needs to be implemented by the respective orchestrator.

Depending on the type of orchestrator you'll have to perform slightly different operations.

Simple Case:

The steps are run directly within the same environment in which the orchestrator code is executed. In this case you need to handle implementation-specific runtime configuration (like the schedule), then iterate through the steps and call self.run_step(...) to execute each one.
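A minimal sketch of the simple case, assuming hypothetical SketchStep and SketchDeployment dataclasses in place of ZenML's Step and PipelineDeployment, and a placeholder run_step that only records the step name. Like the local orchestrator, it assumes the steps are already stored in a valid execution order.

```python
from dataclasses import dataclass
from typing import Dict, List, Optional


@dataclass
class SketchStep:
    name: str


@dataclass
class SketchDeployment:
    # Assumed to be in a valid execution order already.
    steps: Dict[str, SketchStep]
    schedule: Optional[str] = None


class InProcessOrchestratorSketch:
    """Hypothetical orchestrator following the 'simple case'."""

    def __init__(self) -> None:
        self.executed: List[str] = []

    def run_step(self, step: SketchStep) -> None:
        # Placeholder for BaseOrchestrator.run_step(...).
        self.executed.append(step.name)

    def prepare_or_run_pipeline(self, deployment: SketchDeployment, stack: str) -> None:
        # 1. Handle implementation-specific runtime configuration.
        if deployment.schedule:
            print("Schedules are not supported here; running immediately.")
        # 2. Execute every step, one after another, in this process.
        for step in deployment.steps.values():
            self.run_step(step)


orch = InProcessOrchestratorSketch()
deployment = SketchDeployment(steps={"load": SketchStep("load"), "train": SketchStep("train")})
orch.prepare_or_run_pipeline(deployment, stack="default")
print(orch.executed)  # ['load', 'train']
```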

Advanced Case:

Most orchestrators will not run the steps directly. Instead, they build some intermediate representation of the pipeline that is then used to create and run the pipeline and its steps on the target environment. For such orchestrators this method will have to build this representation and deploy it.
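The sketch below illustrates the "intermediate representation" half of the advanced case: it turns a step dependency graph into a scheduler-agnostic task list, ordered so that every task follows its upstream tasks. The spec layout, the image name, and the command (python -m my_step_entrypoint --step ...) are all hypothetical; a real orchestrator would emit its scheduler's own format and invoke a ZenML entrypoint configuration, and the "deploy it" part (submitting the spec) is left out. It needs Python 3.9+ for graphlib.

```python
import json
from graphlib import TopologicalSorter
from typing import Dict, List, Set


def build_workflow_spec(upstream: Dict[str, Set[str]], image: str) -> Dict[str, object]:
    """Translate a step graph (step -> upstream steps) into a hypothetical spec."""
    # static_order() yields every step after all of its upstream steps and
    # raises graphlib.CycleError if the graph contains a cycle.
    order: List[str] = list(TopologicalSorter(upstream).static_order())
    tasks = [
        {
            "name": name,
            "image": image,
            # Hypothetical command; a real orchestrator would run a ZenML
            # entrypoint configuration for this step here instead.
            "command": ["python", "-m", "my_step_entrypoint", "--step", name],
            "depends_on": sorted(upstream.get(name, set())),
        }
        for name in order
    ]
    return {"tasks": tasks}


spec = build_workflow_spec(
    {"load": set(), "train": {"load"}, "evaluate": {"train", "load"}},
    image="my-registry/my-pipeline:latest",
)
print(json.dumps([t["name"] for t in spec["tasks"]]))  # ["load", "train", "evaluate"]
```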

Regardless of the implementation details, the orchestrator needs to run each step in the target environment, using the self.run_step(...) method to do so.

The easiest way to make this work is by using an entrypoint configuration to run single steps (zenml.entrypoints.step_entrypoint_configuration.StepEntrypointConfiguration) or entire pipelines (zenml.entrypoints.pipeline_entrypoint_configuration.PipelineEntrypointConfiguration).

Parameters:

deployment (PipelineDeployment, required): The pipeline deployment to prepare or run.

stack (Stack, required): The stack the pipeline will run on.

Returns:

Any: The optional return value from this method will be returned by the pipeline_instance.run() call when someone is running a pipeline.

requires_resources_in_orchestration_environment(step) staticmethod

Checks if the orchestrator should run this step on special resources.

Parameters:

step (Step, required): The step that will be checked.

Returns:

bool: True if the step requires special resources in the orchestration environment, False otherwise.
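The decision rule is short: a step that runs with a step operator never needs special resources in the orchestration environment; otherwise it does whenever its resource settings are non-empty. The sketch below reproduces that rule with hypothetical dataclasses; SketchStepConfig, SketchResourceSettings, and its cpu_count and memory fields are assumptions standing in for ZenML's step configuration and resource settings.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class SketchResourceSettings:
    # Hypothetical fields; None means "not requested".
    cpu_count: Optional[float] = None
    memory: Optional[str] = None

    @property
    def empty(self) -> bool:
        # Empty when no resource has been requested at all.
        return self.cpu_count is None and self.memory is None


@dataclass
class SketchStepConfig:
    step_operator: Optional[str] = None
    resource_settings: SketchResourceSettings = field(default_factory=SketchResourceSettings)


def requires_resources_in_orchestration_environment(config: SketchStepConfig) -> bool:
    # A step operator provisions the step's resources itself.
    if config.step_operator:
        return False
    # Otherwise the orchestrator must honor any requested resources.
    return not config.resource_settings.empty


heavy = SketchResourceSettings(cpu_count=4)
print(requires_resources_in_orchestration_environment(SketchStepConfig()))  # False
print(requires_resources_in_orchestration_environment(SketchStepConfig(resource_settings=heavy)))  # True
print(requires_resources_in_orchestration_environment(
    SketchStepConfig(step_operator="my_step_operator", resource_settings=heavy)))  # False
```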

run(self, deployment, stack)

Runs a pipeline on a stack.

Parameters:

deployment (PipelineDeployment, required): The pipeline deployment.

stack (Stack, required): The stack on which to run the pipeline.

Returns:

Any: Orchestrator-specific return value.

run_step(self, step)

Sets up a component launcher and executes the given step.

Parameters:

step (Step, required): The step to be executed.

Returns:

Optional[tfx.orchestration.portable.data_types.ExecutionInfo]: The execution info of the step.
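When no step operator is configured, run_step(...) wraps execution in the stack's prepare_step_run and cleanup_step_run hooks, marks the pipeline run as failed if the step raises, and always calls cleanup with a step_failed flag. The sketch below isolates that control flow; SketchStack, the simplified run_step signature, and the mark_run_failed callback are hypothetical stand-ins for the ZenML stack, the TFX launcher call, and _publish_failed_run. It catches BaseException, which is what the listing's bare except does, and re-raises.

```python
from typing import Callable, List


class SketchStack:
    """Hypothetical stack recording the hooks run_step calls around a step."""

    def __init__(self) -> None:
        self.events: List[str] = []

    def prepare_step_run(self, name: str) -> None:
        self.events.append(f"prepare:{name}")

    def cleanup_step_run(self, name: str, step_failed: bool) -> None:
        self.events.append(f"cleanup:{name}:failed={step_failed}")


def run_step(
    stack: SketchStack,
    name: str,
    execute: Callable[[], object],
    uses_step_operator: bool,
    mark_run_failed: Callable[[], None],
) -> object:
    # With a step operator the step code runs elsewhere, so skip local hooks.
    if uses_step_operator:
        return execute()
    stack.prepare_step_run(name)
    step_failed = False
    try:
        return execute()
    except BaseException:  # same scope as a bare except
        mark_run_failed()
        step_failed = True
        raise
    finally:
        # Runs on success and failure alike and reports which one happened.
        stack.cleanup_step_run(name, step_failed=step_failed)


def crashing_step() -> object:
    raise ValueError("step crashed")


stack = SketchStack()
failed_runs: List[str] = []
run_step(stack, "load", lambda: "ok", False, lambda: failed_runs.append("run-1"))
try:
    run_step(stack, "train", crashing_step, False, lambda: failed_runs.append("run-1"))
except ValueError:
    pass
print(stack.events)
# ['prepare:load', 'cleanup:load:failed=False', 'prepare:train', 'cleanup:train:failed=True']
print(failed_runs)  # ['run-1']
```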


BaseOrchestratorConfig (StackComponentConfig) pydantic-model

Base orchestrator config.

Source code in zenml/orchestrators/base_orchestrator.py
class BaseOrchestratorConfig(StackComponentConfig):
    """Base orchestrator config."""

    @root_validator(pre=True)
    def _deprecations(cls, values: Dict[str, Any]) -> Dict[str, Any]:
        """Validate and/or remove deprecated fields.

        Args:
            values: The values to validate.

        Returns:
            The validated values.
        """
        if "custom_docker_base_image_name" in values:
            image_name = values.pop("custom_docker_base_image_name", None)
            if image_name:
                logger.warning(
                    "The 'custom_docker_base_image_name' field has been "
                    "deprecated. To use a custom base container image with your "
                    "orchestrators, please use the DockerSettings in your "
                    "pipeline (see https://docs.zenml.io/advanced-guide/pipelines/containerization)."
                )

        return values

BaseOrchestratorFlavor (Flavor)

Base orchestrator flavor class.

Source code in zenml/orchestrators/base_orchestrator.py
class BaseOrchestratorFlavor(Flavor):
    """Base orchestrator flavor class."""

    @property
    def type(self) -> StackComponentType:
        """Returns the flavor type.

        Returns:
            The flavor type.
        """
        return StackComponentType.ORCHESTRATOR

    @property
    def config_class(self) -> Type[BaseOrchestratorConfig]:
        """Config class for the base orchestrator flavor.

        Returns:
            The config class.
        """
        return BaseOrchestratorConfig

    @property
    @abstractmethod
    def implementation_class(self) -> Type["BaseOrchestrator"]:
        """Implementation class for this flavor.

        Returns:
            The implementation class.
        """
config_class: Type[zenml.orchestrators.base_orchestrator.BaseOrchestratorConfig] property readonly

Config class for the base orchestrator flavor.

Returns:

Type[zenml.orchestrators.base_orchestrator.BaseOrchestratorConfig]: The config class.

implementation_class: Type[BaseOrchestrator] property readonly

Implementation class for this flavor.

Returns:

Type[BaseOrchestrator]: The implementation class.

type: StackComponentType property readonly

Returns the flavor type.

Returns:

StackComponentType: The flavor type.
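BaseOrchestratorFlavor fixes the component type and the default config class, and leaves only implementation_class abstract, so a custom flavor mainly has to point at its orchestrator class. The sketch below mirrors that shape with hypothetical stand-ins (SketchFlavor, SketchConfig, SketchOrchestratorBase, MyOrchestratorFlavor) rather than the real ZenML classes; the real Flavor base class may require further properties that this sketch omits.

```python
from abc import ABC, abstractmethod
from typing import Type


class SketchConfig:
    """Stand-in for BaseOrchestratorConfig."""


class SketchOrchestratorBase(ABC):
    """Stand-in for BaseOrchestrator."""


class SketchFlavor(ABC):
    """Stand-in for BaseOrchestratorFlavor: fixed type, default config class."""

    @property
    def type(self) -> str:
        return "orchestrator"

    @property
    def config_class(self) -> Type[SketchConfig]:
        return SketchConfig

    @property
    @abstractmethod
    def implementation_class(self) -> Type[SketchOrchestratorBase]:
        """Each concrete flavor must point at its orchestrator class."""


class MyOrchestrator(SketchOrchestratorBase):
    pass


class MyOrchestratorFlavor(SketchFlavor):
    @property
    def implementation_class(self) -> Type[SketchOrchestratorBase]:
        return MyOrchestrator


flavor = MyOrchestratorFlavor()
print(flavor.type, flavor.config_class.__name__, flavor.implementation_class.__name__)
# orchestrator SketchConfig MyOrchestrator
```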

local special

Initialization for the local orchestrator.

local_orchestrator

Implementation of the ZenML local orchestrator.

LocalOrchestrator (BaseOrchestrator)

Orchestrator responsible for running pipelines locally.

This orchestrator does not allow for concurrent execution of steps and also does not support running on a schedule.

Source code in zenml/orchestrators/local/local_orchestrator.py
class LocalOrchestrator(BaseOrchestrator):
    """Orchestrator responsible for running pipelines locally.

    This orchestrator does not allow for concurrent execution of steps and also
    does not support running on a schedule.
    """

    _orchestrator_run_id: Optional[str] = None

    def prepare_or_run_pipeline(
        self,
        deployment: "PipelineDeployment",
        stack: "Stack",
    ) -> Any:
        """Iterates through all steps and executes them sequentially.

        Args:
            deployment: The pipeline deployment to prepare or run.
            stack: The stack on which the pipeline is deployed.
        """
        if deployment.schedule:
            logger.warning(
                "Local Orchestrator currently does not support the "
                "use of schedules. The `schedule` will be ignored "
                "and the pipeline will be run immediately."
            )

        self._orchestrator_run_id = str(uuid4())
        start_time = time.time()

        # Run each step
        for step in deployment.steps.values():
            if self.requires_resources_in_orchestration_environment(step):
                logger.warning(
                    "Specifying step resources is not supported for the local "
                    "orchestrator, ignoring resource configuration for "
                    "step %s.",
                    step.config.name,
                )

            self.run_step(
                step=step,
            )

        run_duration = time.time() - start_time
        run_id = self.get_run_id_for_orchestrator_run_id(
            self._orchestrator_run_id
        )
        run_model = Client().zen_store.get_run(run_id)
        logger.info(
            "Pipeline run `%s` has finished in %s.",
            run_model.name,
            string_utils.get_human_readable_time(run_duration),
        )
        self._orchestrator_run_id = None

    def get_orchestrator_run_id(self) -> str:
        """Returns the active orchestrator run id.

        Raises:
            RuntimeError: If no run id exists. This happens when this method
                gets called while the orchestrator is not running a pipeline.

        Returns:
            The orchestrator run id.
        """
        if not self._orchestrator_run_id:
            raise RuntimeError("No run id set.")

        return self._orchestrator_run_id
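The local orchestrator's run id lives only in instance state: prepare_or_run_pipeline(...) sets a fresh uuid4 string before the first step, every step reads it through get_orchestrator_run_id(), and it is reset to None after the last step, so calls outside a run raise RuntimeError. The sketch below models only that lifecycle; LocalRunIdSketch and its simplified signatures are hypothetical and skip schedules, resources, and real step execution.

```python
import logging
import time
import uuid
from typing import List, Optional

logger = logging.getLogger("local_orchestrator_sketch")


class LocalRunIdSketch:
    """Hypothetical, simplified model of LocalOrchestrator's run id handling."""

    def __init__(self) -> None:
        self._orchestrator_run_id: Optional[str] = None
        self.seen_run_ids: List[str] = []

    def get_orchestrator_run_id(self) -> str:
        # Mirrors the listing: only valid while a pipeline is running.
        if not self._orchestrator_run_id:
            raise RuntimeError("No run id set.")
        return self._orchestrator_run_id

    def run_step(self, name: str) -> None:
        # Stand-in for run_step: record which run id this step observed.
        self.seen_run_ids.append(self.get_orchestrator_run_id())

    def prepare_or_run_pipeline(self, step_names: List[str]) -> None:
        # A fresh id for this run, shared by all of its steps.
        self._orchestrator_run_id = str(uuid.uuid4())
        start_time = time.time()
        for name in step_names:
            self.run_step(name)  # strictly one after another
        logger.info("Run finished in %.3f s.", time.time() - start_time)
        # Reset so that calls outside a run fail loudly.
        self._orchestrator_run_id = None


orch = LocalRunIdSketch()
orch.prepare_or_run_pipeline(["load", "train"])
orch.prepare_or_run_pipeline(["load", "train"])
print(len(orch.seen_run_ids), len(set(orch.seen_run_ids)))  # 4 2
```

As in the listing, the reset only happens after all steps succeed; wrapping the loop in try/finally would also clear the id when a step fails.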
get_orchestrator_run_id(self)

Returns the active orchestrator run id.

Raises:

RuntimeError: If no run id exists. This happens when this method gets called while the orchestrator is not running a pipeline.

Returns:

str: The orchestrator run id.
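To make the run-id lifecycle concrete, here is a minimal, stdlib-only sketch. `RunIdHolder` and its methods are hypothetical names, not part of ZenML. The sketch only mirrors the pattern in the source above: the id is an `Optional[str]` that is set while a pipeline runs and cleared afterwards, and reading it outside a run raises `RuntimeError`.

```python
from typing import Optional
from uuid import uuid4


class RunIdHolder:
    """Hypothetical stand-in for the run-id handling of the local orchestrator."""

    def __init__(self) -> None:
        self._orchestrator_run_id: Optional[str] = None

    def start_run(self) -> str:
        # A fresh UUID string is generated for every pipeline run.
        self._orchestrator_run_id = str(uuid4())
        return self._orchestrator_run_id

    def finish_run(self) -> None:
        # Clearing the id marks the orchestrator as idle again.
        self._orchestrator_run_id = None

    def get_orchestrator_run_id(self) -> str:
        # Same guard as the source: no active run means no id.
        if not self._orchestrator_run_id:
            raise RuntimeError("No run id set.")
        return self._orchestrator_run_id


holder = RunIdHolder()
run_id = holder.start_run()
assert holder.get_orchestrator_run_id() == run_id
holder.finish_run()
try:
    holder.get_orchestrator_run_id()
except RuntimeError as err:
    print(err)  # prints "No run id set."
```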

prepare_or_run_pipeline(self, deployment, stack)

Iterates through all steps and executes them sequentially.

Parameters:

deployment (PipelineDeployment, required): The pipeline deployment to prepare or run.

stack (Stack, required): The stack on which the pipeline is deployed.
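The control flow of the local `prepare_or_run_pipeline` can be sketched without ZenML. In this sketch steps are plain callables in a dict and `SequentialRunner` is a hypothetical class. It iterates the dict in insertion order; whether ZenML's `deployment.steps` is already ordered by dependencies is not shown here. One deliberate variation: the source only resets `_orchestrator_run_id` after every step has succeeded, whereas this sketch uses `try`/`finally` so the id is also cleared when a step raises.

```python
import logging
import time
from typing import Callable, Dict, List, Optional
from uuid import uuid4

logger = logging.getLogger(__name__)


class SequentialRunner:
    """Hypothetical sketch of a local, sequential step runner."""

    def __init__(self) -> None:
        self.run_id: Optional[str] = None
        self.executed: List[str] = []

    def run(
        self,
        steps: Dict[str, Callable[[], None]],
        schedule: Optional[str] = None,
    ) -> float:
        if schedule:
            # Like the local orchestrator: ignore the schedule, run now.
            logger.warning("Schedules are not supported; running immediately.")
        self.run_id = str(uuid4())
        start_time = time.time()
        try:
            # dicts preserve insertion order, so steps run in that order.
            for name, step in steps.items():
                step()  # blocks until the step finishes: no concurrency
                self.executed.append(name)
        finally:
            # Variation on the source: clear the id even if a step raised.
            self.run_id = None
        duration = time.time() - start_time
        logger.info("Run finished in %.2f seconds.", duration)
        return duration


runner = SequentialRunner()
runner.run({"load": lambda: None, "train": lambda: None})
print(runner.executed)  # ['load', 'train']
```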
LocalOrchestratorConfig (BaseOrchestratorConfig) pydantic-model

Local orchestrator config.

Source code in zenml/orchestrators/local/local_orchestrator.py
class LocalOrchestratorConfig(BaseOrchestratorConfig):
    """Local orchestrator config."""

    @property
    def is_local(self) -> bool:
        """Checks if this stack component is running locally.

        This designation is used to determine if the stack component can be
        shared with other users or if it is only usable on the local host.

        Returns:
            True if this config is for a local component, False otherwise.
        """
        return True
is_local: bool property readonly

Checks if this stack component is running locally.

This designation is used to determine if the stack component can be shared with other users or if it is only usable on the local host.

Returns:

bool: True if this config is for a local component, False otherwise.

LocalOrchestratorFlavor (BaseOrchestratorFlavor)

Flavor for the local orchestrator.

Source code in zenml/orchestrators/local/local_orchestrator.py
class LocalOrchestratorFlavor(BaseOrchestratorFlavor):
    """Flavor for the local orchestrator."""

    @property
    def name(self) -> str:
        """The flavor name.

        Returns:
            The flavor name.
        """
        return "local"

    @property
    def config_class(self) -> Type[BaseOrchestratorConfig]:
        """Config class for the local orchestrator flavor.

        Returns:
            The config class.
        """
        return LocalOrchestratorConfig

    @property
    def implementation_class(self) -> Type[LocalOrchestrator]:
        """Implementation class for this flavor.

        Returns:
            The implementation class for this flavor.
        """
        return LocalOrchestrator
config_class: Type[zenml.orchestrators.base_orchestrator.BaseOrchestratorConfig] property readonly

Config class for the local orchestrator flavor.

Returns:

Type[zenml.orchestrators.base_orchestrator.BaseOrchestratorConfig]: The config class.

implementation_class: Type[zenml.orchestrators.local.local_orchestrator.LocalOrchestrator] property readonly

Implementation class for this flavor.

Returns:

Type[zenml.orchestrators.local.local_orchestrator.LocalOrchestrator]: The implementation class for this flavor.

name: str property readonly

The flavor name.

Returns:

str: The flavor name.
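A flavor ties a name to a config class and an implementation class. The stdlib-only sketch below shows how such a triple could be looked up by name and turned into a component. `ToyConfig`, `ToyOrchestrator`, `ToyFlavor`, `FLAVORS`, and `create_component` are all hypothetical; this is not how ZenML actually registers or instantiates flavors.

```python
from dataclasses import dataclass
from typing import Dict, Type


class ToyConfig:
    """Hypothetical config class for a local component."""

    @property
    def is_local(self) -> bool:
        return True


class ToyOrchestrator:
    """Hypothetical implementation class that receives its config."""

    def __init__(self, config: ToyConfig) -> None:
        self.config = config


@dataclass(frozen=True)
class ToyFlavor:
    """Hypothetical flavor: the same three pieces of information as above."""

    name: str
    config_class: Type[ToyConfig]
    implementation_class: Type[ToyOrchestrator]


# Hypothetical registry keyed by flavor name.
FLAVORS: Dict[str, ToyFlavor] = {
    "local": ToyFlavor("local", ToyConfig, ToyOrchestrator),
}


def create_component(flavor_name: str) -> ToyOrchestrator:
    flavor = FLAVORS[flavor_name]  # raises KeyError for unknown flavors
    return flavor.implementation_class(flavor.config_class())


orchestrator = create_component("local")
print(orchestrator.config.is_local)  # True
```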

local_docker special

Initialization for the local Docker orchestrator.

local_docker_orchestrator

Implementation of the ZenML local Docker orchestrator.

LocalDockerOrchestrator (BaseOrchestrator)

Orchestrator responsible for running pipelines locally using Docker.

This orchestrator does not allow for concurrent execution of steps and also does not support running on a schedule.

Source code in zenml/orchestrators/local_docker/local_docker_orchestrator.py
class LocalDockerOrchestrator(BaseOrchestrator):
    """Orchestrator responsible for running pipelines locally using Docker.

    This orchestrator does not allow for concurrent execution of steps and also
    does not support running on a schedule.
    """

    @property
    def settings_class(self) -> Optional[Type["BaseSettings"]]:
        """Settings class for the Local Docker orchestrator.

        Returns:
            The settings class.
        """
        return LocalDockerOrchestratorSettings

    def prepare_pipeline_deployment(
        self,
        deployment: "PipelineDeployment",
        stack: "Stack",
    ) -> None:
        """Builds a Docker image and pushes it if a container registry exists.

        Args:
            deployment: The pipeline deployment configuration.
            stack: The stack on which the pipeline will be deployed.
        """
        docker_image_builder = PipelineDockerImageBuilder()
        if stack.container_registry:
            repo_digest = docker_image_builder.build_and_push_docker_image(
                deployment=deployment, stack=stack
            )
            deployment.add_extra(ORCHESTRATOR_DOCKER_IMAGE_KEY, repo_digest)
        else:
            # If there is no container registry, we only build the image
            target_image_name = docker_image_builder.get_target_image_name(
                deployment=deployment
            )
            docker_image_builder.build_docker_image(
                target_image_name=target_image_name,
                deployment=deployment,
                stack=stack,
            )
            deployment.add_extra(
                ORCHESTRATOR_DOCKER_IMAGE_KEY, target_image_name
            )

    def get_orchestrator_run_id(self) -> str:
        """Returns the active orchestrator run id.

        Raises:
            RuntimeError: If the environment variable specifying the run id
                is not set.

        Returns:
            The orchestrator run id.
        """
        try:
            return os.environ[ENV_ZENML_DOCKER_ORCHESTRATOR_RUN_ID]
        except KeyError:
            raise RuntimeError(
                "Unable to read run id from environment variable "
                f"{ENV_ZENML_DOCKER_ORCHESTRATOR_RUN_ID}."
            )

    def prepare_or_run_pipeline(
        self,
        deployment: "PipelineDeployment",
        stack: "Stack",
    ) -> Any:
        """Sequentially runs all pipeline steps in local Docker containers.

        Args:
            deployment: The pipeline deployment to prepare or run.
            stack: The stack the pipeline will run on.
        """
        if deployment.schedule:
            logger.warning(
                "Local Docker Orchestrator currently does not support the "
                "use of schedules. The `schedule` will be ignored "
                "and the pipeline will be run immediately."
            )

        from docker.client import DockerClient

        docker_client = DockerClient.from_env()
        image_name = deployment.pipeline.extra[ORCHESTRATOR_DOCKER_IMAGE_KEY]
        entrypoint = StepEntrypointConfiguration.get_entrypoint_command()

        # Add the local stores path as a volume mount
        stack.check_local_paths()
        local_stores_path = GlobalConfiguration().local_stores_path
        volumes = {
            local_stores_path: {
                "bind": local_stores_path,
                "mode": "rw",
            }
        }
        orchestrator_run_id = str(uuid4())
        environment = {
            ENV_ZENML_DOCKER_ORCHESTRATOR_RUN_ID: orchestrator_run_id,
            ENV_ZENML_LOCAL_STORES_PATH: local_stores_path,
        }
        start_time = time.time()

        # Run each step
        for step_name, step in deployment.steps.items():
            if self.requires_resources_in_orchestration_environment(step):
                logger.warning(
                    "Specifying step resources is not supported for the local "
                    "Docker orchestrator, ignoring resource configuration for "
                    "step %s.",
                    step.config.name,
                )

            arguments = StepEntrypointConfiguration.get_entrypoint_arguments(
                step_name=step_name
            )

            settings = cast(
                LocalDockerOrchestratorSettings,
                self.get_settings(step),
            )

            user = None
            if sys.platform != "win32":
                user = os.getuid()
            logger.info("Running step `%s` in Docker:", step_name)
            logs = docker_client.containers.run(
                image=image_name,
                entrypoint=entrypoint,
                command=arguments,
                user=user,
                volumes=volumes,
                environment=environment,
                stream=True,
                extra_hosts={"host.docker.internal": "host-gateway"},
                **settings.run_args,
            )

            for line in logs:
                logger.info(line.strip().decode())

        run_duration = time.time() - start_time
        run_id = self.get_run_id_for_orchestrator_run_id(orchestrator_run_id)
        run_model = Client().zen_store.get_run(run_id)
        logger.info(
            "Pipeline run `%s` has finished in %s.",
            run_model.name,
            string_utils.get_human_readable_time(run_duration),
        )
settings_class: Optional[Type[BaseSettings]] property readonly

Settings class for the Local Docker orchestrator.

Returns:

Optional[Type[BaseSettings]]: The settings class.

get_orchestrator_run_id(self)

Returns the active orchestrator run id.

Raises:

RuntimeError: If the environment variable specifying the run id is not set.

Returns:

str: The orchestrator run id.
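Because each step of the local Docker orchestrator runs in a separate container process, the run id is handed over through an environment variable and read back with `os.environ`. A minimal sketch of that hand-off follows. The variable name `EXAMPLE_ORCHESTRATOR_RUN_ID` is a hypothetical placeholder, not the actual value of `ENV_ZENML_DOCKER_ORCHESTRATOR_RUN_ID`, and a plain dict stands in for a container's environment.

```python
import os
from typing import Mapping, Optional
from uuid import uuid4

# Hypothetical variable name used only for this sketch.
RUN_ID_ENV_VAR = "EXAMPLE_ORCHESTRATOR_RUN_ID"


def get_orchestrator_run_id(environ: Optional[Mapping[str, str]] = None) -> str:
    """Reads the run id from the environment, mirroring the pattern above."""
    env = os.environ if environ is None else environ
    try:
        return env[RUN_ID_ENV_VAR]
    except KeyError:
        # `from None` hides the KeyError context (a choice of this sketch).
        raise RuntimeError(
            f"Unable to read run id from environment variable {RUN_ID_ENV_VAR}."
        ) from None


# The parent process generates one id and passes the same value to every
# container it starts; here a dict simulates one container's environment.
container_env = {RUN_ID_ENV_VAR: str(uuid4())}
assert get_orchestrator_run_id(container_env) == container_env[RUN_ID_ENV_VAR]
```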

prepare_or_run_pipeline(self, deployment, stack)

Sequentially runs all pipeline steps in local Docker containers.

Parameters:

deployment (PipelineDeployment, required): The pipeline deployment to prepare or run.

stack (Stack, required): The stack the pipeline will run on.
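The keyword arguments that `prepare_or_run_pipeline` passes to `docker_client.containers.run` can be assembled and inspected without starting a container. In the sketch below, `build_run_kwargs` and `fake_run` are hypothetical helpers, and the image name, entrypoint, arguments, and paths are placeholders. It also illustrates a consequence of splatting `**settings.run_args` into the call. If `run_args` repeats a key the orchestrator already passes explicitly (such as `environment`), Python raises `TypeError` for the duplicate keyword before Docker is ever contacted.

```python
import os
import sys
from typing import Any, Dict, List, Optional


def build_run_kwargs(
    image_name: str,
    entrypoint: List[str],
    arguments: List[str],
    local_stores_path: str,
    environment: Dict[str, str],
) -> Dict[str, Any]:
    """Hypothetical helper: the explicit arguments for one step container."""
    user: Optional[int] = None
    if sys.platform != "win32":
        # Mirror the source: pass the host UID on non-Windows platforms.
        user = os.getuid()
    return {
        "image": image_name,
        "entrypoint": entrypoint,
        "command": arguments,
        "user": user,
        # Docker SDK volume format: {host_path: {"bind": path, "mode": "rw"}}
        "volumes": {local_stores_path: {"bind": local_stores_path, "mode": "rw"}},
        "environment": environment,
        "stream": True,
        "extra_hosts": {"host.docker.internal": "host-gateway"},
    }


def fake_run(**kwargs: Any) -> Dict[str, Any]:
    """Stand-in for containers.run that just echoes its keyword arguments."""
    return kwargs


kwargs = build_run_kwargs(
    image_name="example-image:latest",
    entrypoint=["python", "-m", "example_entrypoint"],
    arguments=["--example-arg", "trainer"],
    local_stores_path="/tmp/example-stores",
    environment={"EXAMPLE_ORCHESTRATOR_RUN_ID": "1234"},
)

# Extra run args that do not clash with the explicit keys are merged in.
merged = fake_run(**kwargs, **{"shm_size": "1g"})
assert merged["shm_size"] == "1g"

# A clashing key fails at call time, before any container would start.
clashed = False
try:
    fake_run(**kwargs, **{"environment": {}})
except TypeError:
    # e.g. "got multiple values for keyword argument 'environment'"
    clashed = True
assert clashed
```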
prepare_pipeline_deployment(self, deployment, stack)

Builds a Docker image and, if the stack has a container registry, pushes it to that registry. Without a container registry, the image is only built locally.

Parameters:

deployment (PipelineDeployment, required): The pipeline deployment configuration.

stack (Stack, required): The stack on which the pipeline will be deployed.
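The branching in `prepare_pipeline_deployment` decides which image reference is stored in the deployment for later use: a repository digest when a container registry is configured, otherwise the local target image name. The sketch below models that decision with a stub. `StubImageBuilder`, `prepare_image`, `IMAGE_KEY`, and the returned image references are hypothetical stand-ins for `PipelineDockerImageBuilder`, `ORCHESTRATOR_DOCKER_IMAGE_KEY`, and the deployment's extras.

```python
from typing import Dict, Optional

IMAGE_KEY = "orchestrator_docker_image"  # hypothetical key name


class StubImageBuilder:
    """Hypothetical builder that records what it was asked to do."""

    def __init__(self) -> None:
        self.pushed = False

    def get_target_image_name(self) -> str:
        return "example-pipeline:latest"

    def build_docker_image(self, target_image_name: str) -> None:
        pass  # a real builder would run a Docker build here

    def build_and_push_docker_image(self, registry_uri: str) -> str:
        self.pushed = True
        # A real push yields a digest reference; this value is a placeholder.
        return f"{registry_uri}/example-pipeline@sha256:0000"


def prepare_image(
    builder: StubImageBuilder,
    extra: Dict[str, str],
    container_registry_uri: Optional[str],
) -> None:
    if container_registry_uri:
        # With a registry: build, push, and reference the image by digest.
        extra[IMAGE_KEY] = builder.build_and_push_docker_image(
            container_registry_uri
        )
    else:
        # Without a registry: build locally and reference the image by name.
        target = builder.get_target_image_name()
        builder.build_docker_image(target_image_name=target)
        extra[IMAGE_KEY] = target


local_extra: Dict[str, str] = {}
prepare_image(StubImageBuilder(), local_extra, None)
print(local_extra[IMAGE_KEY])  # example-pipeline:latest
```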
LocalDockerOrchestratorConfig (BaseOrchestratorConfig, LocalDockerOrchestratorSettings) pydantic-model

Local Docker orchestrator config.

Source code in zenml/orchestrators/local_docker/local_docker_orchestrator.py
class LocalDockerOrchestratorConfig(  # type: ignore[misc] # https://github.com/pydantic/pydantic/issues/4173
    BaseOrchestratorConfig, LocalDockerOrchestratorSettings
):
    """Local Docker orchestrator config."""

    @property
    def is_local(self) -> bool:
        """Checks if this stack component is running locally.

        This designation is used to determine if the stack component can be
        shared with other users or if it is only usable on the local host.

        Returns:
            True if this config is for a local component, False otherwise.
        """
        return True
is_local: bool property readonly

Checks if this stack component is running locally.

This designation is used to determine if the stack component can be shared with other users or if it is only usable on the local host.

Returns:

bool: True if this config is for a local component, False otherwise.

LocalDockerOrchestratorFlavor (BaseOrchestratorFlavor)

Flavor for the local Docker orchestrator.

Source code in zenml/orchestrators/local_docker/local_docker_orchestrator.py
class LocalDockerOrchestratorFlavor(BaseOrchestratorFlavor):
    """Flavor for the local Docker orchestrator."""

    @property
    def name(self) -> str:
        """Name of the orchestrator flavor.

        Returns:
            Name of the orchestrator flavor.
        """
        return "local_docker"

    @property
    def config_class(self) -> Type[BaseOrchestratorConfig]:
        """Config class for the local Docker orchestrator flavor.

        Returns:
            The config class.
        """
        return LocalDockerOrchestratorConfig

    @property
    def implementation_class(self) -> Type["LocalDockerOrchestrator"]:
        """Implementation class for this flavor.

        Returns:
            Implementation class for this flavor.
        """
        return LocalDockerOrchestrator
config_class: Type[zenml.orchestrators.base_orchestrator.BaseOrchestratorConfig] property readonly

Config class for the local Docker orchestrator flavor.

Returns:

Type[zenml.orchestrators.base_orchestrator.BaseOrchestratorConfig]: The config class.

implementation_class: Type[LocalDockerOrchestrator] property readonly

Implementation class for this flavor.

Returns:

Type[LocalDockerOrchestrator]: Implementation class for this flavor.

name: str property readonly

Name of the orchestrator flavor.

Returns:

str: Name of the orchestrator flavor.

LocalDockerOrchestratorSettings (BaseSettings) pydantic-model

Local Docker orchestrator settings.

Attributes:

run_args (Dict[str, Any]): Arguments to pass to the docker run call.

Source code in zenml/orchestrators/local_docker/local_docker_orchestrator.py
class LocalDockerOrchestratorSettings(BaseSettings):
    """Local Docker orchestrator settings.

    Attributes:
        run_args: Arguments to pass to the `docker run` call.
    """

    run_args: Dict[str, Any] = {}

    @validator("run_args", pre=True)
    def _convert_json_string(
        cls, value: Union[None, str, Dict[str, Any]]
    ) -> Optional[Dict[str, Any]]:
        """Converts potential JSON strings passed via the CLI to dictionaries.

        Args:
            value: The value to convert.

        Returns:
            The converted value.

        Raises:
            TypeError: If the value is not a `str`, `Dict` or `None`.
            ValueError: If the value is an invalid JSON string or a JSON string
                that does not decode into a dictionary.
        """
        if isinstance(value, str):
            try:
                dict_ = json.loads(value)
            except json.JSONDecodeError as e:
                raise ValueError(f"Invalid json string '{value}'") from e

            if not isinstance(dict_, Dict):
                raise ValueError(
                    f"Json string '{value}' did not decode into a dictionary."
                )

            return dict_
        elif isinstance(value, Dict) or value is None:
            return value
        else:
            raise TypeError(f"{value} is not a json string or a dictionary.")
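The validator accepts either a dictionary or a JSON string, which is convenient when values arrive as text (for example from the CLI, as the docstring notes). The same conversion rules can be exercised without pydantic. `convert_json_string` below is a hypothetical standalone copy of the validator body that uses the built-in `dict` instead of `typing.Dict` for the `isinstance` checks.

```python
import json
from typing import Any, Dict, Optional, Union


def convert_json_string(
    value: Union[None, str, Dict[str, Any]],
) -> Optional[Dict[str, Any]]:
    """Standalone version of the `run_args` pre-validator logic."""
    if isinstance(value, str):
        try:
            dict_ = json.loads(value)
        except json.JSONDecodeError as e:
            raise ValueError(f"Invalid json string '{value}'") from e
        if not isinstance(dict_, dict):
            # Valid JSON such as "[1, 2]" is still rejected if not an object.
            raise ValueError(
                f"Json string '{value}' did not decode into a dictionary."
            )
        return dict_
    elif isinstance(value, dict) or value is None:
        return value
    raise TypeError(f"{value} is not a json string or a dictionary.")


print(convert_json_string('{"shm_size": "1g"}'))  # {'shm_size': '1g'}
```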

utils

Utility functions for the orchestrator.

get_cache_status(execution_info)

Returns whether a cached execution was used or not.

Parameters:

execution_info (Optional[tfx.orchestration.portable.data_types.ExecutionInfo], required): The execution info.

Returns:

bool: True if the execution was cached, False otherwise.

Source code in zenml/orchestrators/utils.py
def get_cache_status(
    execution_info: Optional[data_types.ExecutionInfo],
) -> bool:
    """Returns whether a cached execution was used or not.

    Args:
        execution_info: The execution info.

    Returns:
        `True` if the execution was cached, `False` otherwise.
    """
    # An execution output URI is only provided if the step needs to be
    # executed (= is not cached)
    if execution_info and execution_info.execution_output_uri is None:
        return True
    else:
        return False
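The cache check depends only on whether `execution_output_uri` is `None`. The sketch below uses a hypothetical dataclass in place of TFX's `ExecutionInfo` to show the three cases, and collapses the `if`/`else` into one boolean expression. For this stand-in, which is always truthy, `execution_info is not None` matches the source's truthiness test.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class FakeExecutionInfo:
    """Hypothetical stand-in for tfx's ExecutionInfo (only the field used)."""

    execution_output_uri: Optional[str] = None


def get_cache_status(execution_info: Optional[FakeExecutionInfo]) -> bool:
    # Cached means: execution info exists but no output URI was assigned,
    # because an output URI is only provided when the step must execute.
    return (
        execution_info is not None
        and execution_info.execution_output_uri is None
    )


print(get_cache_status(FakeExecutionInfo()))  # True
```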

get_orchestrator_run_name(pipeline_name)

Gets an orchestrator run name.

This run name is not the same as the ZenML run name; it is intended for display in the orchestrator UI.

Parameters:

pipeline_name (str, required): Name of the pipeline that will run.

Returns:

str: The orchestrator run name.

Source code in zenml/orchestrators/utils.py
def get_orchestrator_run_name(pipeline_name: str) -> str:
    """Gets an orchestrator run name.

    This run name is not the same as the ZenML run name; it is intended for
    display in the orchestrator UI.

    Args:
        pipeline_name: Name of the pipeline that will run.

    Returns:
        The orchestrator run name.
    """
    user_name = Client().active_user.name
    return f"{pipeline_name}_{user_name}_{random.Random().getrandbits(32):08x}"
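The orchestrator run name joins the pipeline name, the active user's name, and 32 random bits rendered as eight lowercase hex digits with underscores. In the sketch below, `make_run_name` is hypothetical: the user name is passed in explicitly instead of being read from `Client().active_user`, and an optional seeded `random.Random` makes the output reproducible. Both are assumptions of this sketch.

```python
import random
import re
from typing import Optional


def make_run_name(
    pipeline_name: str, user_name: str, rng: Optional[random.Random] = None
) -> str:
    """Hypothetical variant of get_orchestrator_run_name with an injected user."""
    rng = rng if rng is not None else random.Random()
    # getrandbits(32) < 2**32, so ":08x" always yields exactly 8 hex digits.
    return f"{pipeline_name}_{user_name}_{rng.getrandbits(32):08x}"


name = make_run_name("training_pipeline", "default")
assert re.fullmatch(r"training_pipeline_default_[0-9a-f]{8}", name)
```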