Pipelines

zenml.pipelines special

A ZenML pipeline consists of tasks that execute in order and yield artifacts.

The artifacts are stored in the artifact store and indexed via the metadata store. Each individual task within a pipeline is known as a step. The standard pipelines within ZenML offer simple interfaces for adding predefined steps in a predefined order. Other kinds of pipelines can also be built from scratch on top of the BasePipeline class.

Pipelines can be written as simple functions and are created with the decorator appropriate to your use case. When a pipeline is run, it is compiled and passed directly to the orchestrator.
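
For instance, a minimal sketch of the decorator-based workflow (step names and values are illustrative; this assumes the `@step` and `@pipeline` decorators exported by `zenml.steps` and `zenml.pipelines`):

from zenml.pipelines import pipeline
from zenml.steps import step


@step
def importer() -> int:
    """Produces an artifact."""
    return 42


@step
def trainer(data: int) -> None:
    """Consumes the artifact."""
    print(data)


@pipeline
def my_pipeline(importer, trainer):
    # The connect function: wire the importer's output into the trainer.
    trainer(importer())


# Instantiate the pipeline with step *instances* (note the brackets) and run it:
my_pipeline(importer=importer(), trainer=trainer()).run()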

base_pipeline

Abstract base class for all ZenML pipelines.

BasePipeline

Abstract base class for all ZenML pipelines.

Attributes:

    name: The name of this pipeline.
    enable_cache: A boolean indicating if caching is enabled for this pipeline.
    requirements_file: DEPRECATED: Optional path to a pip requirements file that contains all requirements to run the pipeline. (Use `requirements` instead.)
    requirements: Optional list of (string) pip requirements to run the pipeline, or a string path to a requirements file.
    required_integrations: Optional set of integrations that need to be installed for this pipeline to run.
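
For example, a custom pipeline might subclass `BasePipeline` directly; a minimal sketch with hypothetical step names:

from zenml.pipelines import BasePipeline
from zenml.steps import BaseStep


class MyCustomPipeline(BasePipeline):
    """A custom pipeline wiring two hypothetical steps."""

    def connect(self, importer: BaseStep, trainer: BaseStep) -> None:
        # Pass the importer's output artifact into the trainer.
        trainer(importer())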

Source code in zenml/pipelines/base_pipeline.py
class BasePipeline(metaclass=BasePipelineMeta):
    """Abstract base class for all ZenML pipelines.

    Attributes:
        name: The name of this pipeline.
        enable_cache: A boolean indicating if caching is enabled for this
            pipeline.
        requirements_file: DEPRECATED: Optional path to a pip requirements file
            that contains all requirements to run the pipeline. (Use
            `requirements` instead.)
        requirements: Optional list of (string) pip requirements to run the
            pipeline, or a string path to a requirements file.
        required_integrations: Optional set of integrations that need to be
            installed for this pipeline to run.
    """

    STEP_SPEC: ClassVar[Dict[str, Any]] = None  # type: ignore[assignment]

    INSTANCE_CONFIGURATION: Dict[Text, Any] = {}

    def __init__(self, *args: BaseStep, **kwargs: Any) -> None:
        """Initialize the BasePipeline.

        Args:
            *args: The steps to be executed by this pipeline.
            **kwargs: The configuration for this pipeline.
        """
        kwargs.update(getattr(self, INSTANCE_CONFIGURATION))
        self.enable_cache = kwargs.pop(PARAM_ENABLE_CACHE, True)
        self.secrets = kwargs.pop(PARAM_SECRETS, [])

        self.docker_configuration = (
            kwargs.pop(PARAM_DOCKER_CONFIGURATION, None)
            or DockerConfiguration()
        )
        self._migrate_to_docker_config(kwargs)

        self.name = self.__class__.__name__
        self.__steps: Dict[str, BaseStep] = {}
        self._verify_arguments(*args, **kwargs)

    def _migrate_to_docker_config(self, kwargs: Dict[str, Any]) -> None:
        """Migrates legacy requirements to the new Docker configuration.

        Args:
            kwargs: Keyword arguments passed during pipeline initialization.
        """
        attributes = [
            (PARAM_REQUIREMENTS, "requirements"),
            (PARAM_REQUIRED_INTEGRATIONS, "required_integrations"),
            (PARAM_DOCKERIGNORE_FILE, "dockerignore"),
        ]
        updates = {}

        for kwarg_name, docker_config_attribute in attributes:
            value = kwargs.pop(kwarg_name, None)

            if value:
                logger.warning(
                    f"The `{kwarg_name}` parameter has been deprecated. Please "
                    "use the `docker_configuration` parameter instead."
                )

                if getattr(self.docker_configuration, docker_config_attribute):
                    logger.warning(
                        f"Parameter {kwarg_name} was defined on the pipeline "
                        "as well as in the docker configuration. Ignoring the "
                        "value defined on the pipeline."
                    )
                else:
                    # No value set on the docker config, migrate the old one
                    updates[docker_config_attribute] = value

        if updates:
            self.docker_configuration = self.docker_configuration.copy(
                update=updates
            )

    def _verify_arguments(self, *steps: BaseStep, **kw_steps: BaseStep) -> None:
        """Verifies the initialization args and kwargs of this pipeline.

        This method makes sure that no missing/unexpected arguments or
        arguments of a wrong type are passed when creating a pipeline. If
        all arguments are correct, saves the steps to `self.__steps`.

        Args:
            *steps: The args passed to the init method of this pipeline.
            **kw_steps: The kwargs passed to the init method of this pipeline.

        Raises:
            PipelineInterfaceError: If there are too many/few arguments or
                arguments with a wrong name/type.
        """
        input_step_keys = list(self.STEP_SPEC.keys())
        if len(steps) > len(input_step_keys):
            raise PipelineInterfaceError(
                f"Too many input steps for pipeline '{self.name}'. "
                f"This pipeline expects {len(input_step_keys)} step(s) "
                f"but got {len(steps) + len(kw_steps)}."
            )

        combined_steps = {}
        step_classes: Dict[Type[BaseStep], str] = {}

        def _verify_step(key: str, step: BaseStep) -> None:
            """Verifies a single step of the pipeline.

            Args:
                key: The key of the step.
                step: The step to verify.

            Raises:
                PipelineInterfaceError: If the step is not of the correct type
                    or is of the same class as another step.
            """
            step_class = type(step)

            if isinstance(step, BaseStepMeta):
                raise PipelineInterfaceError(
                    f"Wrong argument type (`{step_class}`) for argument "
                    f"'{key}' of pipeline '{self.name}'. "
                    f"A `BaseStep` subclass was provided instead of an "
                    f"instance. "
                    f"This might have been caused due to missing brackets of "
                    f"your steps when creating a pipeline with `@step` "
                    f"decorated functions, "
                    f"for which the correct syntax is `pipeline(step=step())`."
                )

            if not isinstance(step, BaseStep):
                raise PipelineInterfaceError(
                    f"Wrong argument type (`{step_class}`) for argument "
                    f"'{key}' of pipeline '{self.name}'. Only "
                    f"`@step` decorated functions or instances of `BaseStep` "
                    f"subclasses can be used as arguments when creating "
                    f"a pipeline."
                )

            if step_class in step_classes:
                previous_key = step_classes[step_class]
                raise PipelineInterfaceError(
                    f"Found multiple step objects of the same class "
                    f"(`{step_class}`) for arguments '{previous_key}' and "
                    f"'{key}' in pipeline '{self.name}'. Only one step object "
                    f"per class is allowed inside a ZenML pipeline. A possible "
                    f"solution is to use the "
                    f"{clone_step.__module__}.{clone_step.__name__} utility to "
                    f"create multiple copies of the same step."
                )

            step.pipeline_parameter_name = key
            combined_steps[key] = step
            step_classes[step_class] = key

        # verify args
        for i, step in enumerate(steps):
            key = input_step_keys[i]
            _verify_step(key, step)

        # verify kwargs
        for key, step in kw_steps.items():
            if key in combined_steps:
                # a step for this key was already set by
                # the positional input steps
                raise PipelineInterfaceError(
                    f"Unexpected keyword argument '{key}' for pipeline "
                    f"'{self.name}'. A step for this key was "
                    f"already passed as a positional argument."
                )
            _verify_step(key, step)

        # check if there are any missing or unexpected steps
        expected_steps = set(self.STEP_SPEC.keys())
        actual_steps = set(combined_steps.keys())
        missing_steps = expected_steps - actual_steps
        unexpected_steps = actual_steps - expected_steps

        if missing_steps:
            raise PipelineInterfaceError(
                f"Missing input step(s) for pipeline "
                f"'{self.name}': {missing_steps}."
            )

        if unexpected_steps:
            raise PipelineInterfaceError(
                f"Unexpected input step(s) for pipeline "
                f"'{self.name}': {unexpected_steps}. This pipeline "
                f"only requires the following steps: {expected_steps}."
            )

        self.__steps = combined_steps

    @abstractmethod
    def connect(self, *args: BaseStep, **kwargs: BaseStep) -> None:
        """Function that connects inputs and outputs of the pipeline steps.

        Args:
            *args: The positional arguments passed to the pipeline.
            **kwargs: The keyword arguments passed to the pipeline.

        Raises:
            NotImplementedError: Always.
        """
        raise NotImplementedError

    @property
    def steps(self) -> Dict[str, BaseStep]:
        """Returns a dictionary of pipeline steps.

        Returns:
            A dictionary of pipeline steps.
        """
        return self.__steps

    @steps.setter
    def steps(self, steps: Dict[str, BaseStep]) -> NoReturn:
        """Setting the steps property is not allowed.

        Args:
            steps: The steps to set.

        Raises:
            PipelineInterfaceError: Always.
        """
        raise PipelineInterfaceError("Cannot set steps manually!")

    def validate_stack(self, stack: "Stack") -> None:
        """Validates if a stack is able to run this pipeline.

        Args:
            stack: The stack to validate.

        Raises:
            StackValidationError: If the step operator is not configured in the
                active stack.
        """
        available_step_operators = (
            {stack.step_operator.name} if stack.step_operator else set()
        )

        for step in self.steps.values():
            if (
                step.custom_step_operator
                and step.custom_step_operator not in available_step_operators
            ):
                raise StackValidationError(
                    f"Step '{step.name}' requires custom step operator "
                    f"'{step.custom_step_operator}' which is not configured in "
                    f"the active stack. Available step operators: "
                    f"{available_step_operators}."
                )

    def _reset_steps(self) -> None:
        """Reset step values from previous pipeline runs.

        This ensures a pipeline instance can be called more than once.
        """
        for step in self.steps.values():
            step._reset()

    def run(
        self,
        *,
        run_name: Optional[str] = None,
        schedule: Optional[Schedule] = None,
        **additional_parameters: Any,
    ) -> Any:
        """Runs the pipeline on the active stack of the current repository.

        Args:
            run_name: Name of the pipeline run.
            schedule: Optional schedule of the pipeline.
            additional_parameters: Additional parameters to pass to the
                pipeline.

        Returns:
            The result of the pipeline.
        """
        if constants.SHOULD_PREVENT_PIPELINE_EXECUTION:
            # An environment variable was set to stop the execution of
            # pipelines. This is done to prevent execution of module-level
            # pipeline.run() calls inside docker containers which should only
            # run a single step.
            logger.info(
                "Preventing execution of pipeline '%s'. If this is not "
                "intended behavior, make sure to unset the environment "
                "variable '%s'.",
                self.name,
                constants.ENV_ZENML_PREVENT_PIPELINE_EXECUTION,
            )
            return

        logger.info("Creating run for pipeline: `%s`", self.name)
        logger.info(
            f'Cache {"enabled" if self.enable_cache else "disabled"} for '
            f"pipeline `{self.name}`"
        )

        # Activating the built-in integrations through lazy loading
        from zenml.integrations.registry import integration_registry

        integration_registry.activate_integrations()

        if not Environment.in_notebook():
            # Path of the file where pipeline.run() was called. This is needed by
            # the airflow orchestrator so it knows which file to copy into the DAG
            # directory
            dag_filepath = io_utils.resolve_relative_path(
                inspect.currentframe().f_back.f_code.co_filename  # type: ignore[union-attr]
            )
            additional_parameters.setdefault("dag_filepath", dag_filepath)

        runtime_configuration = RuntimeConfiguration(
            run_name=run_name,
            schedule=schedule,
            **additional_parameters,
        )
        stack = Repository().active_stack

        stack_metadata = {
            component_type.value: component.FLAVOR
            for component_type, component in stack.components.items()
        }
        track_event(
            event=AnalyticsEvent.RUN_PIPELINE,
            metadata={
                "store_type": Repository().active_profile.store_type.value,
                **stack_metadata,
                "total_steps": len(self.steps),
                "schedule": bool(schedule),
            },
        )

        self._reset_steps()
        self.validate_stack(stack)

        # Prevent execution of nested pipelines which might lead to unexpected
        # behavior
        constants.SHOULD_PREVENT_PIPELINE_EXECUTION = True
        try:
            return_value = stack.deploy_pipeline(
                self, runtime_configuration=runtime_configuration
            )
        finally:
            constants.SHOULD_PREVENT_PIPELINE_EXECUTION = False

        return return_value

    def with_config(
        self: T, config_file: str, overwrite_step_parameters: bool = False
    ) -> T:
        """Configures this pipeline using a yaml file.

        Args:
            config_file: Path to a yaml file which contains configuration
                options for running this pipeline. See
                https://docs.zenml.io/developer-guide/steps-and-pipelines/runtime-configuration#configuring-with-yaml-config-files
                for details regarding the specification of this file.
            overwrite_step_parameters: If set to `True`, values from the
                configuration file will overwrite configuration parameters
                passed in code.

        Returns:
            The pipeline object that this method was called on.
        """
        config_yaml = yaml_utils.read_yaml(config_file)

        if PipelineConfigurationKeys.STEPS in config_yaml:
            self._read_config_steps(
                config_yaml[PipelineConfigurationKeys.STEPS],
                overwrite=overwrite_step_parameters,
            )

        return self

    def _read_config_steps(
        self, steps: Dict[str, Dict[str, Any]], overwrite: bool = False
    ) -> None:
        """Reads and sets step parameters from a config file.

        Args:
            steps: Maps step names to dicts of parameter names and values.
            overwrite: If `True`, overwrite previously set step parameters.

        Raises:
            PipelineConfigurationError: If the configuration file contains
                invalid data.
            DuplicatedConfigurationError: If the configuration file contains
                duplicate step names.
        """
        for step_name, step_dict in steps.items():
            StepConfigurationKeys.key_check(step_dict)

            if step_name not in self.__steps:
                raise PipelineConfigurationError(
                    f"Found '{step_name}' step in configuration yaml but it "
                    f"doesn't exist in the pipeline steps "
                    f"{list(self.__steps.keys())}."
                )

            step = self.__steps[step_name]
            step_parameters = (
                step.CONFIG_CLASS.__fields__.keys() if step.CONFIG_CLASS else {}
            )
            parameters = step_dict.get(StepConfigurationKeys.PARAMETERS_, {})
            # pop the enable_cache
            if PARAM_ENABLE_CACHE in parameters:
                enable_cache = parameters.pop(PARAM_ENABLE_CACHE)
                self.steps[step_name].enable_cache = enable_cache

            for parameter, value in parameters.items():
                if parameter not in step_parameters:
                    raise PipelineConfigurationError(
                        f"Found parameter '{parameter}' for '{step_name}' step "
                        f"in configuration yaml but it doesn't exist in the "
                        f"configuration class `{step.CONFIG_CLASS}`. Available "
                        f"parameters for this step: "
                        f"{list(step_parameters)}."
                    )

                previous_value = step.PARAM_SPEC.get(parameter, None)

                if overwrite:
                    step.PARAM_SPEC[parameter] = value
                else:
                    step.PARAM_SPEC.setdefault(parameter, value)

                if overwrite or not previous_value:
                    logger.debug(
                        "Setting parameter %s=%s for step '%s'.",
                        parameter,
                        value,
                        step_name,
                    )
                if previous_value and not overwrite:
                    raise DuplicatedConfigurationError(
                        "The value for parameter '{}' is set twice for step "
                        "'{}' ({} vs. {}). This can happen when you "
                        "instantiate your step with a step configuration that "
                        "sets the parameter, while also setting the same "
                        "parameter within a config file that is added to the "
                        "pipeline instance using the `.with_config()` method. "
                        "Make sure each parameter is only defined **once**. \n"
                        "While it is not recommended, you can overwrite the "
                        "step configuration using the configuration file: \n"
                        "`.with_config('config.yaml', "
                        "overwrite_step_parameters=True)".format(
                            parameter, step_name, previous_value, value
                        )
                    )

    @classmethod
    def get_runs(cls) -> Optional[List["PipelineRunView"]]:
        """Get all past runs from the associated PipelineView.

        Returns:
            A list of all past PipelineRunViews.

        Raises:
            RuntimeError: In case the repository does not contain the view
                of the current pipeline.
        """
        pipeline_view = Repository().get_pipeline(cls)
        if pipeline_view:
            return pipeline_view.runs  # type: ignore[no-any-return]
        else:
            raise RuntimeError(
                f"The PipelineView for `{cls.__name__}` could "
                f"not be found. Are you sure this pipeline has "
                f"been run already?"
            )

    @classmethod
    def get_run(cls, run_name: str) -> Optional["PipelineRunView"]:
        """Get a specific past run from the associated PipelineView.

        Args:
            run_name: Name of the run

        Returns:
            The PipelineRunView of the specific pipeline run.

        Raises:
            RuntimeError: In case the repository does not contain the view
                of the current pipeline.
        """
        pipeline_view = Repository().get_pipeline(cls)
        if pipeline_view:
            return pipeline_view.get_run(run_name)  # type: ignore[no-any-return]
        else:
            raise RuntimeError(
                f"The PipelineView for `{cls.__name__}` could "
                f"not be found. Are you sure this pipeline has "
                f"been run already?"
            )
steps: Dict[str, zenml.steps.base_step.BaseStep] property writable

Returns a dictionary of pipeline steps.

Returns:

    Dict[str, zenml.steps.base_step.BaseStep]: A dictionary of pipeline steps.

__init__(self, *args, **kwargs) special

Initialize the BasePipeline.

Parameters:

    *args (BaseStep): The steps to be executed by this pipeline. Default: ()
    **kwargs (Any): The configuration for this pipeline. Default: {}
Source code in zenml/pipelines/base_pipeline.py
def __init__(self, *args: BaseStep, **kwargs: Any) -> None:
    """Initialize the BasePipeline.

    Args:
        *args: The steps to be executed by this pipeline.
        **kwargs: The configuration for this pipeline.
    """
    kwargs.update(getattr(self, INSTANCE_CONFIGURATION))
    self.enable_cache = kwargs.pop(PARAM_ENABLE_CACHE, True)
    self.secrets = kwargs.pop(PARAM_SECRETS, [])

    self.docker_configuration = (
        kwargs.pop(PARAM_DOCKER_CONFIGURATION, None)
        or DockerConfiguration()
    )
    self._migrate_to_docker_config(kwargs)

    self.name = self.__class__.__name__
    self.__steps: Dict[str, BaseStep] = {}
    self._verify_arguments(*args, **kwargs)
connect(self, *args, **kwargs)

Function that connects inputs and outputs of the pipeline steps.

Parameters:

    *args (BaseStep): The positional arguments passed to the pipeline. Default: ()
    **kwargs (BaseStep): The keyword arguments passed to the pipeline. Default: {}

Exceptions:

    NotImplementedError: Always.

Source code in zenml/pipelines/base_pipeline.py
@abstractmethod
def connect(self, *args: BaseStep, **kwargs: BaseStep) -> None:
    """Function that connects inputs and outputs of the pipeline steps.

    Args:
        *args: The positional arguments passed to the pipeline.
        **kwargs: The keyword arguments passed to the pipeline.

    Raises:
        NotImplementedError: Always.
    """
    raise NotImplementedError
get_run(run_name) classmethod

Get a specific past run from the associated PipelineView.

Parameters:

    run_name (str): Name of the run. Required.

Returns:

    Optional[PipelineRunView]: The PipelineRunView of the specific pipeline run.

Exceptions:

    RuntimeError: In case the repository does not contain the view of the current pipeline.

Source code in zenml/pipelines/base_pipeline.py
@classmethod
def get_run(cls, run_name: str) -> Optional["PipelineRunView"]:
    """Get a specific past run from the associated PipelineView.

    Args:
        run_name: Name of the run

    Returns:
        The PipelineRunView of the specific pipeline run.

    Raises:
        RuntimeError: In case the repository does not contain the view
            of the current pipeline.
    """
    pipeline_view = Repository().get_pipeline(cls)
    if pipeline_view:
        return pipeline_view.get_run(run_name)  # type: ignore[no-any-return]
    else:
        raise RuntimeError(
            f"The PipelineView for `{cls.__name__}` could "
            f"not be found. Are you sure this pipeline has "
            f"been run already?"
        )
get_runs() classmethod

Get all past runs from the associated PipelineView.

Returns:

    Optional[List[PipelineRunView]]: A list of all past PipelineRunViews.

Exceptions:

    RuntimeError: In case the repository does not contain the view of the current pipeline.

Source code in zenml/pipelines/base_pipeline.py
@classmethod
def get_runs(cls) -> Optional[List["PipelineRunView"]]:
    """Get all past runs from the associated PipelineView.

    Returns:
        A list of all past PipelineRunViews.

    Raises:
        RuntimeError: In case the repository does not contain the view
            of the current pipeline.
    """
    pipeline_view = Repository().get_pipeline(cls)
    if pipeline_view:
        return pipeline_view.runs  # type: ignore[no-any-return]
    else:
        raise RuntimeError(
            f"The PipelineView for `{cls.__name__}` could "
            f"not be found. Are you sure this pipeline has "
            f"been run already?"
        )
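
A short post-execution sketch using both classmethods, assuming a pipeline class `MyPipeline` that has already been run at least once (the run name is hypothetical):

runs = MyPipeline.get_runs()            # all past PipelineRunViews
run = MyPipeline.get_run("custom_run")  # a single run, looked up by name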
run(self, *, run_name=None, schedule=None, **additional_parameters)

Runs the pipeline on the active stack of the current repository.

Parameters:

    run_name (Optional[str]): Name of the pipeline run. Default: None
    schedule (Optional[zenml.pipelines.schedule.Schedule]): Optional schedule of the pipeline. Default: None
    additional_parameters (Any): Additional parameters to pass to the pipeline. Default: {}

Returns:

    Any: The result of the pipeline.

Source code in zenml/pipelines/base_pipeline.py
def run(
    self,
    *,
    run_name: Optional[str] = None,
    schedule: Optional[Schedule] = None,
    **additional_parameters: Any,
) -> Any:
    """Runs the pipeline on the active stack of the current repository.

    Args:
        run_name: Name of the pipeline run.
        schedule: Optional schedule of the pipeline.
        additional_parameters: Additional parameters to pass to the
            pipeline.

    Returns:
        The result of the pipeline.
    """
    if constants.SHOULD_PREVENT_PIPELINE_EXECUTION:
        # An environment variable was set to stop the execution of
        # pipelines. This is done to prevent execution of module-level
        # pipeline.run() calls inside docker containers which should only
        # run a single step.
        logger.info(
            "Preventing execution of pipeline '%s'. If this is not "
            "intended behavior, make sure to unset the environment "
            "variable '%s'.",
            self.name,
            constants.ENV_ZENML_PREVENT_PIPELINE_EXECUTION,
        )
        return

    logger.info("Creating run for pipeline: `%s`", self.name)
    logger.info(
        f'Cache {"enabled" if self.enable_cache else "disabled"} for '
        f"pipeline `{self.name}`"
    )

    # Activating the built-in integrations through lazy loading
    from zenml.integrations.registry import integration_registry

    integration_registry.activate_integrations()

    if not Environment.in_notebook():
        # Path of the file where pipeline.run() was called. This is needed by
        # the airflow orchestrator so it knows which file to copy into the DAG
        # directory
        dag_filepath = io_utils.resolve_relative_path(
            inspect.currentframe().f_back.f_code.co_filename  # type: ignore[union-attr]
        )
        additional_parameters.setdefault("dag_filepath", dag_filepath)

    runtime_configuration = RuntimeConfiguration(
        run_name=run_name,
        schedule=schedule,
        **additional_parameters,
    )
    stack = Repository().active_stack

    stack_metadata = {
        component_type.value: component.FLAVOR
        for component_type, component in stack.components.items()
    }
    track_event(
        event=AnalyticsEvent.RUN_PIPELINE,
        metadata={
            "store_type": Repository().active_profile.store_type.value,
            **stack_metadata,
            "total_steps": len(self.steps),
            "schedule": bool(schedule),
        },
    )

    self._reset_steps()
    self.validate_stack(stack)

    # Prevent execution of nested pipelines which might lead to unexpected
    # behavior
    constants.SHOULD_PREVENT_PIPELINE_EXECUTION = True
    try:
        return_value = stack.deploy_pipeline(
            self, runtime_configuration=runtime_configuration
        )
    finally:
        constants.SHOULD_PREVENT_PIPELINE_EXECUTION = False

    return return_value
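
A usage sketch (the pipeline instance and run name are illustrative, reusing the steps from the introductory example):

pipeline_instance = my_pipeline(importer=importer(), trainer=trainer())

# Runs on the active stack of the current repository:
pipeline_instance.run(run_name="custom_run")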
validate_stack(self, stack)

Validates if a stack is able to run this pipeline.

Parameters:

    stack (Stack): The stack to validate. Required.

Exceptions:

    StackValidationError: If the step operator is not configured in the active stack.

Source code in zenml/pipelines/base_pipeline.py
def validate_stack(self, stack: "Stack") -> None:
    """Validates if a stack is able to run this pipeline.

    Args:
        stack: The stack to validate.

    Raises:
        StackValidationError: If the step operator is not configured in the
            active stack.
    """
    available_step_operators = (
        {stack.step_operator.name} if stack.step_operator else set()
    )

    for step in self.steps.values():
        if (
            step.custom_step_operator
            and step.custom_step_operator not in available_step_operators
        ):
            raise StackValidationError(
                f"Step '{step.name}' requires custom step operator "
                f"'{step.custom_step_operator}' which is not configured in "
                f"the active stack. Available step operators: "
                f"{available_step_operators}."
            )
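
The step operator check above is triggered by steps pinned to a specific step operator. A hedged sketch, assuming this ZenML version's `@step` decorator accepts a `custom_step_operator` argument (the operator name is hypothetical):

@step(custom_step_operator="sagemaker")
def trainer(data: int) -> None:
    ...

# validate_stack() raises StackValidationError unless the active stack has a
# step operator named "sagemaker".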
with_config(self, config_file, overwrite_step_parameters=False)

Configures this pipeline using a yaml file.

Parameters:

    config_file (str): Path to a yaml file which contains configuration options for running this pipeline. See https://docs.zenml.io/developer-guide/steps-and-pipelines/runtime-configuration#configuring-with-yaml-config-files for details regarding the specification of this file. Required.
    overwrite_step_parameters (bool): If set to `True`, values from the configuration file will overwrite configuration parameters passed in code. Default: False

Returns:

    ~T: The pipeline object that this method was called on.

Source code in zenml/pipelines/base_pipeline.py
def with_config(
    self: T, config_file: str, overwrite_step_parameters: bool = False
) -> T:
    """Configures this pipeline using a yaml file.

    Args:
        config_file: Path to a yaml file which contains configuration
            options for running this pipeline. See
            https://docs.zenml.io/developer-guide/steps-and-pipelines/runtime-configuration#configuring-with-yaml-config-files
            for details regarding the specification of this file.
        overwrite_step_parameters: If set to `True`, values from the
            configuration file will overwrite configuration parameters
            passed in code.

    Returns:
        The pipeline object that this method was called on.
    """
    config_yaml = yaml_utils.read_yaml(config_file)

    if PipelineConfigurationKeys.STEPS in config_yaml:
        self._read_config_steps(
            config_yaml[PipelineConfigurationKeys.STEPS],
            overwrite=overwrite_step_parameters,
        )

    return self
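
A sketch of the expected YAML layout, inferred from `_read_config_steps` above; the literal key spellings behind `PipelineConfigurationKeys.STEPS` and `StepConfigurationKeys.PARAMETERS_` are assumed to be `steps` and `parameters`, and the step and parameter names are hypothetical:

import textwrap

config = textwrap.dedent(
    """
    steps:
      trainer:
        parameters:
          epochs: 10
          learning_rate: 0.001
    """
)
with open("config.yaml", "w") as f:
    f.write(config)

# Assumes the `trainer` step's config class defines `epochs` and `learning_rate`:
pipeline_instance.with_config("config.yaml").run()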

BasePipelineMeta (type)

Pipeline Metaclass responsible for validating the pipeline definition.

Source code in zenml/pipelines/base_pipeline.py
class BasePipelineMeta(type):
    """Pipeline Metaclass responsible for validating the pipeline definition."""

    def __new__(
        mcs, name: str, bases: Tuple[Type[Any], ...], dct: Dict[str, Any]
    ) -> "BasePipelineMeta":
        """Saves argument names for later verification purposes.

        Args:
            name: The name of the class.
            bases: The base classes of the class.
            dct: The dictionary of the class.

        Returns:
            The class.
        """
        cls = cast(Type["BasePipeline"], super().__new__(mcs, name, bases, dct))

        cls.STEP_SPEC = {}

        connect_spec = inspect.getfullargspec(
            inspect.unwrap(getattr(cls, PIPELINE_INNER_FUNC_NAME))
        )
        connect_args = connect_spec.args

        if connect_args and connect_args[0] == "self":
            connect_args.pop(0)

        for arg in connect_args:
            arg_type = connect_spec.annotations.get(arg, None)
            cls.STEP_SPEC.update({arg: arg_type})
        return cls
__new__(mcs, name, bases, dct) special staticmethod

Saves argument names for later verification purposes.

Parameters:

    name (str): The name of the class. Required.
    bases (Tuple[Type[Any], ...]): The base classes of the class. Required.
    dct (Dict[str, Any]): The dictionary of the class. Required.

Returns:

    BasePipelineMeta: The class.

Source code in zenml/pipelines/base_pipeline.py
def __new__(
    mcs, name: str, bases: Tuple[Type[Any], ...], dct: Dict[str, Any]
) -> "BasePipelineMeta":
    """Saves argument names for later verification purposes.

    Args:
        name: The name of the class.
        bases: The base classes of the class.
        dct: The dictionary of the class.

    Returns:
        The class.
    """
    cls = cast(Type["BasePipeline"], super().__new__(mcs, name, bases, dct))

    cls.STEP_SPEC = {}

    connect_spec = inspect.getfullargspec(
        inspect.unwrap(getattr(cls, PIPELINE_INNER_FUNC_NAME))
    )
    connect_args = connect_spec.args

    if connect_args and connect_args[0] == "self":
        connect_args.pop(0)

    for arg in connect_args:
        arg_type = connect_spec.annotations.get(arg, None)
        cls.STEP_SPEC.update({arg: arg_type})
    return cls
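
For example, given a pipeline whose `connect` method declares two annotated arguments, the metaclass records them in `STEP_SPEC` (class and argument names are hypothetical):

class MyPipeline(BasePipeline):
    def connect(self, importer: BaseStep, trainer: BaseStep) -> None:
        trainer(importer())


assert MyPipeline.STEP_SPEC == {"importer": BaseStep, "trainer": BaseStep}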

pipeline_decorator

Decorator function for ZenML pipelines.

pipeline(_func=None, *, name=None, enable_cache=True, required_integrations=(), requirements=None, dockerignore_file=None, docker_configuration=None, secrets=[])

Outer decorator function for the creation of a ZenML pipeline.

In order to be able to work with parameters such as "name", it features a nested decorator structure.

Parameters:

    _func (Optional[~F]): The decorated function. Default: None
    name (Optional[str]): The name of the pipeline. If left empty, the name of the decorated function will be used as a fallback. Default: None
    enable_cache (bool): Whether to use caching or not. Default: True
    required_integrations (Sequence[str]): DEPRECATED: Optional list of ZenML integrations that are required to run this pipeline. Please use `docker_configuration` instead. Default: ()
    requirements (Union[str, List[str]]): DEPRECATED: Optional path to a requirements file or a list of requirements. Please use `docker_configuration` instead. Default: None
    dockerignore_file (Optional[str]): DEPRECATED: Optional path to a dockerignore file to use when building docker images for running this pipeline. Please use `docker_configuration` instead. Default: None
    docker_configuration (Optional[zenml.config.docker_configuration.DockerConfiguration]): Configuration of all Docker options. Default: None
    secrets (Optional[List[str]]): Optional list of secrets that are required to run this pipeline. Default: []

Returns:

    Union[Type[zenml.pipelines.base_pipeline.BasePipeline], Callable[[~F], Type[zenml.pipelines.base_pipeline.BasePipeline]]]: The inner decorator which creates the pipeline class based on the ZenML BasePipeline.

Source code in zenml/pipelines/pipeline_decorator.py
def pipeline(
    _func: Optional[F] = None,
    *,
    name: Optional[str] = None,
    enable_cache: bool = True,
    required_integrations: Sequence[str] = (),
    requirements: Optional[Union[str, List[str]]] = None,
    dockerignore_file: Optional[str] = None,
    docker_configuration: Optional[DockerConfiguration] = None,
    secrets: Optional[List[str]] = [],
) -> Union[Type[BasePipeline], Callable[[F], Type[BasePipeline]]]:
    """Outer decorator function for the creation of a ZenML pipeline.

    In order to be able to work with parameters such as "name", it features a
    nested decorator structure.

    Args:
        _func: The decorated function.
        name: The name of the pipeline. If left empty, the name of the
            decorated function will be used as a fallback.
        enable_cache: Whether to use caching or not.
        required_integrations: DEPRECATED: Optional list of ZenML integrations
            that are required to run this pipeline. Please use
            `docker_configuration` instead.
        requirements: DEPRECATED: Optional path to a requirements file or a
            list of requirements. Please use `docker_configuration` instead.
        dockerignore_file: DEPRECATED: Optional path to a dockerignore file to
            use when building docker images for running this pipeline. Please
            use `docker_configuration` instead.
        docker_configuration: Configuration of all Docker options.
        secrets: Optional list of secrets that are required to run this pipeline.

    Returns:
        the inner decorator which creates the pipeline class based on the
        ZenML BasePipeline
    """

    def inner_decorator(func: F) -> Type[BasePipeline]:
        """Inner decorator function for the creation of a ZenML pipeline.

        Args:
            func: types.FunctionType, this function will be used as the
                "connect" method of the generated Pipeline

        Returns:
            the class of a newly generated ZenML Pipeline
        """
        return type(  # noqa
            name if name else func.__name__,
            (BasePipeline,),
            {
                PIPELINE_INNER_FUNC_NAME: staticmethod(func),  # type: ignore[arg-type] # noqa
                INSTANCE_CONFIGURATION: {
                    PARAM_ENABLE_CACHE: enable_cache,
                    PARAM_REQUIRED_INTEGRATIONS: required_integrations,
                    PARAM_REQUIREMENTS: requirements,
                    PARAM_DOCKERIGNORE_FILE: dockerignore_file,
                    PARAM_SECRETS: secrets,
                    PARAM_DOCKER_CONFIGURATION: docker_configuration,
                },
                "__module__": func.__module__,
                "__doc__": func.__doc__,
            },
        )

    if _func is None:
        return inner_decorator
    else:
        return inner_decorator(_func)
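
A usage sketch with a few of the keyword arguments; the pipeline name and requirement pin are illustrative, and this assumes the `DockerConfiguration` model accepts a `requirements` field, as the migration helper in `BasePipeline` suggests:

from zenml.config.docker_configuration import DockerConfiguration
from zenml.pipelines import pipeline


@pipeline(
    name="training_pipeline",
    enable_cache=False,
    docker_configuration=DockerConfiguration(requirements=["scikit-learn"]),
)
def train_and_evaluate(trainer, evaluator):
    evaluator(trainer())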

run_pipeline

Running ZenML Pipelines from Code.

run_pipeline(python_file, config_path)

Runs pipeline specified by the given config YAML object.

Parameters:

    python_file (str): Path to the python file that defines the pipeline. Required.
    config_path (str): Path to configuration YAML file. Required.

Exceptions:

    PipelineConfigurationError: Error when pipeline configuration is faulty.
    RuntimeError: Error when zenml repository is not found.

Source code in zenml/pipelines/run_pipeline.py
def run_pipeline(python_file: str, config_path: str) -> None:
    """Runs pipeline specified by the given config YAML object.

    Args:
        python_file: Path to the python file that defines the pipeline.
        config_path: Path to configuration YAML file.

    Raises:
        PipelineConfigurationError: Error when pipeline configuration is faulty.
        RuntimeError: Error when zenml repository is not found.
    """
    # If the file was run with `python run.py`, this would happen automatically.
    #  In order to allow seamless switching between running directly and through
    #  zenml, this is done at this point
    zenml_root = Repository().root
    if not zenml_root:
        raise RuntimeError(
            "The `run_pipeline` function can only be called "
            "within a zenml repo. Run `zenml init` before "
            "running a pipeline using `run_pipeline`."
        )

    module = source_utils.import_python_file(python_file, str(zenml_root))
    config = yaml_utils.read_yaml(config_path)
    PipelineConfigurationKeys.key_check(config)

    pipeline_name = config[PipelineConfigurationKeys.NAME]
    pipeline_class = _get_module_attribute(module, pipeline_name)

    # For docker-based orchestrators it is important for the supplied python
    #  module to be set as the main module instead of the calling process
    constants.USER_MAIN_MODULE = source_utils.get_module_source_from_module(
        module=module
    )

    steps = {}
    for step_name, step_config in config[
        PipelineConfigurationKeys.STEPS
    ].items():
        StepConfigurationKeys.key_check(step_config)
        source = step_config[StepConfigurationKeys.SOURCE_]
        step_class = _load_class_from_module(module, source, str(zenml_root))

        # It is necessary to support passing step instances for standard
        #  step implementations (e.g WhylogsProfilerStep) in order to
        #  support using the same step multiple times. Once this problem is
        #  solved, this portion of the code can be simplified to only
        #  support classes.
        if not isinstance(step_class, BaseStep):
            step_instance = step_class()
        else:
            step_instance = step_class

        materializers_config = step_config.get(
            StepConfigurationKeys.MATERIALIZERS_, None
        )
        if materializers_config:
            # We need to differentiate whether it's a single materializer
            # or a dictionary mapping output names to materializers
            if isinstance(materializers_config, str):
                correct_input = textwrap.dedent(
                    f"""
                {SourceConfigurationKeys.NAME_}: {materializers_config}
                {SourceConfigurationKeys.FILE_}: optional/filepath.py
                """
                )

                raise PipelineConfigurationError(
                    "As of ZenML version 0.8.0 `str` entries are no "
                    "longer supported "
                    "to define steps or materializers. Instead you will "
                    "now need to "
                    "pass a dictionary. This dictionary **has to** "
                    "contain a "
                    f"`{SourceConfigurationKeys.NAME_}` which refers to "
                    f"the function/"
                    "class name. If this entity is defined outside the "
                    "main module,"
                    "you will need to additionally supply a "
                    f"{SourceConfigurationKeys.FILE_} with the relative "
                    f"forward-slash-"
                    "separated path to the file. \n"
                    f"You tried to pass in `{materializers_config}` "
                    f"- however you should have specified the name "
                    f"(and file) like this: \n "
                    f"{correct_input}"
                )
            elif isinstance(materializers_config, dict):
                materializers = {
                    output_name: _load_class_from_module(
                        module, source, str(zenml_root)
                    )
                    for output_name, source in materializers_config.items()
                }
            else:
                raise PipelineConfigurationError(
                    f"Only `dict` values are allowed for "
                    f"'materializers' attribute of a step configuration. "
                    f"You tried to pass in `{materializers_config}` (type: "
                    f"`{type(materializers_config).__name__}`)."
                )
            step_instance = step_instance.with_return_materializers(
                materializers
            )

        steps[step_name] = step_instance
    pipeline_instance = pipeline_class(**steps).with_config(
        config_path, overwrite_step_parameters=True
    )
    logger.debug("Finished setting up pipeline '%s' from CLI", pipeline_name)
    pipeline_instance.run()
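
A sketch of invoking this entry point from Python, together with the config shape it checks for (a pipeline `name`, a `steps` mapping, and a per-step `source`); the literal key spellings are assumptions based on the key-constant names above:

from zenml.pipelines.run_pipeline import run_pipeline

# config.yaml (hypothetical):
#
# name: MyPipeline
# steps:
#   trainer:
#     source:
#       name: trainer            # function/class name
#       file: steps/trainer.py   # optional, if defined outside the main module
#
run_pipeline(python_file="run.py", config_path="config.yaml")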

schedule

Class for defining a pipeline schedule.

Schedule (BaseModel) pydantic-model

Class for defining a pipeline schedule.

Attributes:

    cron_expression (Optional[str]): Cron expression for the pipeline schedule. If a value for this is set, it takes precedence over the start time + interval.
    start_time (Optional[datetime.datetime]): datetime object to indicate when to start the schedule.
    end_time (Optional[datetime.datetime]): datetime object to indicate when to end the schedule.
    interval_second (Optional[datetime.timedelta]): datetime timedelta indicating the seconds between two recurring runs for a periodic schedule.
    catchup (bool): Whether the recurring run should catch up if behind schedule, for example if the recurring run was paused for a while and re-enabled afterwards. If catchup=True, the scheduler will catch up on (backfill) each missed interval. Otherwise, it only schedules the latest interval if more than one interval is ready to be scheduled. Usually, if your pipeline handles backfill internally, you should turn catchup off to avoid duplicate backfill.

Source code in zenml/pipelines/schedule.py
class Schedule(BaseModel):
    """Class for defining a pipeline schedule.

    Attributes:
        cron_expression: Cron expression for the pipeline schedule. If a value
            for this is set it takes precedence over the start time + interval.
        start_time: datetime object to indicate when to start the schedule.
        end_time: datetime object to indicate when to end the schedule.
        interval_second: datetime timedelta indicating the seconds between two
            recurring runs for a periodic schedule.
        catchup: Whether the recurring run should catch up if behind schedule.
            For example, if the recurring run is paused for a while and
            re-enabled afterwards. If catchup=True, the scheduler will catch
            up on (backfill) each missed interval. Otherwise, it only
            schedules the latest interval if more than one interval is ready to
            be scheduled. Usually, if your pipeline handles backfill
            internally, you should turn catchup off to avoid duplicate backfill.
    """

    cron_expression: Optional[str] = None
    start_time: Optional[datetime.datetime] = None
    end_time: Optional[datetime.datetime] = None
    interval_second: Optional[datetime.timedelta] = None
    catchup: bool = False

    @root_validator
    def _ensure_cron_or_periodic_schedule_configured(
        cls, values: Dict[str, Any]
    ) -> Dict[str, Any]:
        """Ensures that the cron expression or start time + interval are set.

        Args:
            values: All attributes of the schedule.

        Returns:
            All schedule attributes.

        Raises:
            ValueError: If no cron expression or start time + interval were
                provided.
        """
        cron_expression = values.get("cron_expression")
        periodic_schedule = values.get("start_time") and values.get(
            "interval_second"
        )

        if cron_expression and periodic_schedule:
            logger.warning(
                "This schedule was created with a cron expression as well as "
                "values for `start_time` and `interval_second`. The resulting "
                "behaviour depends on the concrete orchestrator implementation "
                "but will usually ignore the interval and use the cron "
                "expression."
            )
            return values
        elif cron_expression or periodic_schedule:
            return values
        else:
            raise ValueError(
                "Either a cron expression or start time and interval seconds "
                "need to be set for a valid schedule."
            )

    @property
    def utc_start_time(self) -> Optional[str]:
        """Optional ISO-formatted string of the UTC start time.

        Returns:
            Optional ISO-formatted string of the UTC start time.
        """
        if not self.start_time:
            return None

        return self.start_time.astimezone(datetime.timezone.utc).isoformat()

    @property
    def utc_end_time(self) -> Optional[str]:
        """Optional ISO-formatted string of the UTC end time.

        Returns:
            Optional ISO-formatted string of the UTC end time.
        """
        if not self.end_time:
            return None

        return self.end_time.astimezone(datetime.timezone.utc).isoformat()
utc_end_time: Optional[str] property readonly

Optional ISO-formatted string of the UTC end time.

Returns:

    Optional[str]: Optional ISO-formatted string of the UTC end time.

utc_start_time: Optional[str] property readonly

Optional ISO-formatted string of the UTC start time.

Returns:

    Optional[str]: Optional ISO-formatted string of the UTC start time.
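
A usage sketch, assuming `Schedule` is importable from `zenml.pipelines` (the pipeline instance is hypothetical):

import datetime

from zenml.pipelines import Schedule

# Either a cron expression ...
cron_schedule = Schedule(cron_expression="*/15 * * * *")

# ... or a start time plus an interval:
periodic_schedule = Schedule(
    start_time=datetime.datetime.now(datetime.timezone.utc),
    interval_second=datetime.timedelta(minutes=15),
)

pipeline_instance.run(schedule=periodic_schedule)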