Skip to content

Pipelines

zenml.pipelines special

A ZenML pipeline consists of tasks that execute in order and yield artifacts.

The artifacts are automatically stored within the artifact store, and their metadata is tracked by ZenML. Each individual task within a pipeline is known as a step. The standard pipelines within ZenML are designed to have easy interfaces to add pre-decided steps, with the order also pre-decided. Other kinds of pipelines can also be created from scratch by building on the BasePipeline class.

Pipelines can be written as simple functions. They are created by using decorators appropriate to the specific use case you have. The moment it is run, a pipeline is compiled and passed directly to the orchestrator.

base_pipeline

Legacy ZenML pipeline class definition.

BasePipeline (Pipeline, ABC)

Legacy pipeline class.

Source code in zenml/pipelines/base_pipeline.py
class BasePipeline(Pipeline, ABC):
    """Legacy pipeline class.

    Subclasses implement `connect` to wire their steps together. The steps
    are passed to the constructor, validated against the signature of
    `connect`, and forwarded to `connect` as keyword arguments when the
    pipeline entrypoint is invoked.
    """

    # Optional class-level configuration. The dictionary is shared by every
    # instance of the class, so it must never be mutated in place.
    _CLASS_CONFIGURATION: ClassVar[Optional[Dict[str, Any]]] = None

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        """Initializes a pipeline.

        Args:
            *args: Initialization arguments. These are the steps of the
                pipeline, matched against the signature of `connect`.
            **kwargs: Initialization keyword arguments. These are the steps
                of the pipeline, matched against the signature of `connect`.
        """
        # Work on a shallow copy: popping the pipeline name from the shared
        # class-level dictionary would remove it for every later instance of
        # the same class.
        config = dict(self._CLASS_CONFIGURATION or {})
        pipeline_name = (
            config.pop(PARAM_PIPELINE_NAME, None) or self.__class__.__name__
        )
        # The steps are verified before the parent initializer runs.
        self._steps = self._verify_steps(
            *args, __name__=pipeline_name, **kwargs
        )

        def entrypoint() -> None:
            # Forward the verified steps to the user-defined `connect`.
            self.connect(**self._steps)

        super().__init__(
            name=pipeline_name,
            entrypoint=entrypoint,
            **config,
        )

    @property
    def steps(self) -> Dict[str, BaseStep]:
        """Returns the steps of the pipeline.

        Returns:
            The verified steps, keyed by the name of the `connect` argument
            they were bound to.
        """
        return self._steps

    @abstractmethod
    def connect(self, *args: BaseStep, **kwargs: BaseStep) -> None:
        """Abstract method that connects the pipeline steps.

        Args:
            *args: Connect method arguments.
            **kwargs: Connect method keyword arguments.

        Raises:
            NotImplementedError: Always, unless overridden by a subclass.
        """
        raise NotImplementedError

    def resolve(self) -> "Source":
        """Resolves the pipeline.

        The source is resolved from the pipeline class, not the instance.

        Returns:
            The pipeline source.
        """
        return source_utils.resolve(self.__class__)

    @property
    def source_object(self) -> Any:
        """The source object of this pipeline.

        Returns:
            The bound `connect` method of this pipeline.
        """
        return self.connect

    def run(
        self,
        *,
        run_name: Optional[str] = None,
        enable_cache: Optional[bool] = None,
        enable_artifact_metadata: Optional[bool] = None,
        enable_artifact_visualization: Optional[bool] = None,
        schedule: Optional[Schedule] = None,
        build: Union[str, "UUID", "PipelineBuildBaseModel", None] = None,
        settings: Optional[Mapping[str, "SettingsOrDict"]] = None,
        step_configurations: Optional[
            Mapping[str, "StepConfigurationUpdateOrDict"]
        ] = None,
        extra: Optional[Dict[str, Any]] = None,
        config_path: Optional[str] = None,
        unlisted: bool = False,
        prevent_build_reuse: bool = False,
    ) -> None:
        """Runs the pipeline on the active stack.

        Args:
            run_name: Name of the pipeline run.
            enable_cache: If caching should be enabled for this pipeline run.
            enable_artifact_metadata: If artifact metadata should be enabled
                for this pipeline run.
            enable_artifact_visualization: If artifact visualization should be
                enabled for this pipeline run.
            schedule: Optional schedule to use for the run.
            build: Optional build to use for the run.
            settings: Settings for this pipeline run.
            step_configurations: Configurations for steps of the pipeline.
            extra: Extra configurations for this pipeline run.
            config_path: Path to a yaml configuration file. This file will
                be parsed as a
                `zenml.config.pipeline_configurations.PipelineRunConfiguration`
                object. Options provided in this file will be overwritten by
                options provided in code using the other arguments of this
                method.
            unlisted: Whether the pipeline run should be unlisted (not assigned
                to any pipeline).
            prevent_build_reuse: Whether to prevent the reuse of a build.
        """
        pipeline_copy = self.with_options(
            run_name=run_name,
            schedule=schedule,
            build=build,
            step_configurations=step_configurations,
            config_path=config_path,
            unlisted=unlisted,
            prevent_build_reuse=prevent_build_reuse,
        )
        # Only explicitly provided (non-None) values are added to the run
        # arguments of the object returned by `with_options`.
        new_run_args = dict_utils.remove_none_values(
            {
                "enable_cache": enable_cache,
                "enable_artifact_metadata": enable_artifact_metadata,
                "enable_artifact_visualization": enable_artifact_visualization,
                "settings": settings,
                "extra": extra,
            }
        )
        pipeline_copy._run_args.update(new_run_args)

        pipeline_copy()

    def _compute_invocation_id(
        self,
        step: "BaseStep",
        custom_id: Optional[str] = None,
        allow_suffix: bool = True,
    ) -> str:
        """Compute the invocation ID.

        Note that the values passed for `custom_id` and `allow_suffix` are
        not used: the custom ID is always read from the step's template name
        attribute (set in `_verify_steps`) and suffixes are always disabled.

        Args:
            step: The step for which to compute the ID.
            custom_id: Custom ID to use for the invocation. Ignored by this
                implementation.
            allow_suffix: Whether a suffix can be appended to the invocation
                ID. Ignored by this implementation.

        Returns:
            The invocation ID.
        """
        # Falls back to None if the step never went through `_verify_steps`.
        custom_id = getattr(step, TEMPLATE_NAME_ATTRIBUTE, None)

        return super()._compute_invocation_id(
            step=step, custom_id=custom_id, allow_suffix=False
        )

    def _verify_steps(
        self, *args: Any, __name__: str, **kwargs: Any
    ) -> Dict[str, "BaseStep"]:
        """Verifies the initialization args and kwargs of this pipeline.

        This method makes sure that no missing/unexpected arguments or
        arguments of a wrong type are passed when creating a pipeline.
        Each verified step is also tagged with the name of the `connect`
        argument it was bound to.

        Args:
            *args: The args passed to the init method of this pipeline.
            __name__: The pipeline name. The naming of this argument is to avoid
                conflicts with other arguments.
            **kwargs: The kwargs passed to the init method of this pipeline.

        Raises:
            PipelineInterfaceError: If there are too many/few arguments or
                arguments with a wrong name/type.

        Returns:
            The verified steps.
        """
        signature = inspect.signature(self.connect, follow_wrapped=True)

        # Binding against the `connect` signature detects missing and
        # unexpected arguments.
        try:
            bound_args = signature.bind(*args, **kwargs)
        except TypeError as e:
            raise PipelineInterfaceError(
                f"Wrong arguments when initializing pipeline '{__name__}': {e}"
            ) from e

        steps = {}

        for key, potential_step in bound_args.arguments.items():
            step_class = type(potential_step)

            # A step class (instead of an instance) was passed.
            if inspect.isclass(potential_step) and issubclass(
                potential_step, BaseStep
            ):
                raise PipelineInterfaceError(
                    f"Wrong argument type (`{step_class}`) for argument "
                    f"'{key}' of pipeline '{__name__}'. "
                    f"A `BaseStep` subclass was provided instead of an "
                    f"instance. "
                    f"This might have been caused due to missing brackets of "
                    f"your steps when creating a pipeline with `@step` "
                    f"decorated functions, "
                    f"for which the correct syntax is `pipeline(step=step())`."
                )

            if not isinstance(potential_step, BaseStep):
                raise PipelineInterfaceError(
                    f"Wrong argument type (`{step_class}`) for argument "
                    f"'{key}' of pipeline '{__name__}'. Only "
                    f"`@step` decorated functions or instances of `BaseStep` "
                    f"subclasses can be used as arguments when creating "
                    f"a pipeline."
                )

            steps[key] = potential_step
            # Remember the argument name on the step; it is read back in
            # `_compute_invocation_id`.
            setattr(potential_step, TEMPLATE_NAME_ATTRIBUTE, key)

        return steps
source_object: Any property readonly

The source object of this pipeline.

Returns:

Type Description
Any

The source object of this pipeline.

steps: Dict[str, zenml.steps.base_step.BaseStep] property readonly

Returns the steps of the pipeline.

Returns:

Type Description
Dict[str, zenml.steps.base_step.BaseStep]

The steps of the pipeline.

__init__(self, *args, **kwargs) special

Initializes a pipeline.

Parameters:

Name Type Description Default
*args Any

Initialization arguments.

()
**kwargs Any

Initialization keyword arguments.

{}
Source code in zenml/pipelines/base_pipeline.py
def __init__(self, *args: Any, **kwargs: Any) -> None:
    """Initializes a pipeline.

    Args:
        *args: Initialization arguments. These are the steps of the
            pipeline, matched against the signature of `connect`.
        **kwargs: Initialization keyword arguments. These are the steps
            of the pipeline, matched against the signature of `connect`.
    """
    # Work on a shallow copy: popping the pipeline name from the shared
    # class-level dictionary would remove it for every later instance of
    # the same class.
    config = dict(self._CLASS_CONFIGURATION or {})
    pipeline_name = (
        config.pop(PARAM_PIPELINE_NAME, None) or self.__class__.__name__
    )
    # The steps are verified before the parent initializer runs.
    self._steps = self._verify_steps(
        *args, __name__=pipeline_name, **kwargs
    )

    def entrypoint() -> None:
        # Forward the verified steps to the user-defined `connect`.
        self.connect(**self._steps)

    super().__init__(
        name=pipeline_name,
        entrypoint=entrypoint,
        **config,
    )
connect(self, *args, **kwargs)

Abstract method that connects the pipeline steps.

Parameters:

Name Type Description Default
*args BaseStep

Connect method arguments.

()
**kwargs BaseStep

Connect method keyword arguments.

{}
Source code in zenml/pipelines/base_pipeline.py
@abstractmethod
def connect(self, *args: BaseStep, **kwargs: BaseStep) -> None:
    """Wire the steps of the pipeline together.

    Concrete pipelines must override this method.

    Args:
        *args: Steps passed positionally.
        **kwargs: Steps passed by keyword.

    Raises:
        NotImplementedError: Always, unless overridden by a subclass.
    """
    raise NotImplementedError()
resolve(self)

Resolves the pipeline.

Returns:

Type Description
Source

The pipeline source.

Source code in zenml/pipelines/base_pipeline.py
def resolve(self) -> "Source":
    """Resolve the source of this pipeline.

    The source is resolved from the pipeline class, not the instance.

    Returns:
        The pipeline source.
    """
    pipeline_class = self.__class__
    return source_utils.resolve(pipeline_class)
run(self, *, run_name=None, enable_cache=None, enable_artifact_metadata=None, enable_artifact_visualization=None, schedule=None, build=None, settings=None, step_configurations=None, extra=None, config_path=None, unlisted=False, prevent_build_reuse=False)

Runs the pipeline on the active stack.

Parameters:

Name Type Description Default
run_name Optional[str]

Name of the pipeline run.

None
enable_cache Optional[bool]

If caching should be enabled for this pipeline run.

None
enable_artifact_metadata Optional[bool]

If artifact metadata should be enabled for this pipeline run.

None
enable_artifact_visualization Optional[bool]

If artifact visualization should be enabled for this pipeline run.

None
schedule Optional[zenml.config.schedule.Schedule]

Optional schedule to use for the run.

None
build Union[str, UUID, PipelineBuildBaseModel]

Optional build to use for the run.

None
settings Optional[Mapping[str, SettingsOrDict]]

Settings for this pipeline run.

None
step_configurations Optional[Mapping[str, StepConfigurationUpdateOrDict]]

Configurations for steps of the pipeline.

None
extra Optional[Dict[str, Any]]

Extra configurations for this pipeline run.

None
config_path Optional[str]

Path to a yaml configuration file. This file will be parsed as a zenml.config.pipeline_configurations.PipelineRunConfiguration object. Options provided in this file will be overwritten by options provided in code using the other arguments of this method.

None
unlisted bool

Whether the pipeline run should be unlisted (not assigned to any pipeline).

False
prevent_build_reuse bool

Whether to prevent the reuse of a build.

False
Source code in zenml/pipelines/base_pipeline.py
def run(
    self,
    *,
    run_name: Optional[str] = None,
    enable_cache: Optional[bool] = None,
    enable_artifact_metadata: Optional[bool] = None,
    enable_artifact_visualization: Optional[bool] = None,
    schedule: Optional[Schedule] = None,
    build: Union[str, "UUID", "PipelineBuildBaseModel", None] = None,
    settings: Optional[Mapping[str, "SettingsOrDict"]] = None,
    step_configurations: Optional[
        Mapping[str, "StepConfigurationUpdateOrDict"]
    ] = None,
    extra: Optional[Dict[str, Any]] = None,
    config_path: Optional[str] = None,
    unlisted: bool = False,
    prevent_build_reuse: bool = False,
) -> None:
    """Execute the pipeline on the active stack.

    The run-level options are applied via `with_options`; the remaining
    arguments are merged into the run arguments of the returned pipeline
    object (only when they are not `None`) before it is called.

    Args:
        run_name: Name of the pipeline run.
        enable_cache: If caching should be enabled for this pipeline run.
        enable_artifact_metadata: If artifact metadata should be enabled
            for this pipeline run.
        enable_artifact_visualization: If artifact visualization should be
            enabled for this pipeline run.
        schedule: Optional schedule to use for the run.
        build: Optional build to use for the run.
        settings: Settings for this pipeline run.
        step_configurations: Configurations for steps of the pipeline.
        extra: Extra configurations for this pipeline run.
        config_path: Path to a yaml configuration file. This file will
            be parsed as a
            `zenml.config.pipeline_configurations.PipelineRunConfiguration`
            object. Options provided in this file will be overwritten by
            options provided in code using the other arguments of this
            method.
        unlisted: Whether the pipeline run should be unlisted (not assigned
            to any pipeline).
        prevent_build_reuse: Whether to prevent the reuse of a build.
    """
    configured_pipeline = self.with_options(
        run_name=run_name,
        schedule=schedule,
        build=build,
        step_configurations=step_configurations,
        config_path=config_path,
        unlisted=unlisted,
        prevent_build_reuse=prevent_build_reuse,
    )

    # Arguments left at None must not override existing run arguments.
    optional_run_args = {
        "enable_cache": enable_cache,
        "enable_artifact_metadata": enable_artifact_metadata,
        "enable_artifact_visualization": enable_artifact_visualization,
        "settings": settings,
        "extra": extra,
    }
    provided_run_args = dict_utils.remove_none_values(optional_run_args)
    configured_pipeline._run_args.update(provided_run_args)

    configured_pipeline()

pipeline_decorator

Legacy ZenML pipeline decorator definition.

pipeline(_func=None, *, name=None, enable_cache=None, enable_artifact_metadata=None, enable_artifact_visualization=None, settings=None, extra=None, on_failure=None, on_success=None)

Outer decorator function for the creation of a ZenML pipeline.

Parameters:

Name Type Description Default
_func Optional[~F]

The decorated function.

None
name Optional[str]

The name of the pipeline. If left empty, the name of the decorated function will be used as a fallback.

None
enable_cache Optional[bool]

Whether to use caching or not.

None
enable_artifact_metadata Optional[bool]

Whether to enable artifact metadata or not.

None
enable_artifact_visualization Optional[bool]

Whether to enable artifact visualization.

None
settings Optional[Dict[str, SettingsOrDict]]

Settings for this pipeline.

None
extra Optional[Dict[str, Any]]

Extra configurations for this pipeline.

None
on_failure Optional[HookSpecification]

Callback function in event of failure of the step. Can be a function with three possible parameters, StepContext, BaseParameters, and BaseException, or a source path to a function of the same specifications (e.g. module.my_function).

None
on_success Optional[HookSpecification]

Callback function in event of success of the step. Can be a function with two possible parameters, StepContext and BaseParameters, or a source path to a function of the same specifications (e.g. module.my_function).

None

Returns:

Type Description
Union[Type[zenml.pipelines.base_pipeline.BasePipeline], Callable[[~F], Type[zenml.pipelines.base_pipeline.BasePipeline]]]

the inner decorator which creates the pipeline class based on the ZenML BasePipeline

Source code in zenml/pipelines/pipeline_decorator.py
def pipeline(
    _func: Optional[F] = None,
    *,
    name: Optional[str] = None,
    enable_cache: Optional[bool] = None,
    enable_artifact_metadata: Optional[bool] = None,
    enable_artifact_visualization: Optional[bool] = None,
    settings: Optional[Dict[str, "SettingsOrDict"]] = None,
    extra: Optional[Dict[str, Any]] = None,
    on_failure: Optional["HookSpecification"] = None,
    on_success: Optional["HookSpecification"] = None,
) -> Union[Type[BasePipeline], Callable[[F], Type[BasePipeline]]]:
    """Legacy decorator that builds a `BasePipeline` subclass from a function.

    Can be used both bare (`@pipeline`) and with keyword arguments
    (`@pipeline(name=...)`). A deprecation warning is logged on every call.

    Args:
        _func: The decorated function.
        name: The name of the pipeline. If left empty, the name of the
            decorated function will be used as a fallback.
        enable_cache: Whether to use caching or not.
        enable_artifact_metadata: Whether to enable artifact metadata or not.
        enable_artifact_visualization: Whether to enable artifact visualization.
        settings: Settings for this pipeline.
        extra: Extra configurations for this pipeline.
        on_failure: Callback function in event of failure of the step. Can be
            a function with three possible parameters,
            `StepContext`, `BaseParameters`, and `BaseException`,
            or a source path to a function of the same specifications
            (e.g. `module.my_function`).
        on_success: Callback function in event of success of the step. Can be
            a function with two possible parameters, `StepContext` and
            `BaseParameters`, or a source path to a function of the same
            specifications (e.g. `module.my_function`).

    Returns:
        The generated pipeline class when used as a bare decorator,
        otherwise the inner decorator which creates the pipeline class
        based on the ZenML BasePipeline.
    """
    logger.warning(
        "The `@pipeline` decorator that you use to define your pipeline is "
        "deprecated. Check out our docs https://docs.zenml.io for "
        "information on how to define pipelines in a more intuitive and "
        "flexible way!"
    )

    def inner_decorator(func: F) -> Type[BasePipeline]:
        class_name = name or func.__name__

        # Built on every call so that each generated class owns its own
        # configuration dictionary.
        class_configuration = {
            PARAM_PIPELINE_NAME: name,
            PARAM_ENABLE_CACHE: enable_cache,
            PARAM_ENABLE_ARTIFACT_METADATA: enable_artifact_metadata,
            PARAM_ENABLE_ARTIFACT_VISUALIZATION: enable_artifact_visualization,
            PARAM_SETTINGS: settings,
            PARAM_EXTRA_OPTIONS: extra,
            PARAM_ON_FAILURE: on_failure,
            PARAM_ON_SUCCESS: on_success,
        }
        class_namespace = {
            PIPELINE_INNER_FUNC_NAME: staticmethod(func),  # type: ignore[arg-type]
            CLASS_CONFIGURATION: class_configuration,
            # Make the generated class look like it was defined where the
            # decorated function lives.
            "__module__": func.__module__,
            "__doc__": func.__doc__,
        }
        return type(class_name, (BasePipeline,), class_namespace)

    # Bare usage passes the function directly; parametrized usage needs the
    # decorator itself.
    return inner_decorator if _func is None else inner_decorator(_func)