
Pipelines

zenml.pipelines special

A ZenML pipeline consists of tasks that execute in order and yield artifacts.

The artifacts are stored within the artifact store and indexed via the metadata store. Each individual task within a pipeline is known as a step. The standard pipelines within ZenML expose simple interfaces for adding a predefined set of steps in a predefined order. Other kinds of pipelines can also be built from scratch on top of the BasePipeline class.

Pipelines can be written as simple functions and are turned into pipeline classes by the decorator that fits your use case. The moment a pipeline is run, it is compiled and passed directly to the orchestrator.

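For illustration, here is a minimal sketch of this function-based flow, assuming the standard `zenml.steps.step` and `zenml.pipelines.pipeline` imports; the step and pipeline names are placeholders, not part of this reference:

from zenml.pipelines import pipeline
from zenml.steps import step


@step
def importer() -> int:
    """Placeholder step that produces an integer artifact."""
    return 42


@step
def doubler(value: int) -> int:
    """Placeholder step that consumes the artifact produced by `importer`."""
    return value * 2


@pipeline
def my_pipeline(importer, doubler):
    # The function body acts as the `connect` method: it wires step outputs
    # to step inputs.
    doubler(value=importer())


# Instantiate the pipeline with step *instances* and run it on the active stack.
my_pipeline(importer=importer(), doubler=doubler()).run()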
base_pipeline

Abstract base class for all ZenML pipelines.

BasePipeline

Abstract base class for all ZenML pipelines.

Attributes:

    name: The name of this pipeline.
    enable_cache: A boolean indicating if caching is enabled for this pipeline.
    requirements_file: DEPRECATED: Optional path to a pip requirements file that contains all requirements to run the pipeline. (Use `requirements` instead.)
    requirements: Optional list of (string) pip requirements to run the pipeline, or a string path to a requirements file.
    required_integrations: Optional set of integrations that need to be installed for this pipeline to run.

Source code in zenml/pipelines/base_pipeline.py
class BasePipeline(metaclass=BasePipelineMeta):
    """Abstract base class for all ZenML pipelines.

    Attributes:
        name: The name of this pipeline.
        enable_cache: A boolean indicating if caching is enabled for this
            pipeline.
        requirements_file: DEPRECATED: Optional path to a pip requirements file
            that contains all requirements to run the pipeline. (Use
            `requirements` instead.)
        requirements: Optional list of (string) pip requirements to run the pipeline, or a string path to a requirements file.
        required_integrations: Optional set of integrations that need to be
            installed for this pipeline to run.
    """

    STEP_SPEC: ClassVar[Dict[str, Any]] = None  # type: ignore[assignment]

    INSTANCE_CONFIGURATION: Dict[Text, Any] = {}

    def __init__(self, *args: BaseStep, **kwargs: Any) -> None:
        """Initialize the BasePipeline.

        Args:
            *args: The steps to be executed by this pipeline.
            **kwargs: The configuration for this pipeline.
        """
        kwargs.update(getattr(self, INSTANCE_CONFIGURATION))
        self.enable_cache = kwargs.pop(PARAM_ENABLE_CACHE, True)
        self.required_integrations = kwargs.pop(PARAM_REQUIRED_INTEGRATIONS, ())
        self.requirements_file = kwargs.pop(PARAM_REQUIREMENTS_FILE, None)
        if self.requirements_file:
            logger.warning(
                "The `requirements_file` argument has been deprecated. Please use `requirements` instead to pass in either a string path to a file listing your 'requirements' or a list of the individual requirements."
            )
        self._requirements = kwargs.pop(PARAM_REQUIREMENTS, None)
        self.dockerignore_file = kwargs.pop(PARAM_DOCKERIGNORE_FILE, None)
        self.secrets = kwargs.pop(PARAM_SECRETS, [])

        self.name = self.__class__.__name__
        self.__steps: Dict[str, BaseStep] = {}
        self._verify_arguments(*args, **kwargs)

    def _verify_arguments(self, *steps: BaseStep, **kw_steps: BaseStep) -> None:
        """Verifies the initialization args and kwargs of this pipeline.

        This method makes sure that no missing/unexpected arguments or
        arguments of a wrong type are passed when creating a pipeline. If
        all arguments are correct, saves the steps to `self.__steps`.

        Args:
            *steps: The args passed to the init method of this pipeline.
            **kw_steps: The kwargs passed to the init method of this pipeline.

        Raises:
            PipelineInterfaceError: If there are too many/few arguments or
                arguments with a wrong name/type.
        """
        input_step_keys = list(self.STEP_SPEC.keys())
        if len(steps) > len(input_step_keys):
            raise PipelineInterfaceError(
                f"Too many input steps for pipeline '{self.name}'. "
                f"This pipeline expects {len(input_step_keys)} step(s) "
                f"but got {len(steps) + len(kw_steps)}."
            )

        combined_steps = {}
        step_classes: Dict[Type[BaseStep], str] = {}

        def _verify_step(key: str, step: BaseStep) -> None:
            """Verifies a single step of the pipeline.

            Args:
                key: The key of the step.
                step: The step to verify.

            Raises:
                PipelineInterfaceError: If the step is not of the correct type
                    or is of the same class as another step.
            """
            step_class = type(step)

            if isinstance(step, BaseStepMeta):
                raise PipelineInterfaceError(
                    f"Wrong argument type (`{step_class}`) for argument "
                    f"'{key}' of pipeline '{self.name}'. "
                    f"A `BaseStep` subclass was provided instead of an "
                    f"instance. "
                    f"This might have been caused due to missing brackets of "
                    f"your steps when creating a pipeline with `@step` "
                    f"decorated functions, "
                    f"for which the correct syntax is `pipeline(step=step())`."
                )

            if not isinstance(step, BaseStep):
                raise PipelineInterfaceError(
                    f"Wrong argument type (`{step_class}`) for argument "
                    f"'{key}' of pipeline '{self.name}'. Only "
                    f"`@step` decorated functions or instances of `BaseStep` "
                    f"subclasses can be used as arguments when creating "
                    f"a pipeline."
                )

            if step_class in step_classes:
                previous_key = step_classes[step_class]
                raise PipelineInterfaceError(
                    f"Found multiple step objects of the same class "
                    f"(`{step_class}`) for arguments '{previous_key}' and "
                    f"'{key}' in pipeline '{self.name}'. Only one step object "
                    f"per class is allowed inside a ZenML pipeline. A possible "
                    f"solution is to use the "
                    f"{clone_step.__module__}.{clone_step.__name__} utility to "
                    f"create multiple copies of the same step."
                )

            step.pipeline_parameter_name = key
            combined_steps[key] = step
            step_classes[step_class] = key

        # verify args
        for i, step in enumerate(steps):
            key = input_step_keys[i]
            _verify_step(key, step)

        # verify kwargs
        for key, step in kw_steps.items():
            if key in combined_steps:
                # a step for this key was already set by
                # the positional input steps
                raise PipelineInterfaceError(
                    f"Unexpected keyword argument '{key}' for pipeline "
                    f"'{self.name}'. A step for this key was "
                    f"already passed as a positional argument."
                )
            _verify_step(key, step)

        # check if there are any missing or unexpected steps
        expected_steps = set(self.STEP_SPEC.keys())
        actual_steps = set(combined_steps.keys())
        missing_steps = expected_steps - actual_steps
        unexpected_steps = actual_steps - expected_steps

        if missing_steps:
            raise PipelineInterfaceError(
                f"Missing input step(s) for pipeline "
                f"'{self.name}': {missing_steps}."
            )

        if unexpected_steps:
            raise PipelineInterfaceError(
                f"Unexpected input step(s) for pipeline "
                f"'{self.name}': {unexpected_steps}. This pipeline "
                f"only requires the following steps: {expected_steps}."
            )

        self.__steps = combined_steps

    @abstractmethod
    def connect(self, *args: BaseStep, **kwargs: BaseStep) -> None:
        """Function that connects inputs and outputs of the pipeline steps.

        Args:
            *args: The positional arguments passed to the pipeline.
            **kwargs: The keyword arguments passed to the pipeline.

        Raises:
            NotImplementedError: Always.
        """
        raise NotImplementedError

    @property
    def requirements(self) -> Set[str]:
        """Set of Python requirements of this pipeline.

        This property is a combination of the requirements of
        - required integrations for this pipeline
        - the `requirements` specified for this pipeline

        Returns:
            Set of Python requirements of this pipeline.

        Raises:
            KeyError: If the requirements file could not be found.
        """
        requirements = set()

        for integration_name in self.required_integrations:
            try:
                integration_requirements = (
                    integration_registry.select_integration_requirements(
                        integration_name
                    )
                )
                requirements.update(integration_requirements)
            except KeyError as e:
                raise KeyError(
                    f"Unable to find requirements for integration "
                    f"'{integration_name}'."
                ) from e

        if isinstance(self._requirements, str) and fileio.exists(
            self._requirements
        ):
            with fileio.open(self._requirements, "r") as f:
                requirements.update(
                    {
                        requirement.strip()
                        for requirement in f.read().split("\n")
                    }
                )
            if self.requirements_file:
                cli_utils.warning(
                    f"Using the file path passed in as `requirements` and ignoring the file '{self.requirements_file}'."
                )
        # Add this logic back in (described in #ENG-882)
        #
        # elif isinstance(self._requirements, str) and self._requirements:
        #     root = str(Repository().root)
        #     if root:
        #         assumed_requirements_file = os.path.join(
        #             root, "requirements.txt"
        #         )
        #         if fileio.exists(assumed_requirements_file):
        #             with fileio.open(assumed_requirements_file, "r") as f:
        #                 requirements.update(
        #                     {
        #                         requirement.strip()
        #                         for requirement in f.read().split("\n")
        #                     }
        #                 )
        #                 logger.info(
        #                     "Using requirements file: `%s`",
        #                     assumed_requirements_file,
        #                 )
        elif isinstance(self._requirements, List):
            requirements.update(self._requirements)
            if self.requirements_file:
                cli_utils.warning(
                    f"Using the values passed in as `requirements` and ignoring the file '{self.requirements_file}'."
                )
        elif self.requirements_file and fileio.exists(self.requirements_file):
            # TODO [ENG-883]: Deprecate the `requirements_file` option
            with fileio.open(self.requirements_file, "r") as f:
                requirements.update(
                    {
                        requirement.strip()
                        for requirement in f.read().split("\n")
                    }
                )

        return requirements

    @property
    def steps(self) -> Dict[str, BaseStep]:
        """Returns a dictionary of pipeline steps.

        Returns:
            A dictionary of pipeline steps.
        """
        return self.__steps

    @steps.setter
    def steps(self, steps: Dict[str, BaseStep]) -> NoReturn:
        """Setting the steps property is not allowed.

        Args:
            steps: The steps to set.

        Raises:
            PipelineInterfaceError: Always.
        """
        raise PipelineInterfaceError("Cannot set steps manually!")

    def validate_stack(self, stack: "Stack") -> None:
        """Validates if a stack is able to run this pipeline.

        Args:
            stack: The stack to validate.

        Raises:
            StackValidationError: If the step operator is not configured in the
                active stack.
        """
        available_step_operators = (
            {stack.step_operator.name} if stack.step_operator else set()
        )

        for step in self.steps.values():
            if (
                step.custom_step_operator
                and step.custom_step_operator not in available_step_operators
            ):
                raise StackValidationError(
                    f"Step '{step.name}' requires custom step operator "
                    f"'{step.custom_step_operator}' which is not configured in "
                    f"the active stack. Available step operators: "
                    f"{available_step_operators}."
                )

    def _reset_step_flags(self) -> None:
        """Reset the `_has_been_called` flag at the beginning of a pipeline run.

        This ensures a pipeline instance can be called more than once.
        """
        for step in self.steps.values():
            step._has_been_called = False

    def run(
        self,
        *,
        run_name: Optional[str] = None,
        schedule: Optional[Schedule] = None,
        **additional_parameters: Any,
    ) -> Any:
        """Runs the pipeline on the active stack of the current repository.

        Args:
            run_name: Name of the pipeline run.
            schedule: Optional schedule of the pipeline.
            additional_parameters: Additional parameters to pass to the
                pipeline.

        Returns:
            The result of the pipeline.
        """
        if constants.SHOULD_PREVENT_PIPELINE_EXECUTION:
            # An environment variable was set to stop the execution of
            # pipelines. This is done to prevent execution of module-level
            # pipeline.run() calls inside docker containers which should only
            # run a single step.
            logger.info(
                "Preventing execution of pipeline '%s'. If this is not "
                "intended behavior, make sure to unset the environment "
                "variable '%s'.",
                self.name,
                constants.ENV_ZENML_PREVENT_PIPELINE_EXECUTION,
            )
            return

        logger.info("Creating run for pipeline: `%s`", self.name)
        logger.info(
            f'Cache {"enabled" if self.enable_cache else "disabled"} for '
            f"pipeline `{self.name}`"
        )

        # Activating the built-in integrations through lazy loading
        from zenml.integrations.registry import integration_registry

        integration_registry.activate_integrations()

        # Path of the file where pipeline.run() was called. This is needed by
        # the airflow orchestrator so it knows which file to copy into the DAG
        # directory
        dag_filepath = io_utils.resolve_relative_path(
            inspect.currentframe().f_back.f_code.co_filename  # type: ignore[union-attr]
        )
        runtime_configuration = RuntimeConfiguration(
            run_name=run_name,
            dag_filepath=dag_filepath,
            schedule=schedule,
            **additional_parameters,
        )
        stack = Repository().active_stack

        stack_metadata = {
            component_type.value: component.FLAVOR
            for component_type, component in stack.components.items()
        }
        track_event(
            event=AnalyticsEvent.RUN_PIPELINE,
            metadata={
                "store_type": Repository().active_profile.store_type.value,
                **stack_metadata,
                "total_steps": len(self.steps),
                "schedule": bool(schedule),
            },
        )

        self._reset_step_flags()
        self.validate_stack(stack)

        return stack.deploy_pipeline(
            self, runtime_configuration=runtime_configuration
        )

    def with_config(
        self: T, config_file: str, overwrite_step_parameters: bool = False
    ) -> T:
        """Configures this pipeline using a yaml file.

        Args:
            config_file: Path to a yaml file which contains configuration
                options for running this pipeline. See
                https://docs.zenml.io/features/pipeline-configuration#setting
                -step-parameters-using-a-config-file
                for details regarding the specification of this file.
            overwrite_step_parameters: If set to `True`, values from the
                configuration file will overwrite configuration parameters
                passed in code.

        Returns:
            The pipeline object that this method was called on.
        """
        config_yaml = yaml_utils.read_yaml(config_file)

        if PipelineConfigurationKeys.STEPS in config_yaml:
            self._read_config_steps(
                config_yaml[PipelineConfigurationKeys.STEPS],
                overwrite=overwrite_step_parameters,
            )

        return self

    def _read_config_steps(
        self, steps: Dict[str, Dict[str, Any]], overwrite: bool = False
    ) -> None:
        """Reads and sets step parameters from a config file.

        Args:
            steps: Maps step names to dicts of parameter names and values.
            overwrite: If `True`, overwrite previously set step parameters.

        Raises:
            PipelineConfigurationError: If the configuration file contains
                invalid data.
            DuplicatedConfigurationError: If the configuration file contains
                duplicate step names.
        """
        for step_name, step_dict in steps.items():
            StepConfigurationKeys.key_check(step_dict)

            if step_name not in self.__steps:
                raise PipelineConfigurationError(
                    f"Found '{step_name}' step in configuration yaml but it "
                    f"doesn't exist in the pipeline steps "
                    f"{list(self.__steps.keys())}."
                )

            step = self.__steps[step_name]
            step_parameters = (
                step.CONFIG_CLASS.__fields__.keys() if step.CONFIG_CLASS else {}
            )
            parameters = step_dict.get(StepConfigurationKeys.PARAMETERS_, {})
            for parameter, value in parameters.items():
                if parameter not in step_parameters:
                    raise PipelineConfigurationError(
                        f"Found parameter '{parameter}' for '{step_name}' step "
                        f"in configuration yaml but it doesn't exist in the "
                        f"configuration class `{step.CONFIG_CLASS}`. Available "
                        f"parameters for this step: "
                        f"{list(step_parameters)}."
                    )

                previous_value = step.PARAM_SPEC.get(parameter, None)

                if overwrite:
                    step.PARAM_SPEC[parameter] = value
                else:
                    step.PARAM_SPEC.setdefault(parameter, value)

                if overwrite or not previous_value:
                    logger.debug(
                        "Setting parameter %s=%s for step '%s'.",
                        parameter,
                        value,
                        step_name,
                    )
                if previous_value and not overwrite:
                    raise DuplicatedConfigurationError(
                        "The value for parameter '{}' is set twice for step "
                        "'{}' ({} vs. {}). This can happen when you "
                        "instantiate your step with a step configuration that "
                        "sets the parameter, while also setting the same "
                        "parameter within a config file that is added to the "
                        "pipeline instance using the `.with_config()` method. "
                        "Make sure each parameter is only defined **once**. \n"
                        "While it is not recommended, you can overwrite the "
                        "step configuration using the configuration file: \n"
                        "`.with_config('config.yaml', "
                        "overwrite_step_parameters=True)".format(
                            parameter, step_name, previous_value, value
                        )
                    )
requirements: Set[str] property readonly

Set of Python requirements of this pipeline.

This property is a combination of the requirements of:

- the required integrations for this pipeline
- the `requirements` specified for this pipeline

Returns:

    Set[str]: Set of Python requirements of this pipeline.

Exceptions:

    KeyError: If the requirements file could not be found.

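A minimal sketch of how this combination plays out, assuming the `sklearn` integration is available; the integration name and version pins are placeholders:

from zenml.pipelines import pipeline
from zenml.steps import step


@step
def trainer() -> int:
    return 1


@pipeline(
    required_integrations=["sklearn"],      # expanded via the integration registry
    requirements=["pandas>=1.3", "boto3"],  # explicit pip requirements
)
def train_pipeline(trainer):
    trainer()


# The property returns the union of the integration requirements and the
# explicit list as a set of pip requirement strings.
print(train_pipeline(trainer=trainer()).requirements)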
steps: Dict[str, zenml.steps.base_step.BaseStep] property writable

Returns a dictionary of pipeline steps.

Returns:

    Dict[str, zenml.steps.base_step.BaseStep]: A dictionary of pipeline steps.

__init__(self, *args, **kwargs) special

Initialize the BasePipeline.

Parameters:

    *args (BaseStep): The steps to be executed by this pipeline. Default: ()
    **kwargs (Any): The configuration for this pipeline. Default: {}
Source code in zenml/pipelines/base_pipeline.py
def __init__(self, *args: BaseStep, **kwargs: Any) -> None:
    """Initialize the BasePipeline.

    Args:
        *args: The steps to be executed by this pipeline.
        **kwargs: The configuration for this pipeline.
    """
    kwargs.update(getattr(self, INSTANCE_CONFIGURATION))
    self.enable_cache = kwargs.pop(PARAM_ENABLE_CACHE, True)
    self.required_integrations = kwargs.pop(PARAM_REQUIRED_INTEGRATIONS, ())
    self.requirements_file = kwargs.pop(PARAM_REQUIREMENTS_FILE, None)
    if self.requirements_file:
        logger.warning(
            "The `requirements_file` argument has been deprecated. Please use `requirements` instead to pass in either a string path to a file listing your 'requirements' or a list of the individual requirements."
        )
    self._requirements = kwargs.pop(PARAM_REQUIREMENTS, None)
    self.dockerignore_file = kwargs.pop(PARAM_DOCKERIGNORE_FILE, None)
    self.secrets = kwargs.pop(PARAM_SECRETS, [])

    self.name = self.__class__.__name__
    self.__steps: Dict[str, BaseStep] = {}
    self._verify_arguments(*args, **kwargs)
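A hedged sketch of the two ways steps can be passed to `__init__` (step and pipeline names are placeholders): positionally in the order of the `connect` arguments, or as keyword arguments whose names match them. In both cases step *instances* are expected, not `@step`-decorated classes:

from zenml.pipelines import pipeline
from zenml.steps import step


@step
def loader() -> int:
    return 3


@step
def trainer(data: int) -> int:
    return data * 2


@pipeline
def train_pipeline(loader, trainer):
    trainer(data=loader())


# Positional: steps are matched to the `connect` arguments in order.
train_pipeline(loader(), trainer())

# Keyword: order-independent, but the names must match `connect`.
train_pipeline(loader=loader(), trainer=trainer())

# Passing `loader` (the class) instead of `loader()` (an instance) would raise
# a PipelineInterfaceError, as verified by `_verify_arguments` above.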
connect(self, *args, **kwargs)

Function that connects inputs and outputs of the pipeline steps.

Parameters:

    *args (BaseStep): The positional arguments passed to the pipeline. Default: ()
    **kwargs (BaseStep): The keyword arguments passed to the pipeline. Default: {}

Exceptions:

    NotImplementedError: Always.

Source code in zenml/pipelines/base_pipeline.py
@abstractmethod
def connect(self, *args: BaseStep, **kwargs: BaseStep) -> None:
    """Function that connects inputs and outputs of the pipeline steps.

    Args:
        *args: The positional arguments passed to the pipeline.
        **kwargs: The keyword arguments passed to the pipeline.

    Raises:
        NotImplementedError: Always.
    """
    raise NotImplementedError
run(self, *, run_name=None, schedule=None, **additional_parameters)

Runs the pipeline on the active stack of the current repository.

Parameters:

    run_name (Optional[str]): Name of the pipeline run. Default: None
    schedule (Optional[zenml.pipelines.schedule.Schedule]): Optional schedule of the pipeline. Default: None
    additional_parameters (Any): Additional parameters to pass to the pipeline. Default: {}

Returns:

    Any: The result of the pipeline.

Source code in zenml/pipelines/base_pipeline.py
def run(
    self,
    *,
    run_name: Optional[str] = None,
    schedule: Optional[Schedule] = None,
    **additional_parameters: Any,
) -> Any:
    """Runs the pipeline on the active stack of the current repository.

    Args:
        run_name: Name of the pipeline run.
        schedule: Optional schedule of the pipeline.
        additional_parameters: Additional parameters to pass to the
            pipeline.

    Returns:
        The result of the pipeline.
    """
    if constants.SHOULD_PREVENT_PIPELINE_EXECUTION:
        # An environment variable was set to stop the execution of
        # pipelines. This is done to prevent execution of module-level
        # pipeline.run() calls inside docker containers which should only
        # run a single step.
        logger.info(
            "Preventing execution of pipeline '%s'. If this is not "
            "intended behavior, make sure to unset the environment "
            "variable '%s'.",
            self.name,
            constants.ENV_ZENML_PREVENT_PIPELINE_EXECUTION,
        )
        return

    logger.info("Creating run for pipeline: `%s`", self.name)
    logger.info(
        f'Cache {"enabled" if self.enable_cache else "disabled"} for '
        f"pipeline `{self.name}`"
    )

    # Activating the built-in integrations through lazy loading
    from zenml.integrations.registry import integration_registry

    integration_registry.activate_integrations()

    # Path of the file where pipeline.run() was called. This is needed by
    # the airflow orchestrator so it knows which file to copy into the DAG
    # directory
    dag_filepath = io_utils.resolve_relative_path(
        inspect.currentframe().f_back.f_code.co_filename  # type: ignore[union-attr]
    )
    runtime_configuration = RuntimeConfiguration(
        run_name=run_name,
        dag_filepath=dag_filepath,
        schedule=schedule,
        **additional_parameters,
    )
    stack = Repository().active_stack

    stack_metadata = {
        component_type.value: component.FLAVOR
        for component_type, component in stack.components.items()
    }
    track_event(
        event=AnalyticsEvent.RUN_PIPELINE,
        metadata={
            "store_type": Repository().active_profile.store_type.value,
            **stack_metadata,
            "total_steps": len(self.steps),
            "schedule": bool(schedule),
        },
    )

    self._reset_step_flags()
    self.validate_stack(stack)

    return stack.deploy_pipeline(
        self, runtime_configuration=runtime_configuration
    )
validate_stack(self, stack)

Validates if a stack is able to run this pipeline.

Parameters:

    stack (Stack): The stack to validate. Required.

Exceptions:

    StackValidationError: If the step operator is not configured in the active stack.

Source code in zenml/pipelines/base_pipeline.py
def validate_stack(self, stack: "Stack") -> None:
    """Validates if a stack is able to run this pipeline.

    Args:
        stack: The stack to validate.

    Raises:
        StackValidationError: If the step operator is not configured in the
            active stack.
    """
    available_step_operators = (
        {stack.step_operator.name} if stack.step_operator else set()
    )

    for step in self.steps.values():
        if (
            step.custom_step_operator
            and step.custom_step_operator not in available_step_operators
        ):
            raise StackValidationError(
                f"Step '{step.name}' requires custom step operator "
                f"'{step.custom_step_operator}' which is not configured in "
                f"the active stack. Available step operators: "
                f"{available_step_operators}."
            )
with_config(self, config_file, overwrite_step_parameters=False)

Configures this pipeline using a yaml file.

Parameters:

    config_file (str): Path to a yaml file which contains configuration options for running this pipeline. See https://docs.zenml.io/features/pipeline-configuration#setting-step-parameters-using-a-config-file for details regarding the specification of this file. Required.
    overwrite_step_parameters (bool): If set to `True`, values from the configuration file will overwrite configuration parameters passed in code. Default: False

Returns:

    ~T: The pipeline object that this method was called on.

Source code in zenml/pipelines/base_pipeline.py
def with_config(
    self: T, config_file: str, overwrite_step_parameters: bool = False
) -> T:
    """Configures this pipeline using a yaml file.

    Args:
        config_file: Path to a yaml file which contains configuration
            options for running this pipeline. See
            https://docs.zenml.io/features/pipeline-configuration#setting
            -step-parameters-using-a-config-file
            for details regarding the specification of this file.
        overwrite_step_parameters: If set to `True`, values from the
            configuration file will overwrite configuration parameters
            passed in code.

    Returns:
        The pipeline object that this method was called on.
    """
    config_yaml = yaml_utils.read_yaml(config_file)

    if PipelineConfigurationKeys.STEPS in config_yaml:
        self._read_config_steps(
            config_yaml[PipelineConfigurationKeys.STEPS],
            overwrite=overwrite_step_parameters,
        )

    return self

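As a hedged illustration of the configuration flow, assuming the `BaseStepConfig` step-configuration class from `zenml.steps`; the step name, parameter names, and values are placeholders, and the YAML layout follows the `steps`/`parameters` keys described in the linked documentation:

from zenml.pipelines import pipeline
from zenml.steps import BaseStepConfig, step


class TrainerConfig(BaseStepConfig):
    """Placeholder step configuration; `with_config` fills these fields."""

    learning_rate: float = 0.001
    epochs: int = 1


@step
def trainer(config: TrainerConfig) -> float:
    return config.learning_rate * config.epochs


@pipeline
def train_pipeline(trainer):
    trainer()


# config.yaml (hypothetical contents):
#
# steps:
#   trainer:
#     parameters:
#       learning_rate: 0.01
#       epochs: 5

train_pipeline(trainer=trainer()).with_config("config.yaml").run()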
BasePipelineMeta (type)

Pipeline Metaclass responsible for validating the pipeline definition.

Source code in zenml/pipelines/base_pipeline.py
class BasePipelineMeta(type):
    """Pipeline Metaclass responsible for validating the pipeline definition."""

    def __new__(
        mcs, name: str, bases: Tuple[Type[Any], ...], dct: Dict[str, Any]
    ) -> "BasePipelineMeta":
        """Saves argument names for later verification purposes.

        Args:
            name: The name of the class.
            bases: The base classes of the class.
            dct: The dictionary of the class.

        Returns:
            The class.
        """
        cls = cast(Type["BasePipeline"], super().__new__(mcs, name, bases, dct))

        cls.STEP_SPEC = {}

        connect_spec = inspect.getfullargspec(
            inspect.unwrap(getattr(cls, PIPELINE_INNER_FUNC_NAME))
        )
        connect_args = connect_spec.args

        if connect_args and connect_args[0] == "self":
            connect_args.pop(0)

        for arg in connect_args:
            arg_type = connect_spec.annotations.get(arg, None)
            cls.STEP_SPEC.update({arg: arg_type})
        return cls
__new__(mcs, name, bases, dct) special staticmethod

Saves argument names for later verification purposes.

Parameters:

    name (str): The name of the class. Required.
    bases (Tuple[Type[Any], ...]): The base classes of the class. Required.
    dct (Dict[str, Any]): The dictionary of the class. Required.

Returns:

    BasePipelineMeta: The class.

Source code in zenml/pipelines/base_pipeline.py
def __new__(
    mcs, name: str, bases: Tuple[Type[Any], ...], dct: Dict[str, Any]
) -> "BasePipelineMeta":
    """Saves argument names for later verification purposes.

    Args:
        name: The name of the class.
        bases: The base classes of the class.
        dct: The dictionary of the class.

    Returns:
        The class.
    """
    cls = cast(Type["BasePipeline"], super().__new__(mcs, name, bases, dct))

    cls.STEP_SPEC = {}

    connect_spec = inspect.getfullargspec(
        inspect.unwrap(getattr(cls, PIPELINE_INNER_FUNC_NAME))
    )
    connect_args = connect_spec.args

    if connect_args and connect_args[0] == "self":
        connect_args.pop(0)

    for arg in connect_args:
        arg_type = connect_spec.annotations.get(arg, None)
        cls.STEP_SPEC.update({arg: arg_type})
    return cls

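To make the effect of the metaclass concrete, a small sketch (pipeline and argument names are placeholders):

from zenml.pipelines import pipeline


@pipeline
def my_pipeline(importer, trainer):
    trainer(data=importer())


# The metaclass inspects the signature of the wrapped `connect` function at
# class-creation time. Without type annotations on the arguments, the recorded
# types are None:
print(my_pipeline.STEP_SPEC)  # {'importer': None, 'trainer': None}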
builtin_pipelines special

Initialization for ZenML builtin pipelines.

training_pipeline

Class for built-in ZenML training pipeline.

TrainingPipeline (BasePipeline)

Class for the classic training pipeline implementation.

Source code in zenml/pipelines/builtin_pipelines/training_pipeline.py
class TrainingPipeline(BasePipeline):
    """Class for the classic training pipeline implementation."""

    def connect(  # type: ignore[override]
        self,
        datasource: step_interfaces.BaseDatasourceStep,
        splitter: step_interfaces.BaseSplitStep,
        analyzer: step_interfaces.BaseAnalyzerStep,
        preprocessor: step_interfaces.BasePreprocessorStep,
        trainer: step_interfaces.BaseTrainerStep,
        evaluator: step_interfaces.BaseEvaluatorStep,
    ) -> None:
        """Main connect method for the standard training pipelines.

        Args:
            datasource: the step responsible for the data ingestion
            splitter: the step responsible for splitting the dataset into
                train, test, val
            analyzer: the step responsible for extracting the statistics and
                the schema
            preprocessor: the step responsible for preprocessing the data
            trainer: the step responsible for training a model
            evaluator: the step responsible for computing the evaluation of
                the trained model
        """
        # Ingesting the datasource
        dataset = datasource()

        # Splitting the data
        train, test, validation = splitter(dataset=dataset)  # type:ignore

        # Analyzing the train dataset
        statistics, schema = analyzer(dataset=train)  # type:ignore

        # Preprocessing the splits
        train_t, test_t, validation_t = preprocessor(  # type:ignore
            train_dataset=train,
            test_dataset=test,
            validation_dataset=validation,
            statistics=statistics,
            schema=schema,
        )

        # Training the model
        model = trainer(train_dataset=train_t, validation_dataset=validation_t)

        # Evaluating the trained model
        evaluator(model=model, dataset=test_t)  # type:ignore
connect(self, datasource, splitter, analyzer, preprocessor, trainer, evaluator)

Main connect method for the standard training pipelines.

Parameters:

    datasource (BaseDatasourceStep): The step responsible for the data ingestion. Required.
    splitter (BaseSplitStep): The step responsible for splitting the dataset into train, test, val. Required.
    analyzer (BaseAnalyzerStep): The step responsible for extracting the statistics and the schema. Required.
    preprocessor (BasePreprocessorStep): The step responsible for preprocessing the data. Required.
    trainer (BaseTrainerStep): The step responsible for training a model. Required.
    evaluator (BaseEvaluatorStep): The step responsible for computing the evaluation of the trained model. Required.
Source code in zenml/pipelines/builtin_pipelines/training_pipeline.py
def connect(  # type: ignore[override]
    self,
    datasource: step_interfaces.BaseDatasourceStep,
    splitter: step_interfaces.BaseSplitStep,
    analyzer: step_interfaces.BaseAnalyzerStep,
    preprocessor: step_interfaces.BasePreprocessorStep,
    trainer: step_interfaces.BaseTrainerStep,
    evaluator: step_interfaces.BaseEvaluatorStep,
) -> None:
    """Main connect method for the standard training pipelines.

    Args:
        datasource: the step responsible for the data ingestion
        splitter: the step responsible for splitting the dataset into
            train, test, val
        analyzer: the step responsible for extracting the statistics and
            the schema
        preprocessor: the step responsible for preprocessing the data
        trainer: the step responsible for training a model
        evaluator: the step responsible for computing the evaluation of
            the trained model
    """
    # Ingesting the datasource
    dataset = datasource()

    # Splitting the data
    train, test, validation = splitter(dataset=dataset)  # type:ignore

    # Analyzing the train dataset
    statistics, schema = analyzer(dataset=train)  # type:ignore

    # Preprocessing the splits
    train_t, test_t, validation_t = preprocessor(  # type:ignore
        train_dataset=train,
        test_dataset=test,
        validation_dataset=validation,
        statistics=statistics,
        schema=schema,
    )

    # Training the model
    model = trainer(train_dataset=train_t, validation_dataset=validation_t)

    # Evaluating the trained model
    evaluator(model=model, dataset=test_t)  # type:ignore

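A hedged usage sketch, assuming `TrainingPipeline` is importable from `zenml.pipelines.builtin_pipelines`; the `My*Step` classes are hypothetical implementations of the corresponding step interfaces that you would provide:

from zenml.pipelines.builtin_pipelines import TrainingPipeline

# Each argument is an *instance* of a step implementing the matching base
# interface (BaseDatasourceStep, BaseSplitStep, and so on). The classes below
# are placeholders, not part of ZenML.
pipeline_instance = TrainingPipeline(
    datasource=MyDatasourceStep(),
    splitter=MySplitStep(),
    analyzer=MyAnalyzerStep(),
    preprocessor=MyPreprocessorStep(),
    trainer=MyTrainerStep(),
    evaluator=MyEvaluatorStep(),
)
pipeline_instance.run()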
pipeline_decorator

Decorator function for ZenML pipelines.

pipeline(_func=None, *, name=None, enable_cache=True, required_integrations=(), requirements_file=None, requirements=None, dockerignore_file=None, secrets=[])

Outer decorator function for the creation of a ZenML pipeline.

In order to be able to work with parameters such as "name", it features a nested decorator structure.

Parameters:

    _func (Optional[~F]): The decorated function. Default: None
    name (Optional[str]): The name of the pipeline. If left empty, the name of the decorated function will be used as a fallback. Default: None
    enable_cache (bool): Whether to use caching or not. Default: True
    required_integrations (Sequence[str]): Optional list of ZenML integrations that are required to run this pipeline. Run `zenml integration list` for a full list of available integrations. Default: ()
    requirements_file (Optional[str]): DEPRECATED: Optional path to a pip requirements file that contains requirements to run the pipeline. Please use `requirements` instead. Default: None
    requirements (Union[str, List[str]]): Optional path to a requirements file or a list of requirements. Default: None
    dockerignore_file (Optional[str]): Optional path to a dockerignore file to use when building docker images for running this pipeline. Note: If you pass a file, make sure it does not include the `.zen` directory as it is needed to run ZenML inside the container. Default: None
    secrets (Optional[List[str]]): Optional list of secrets that are required to run this pipeline. Default: []

Returns:

    Union[Type[zenml.pipelines.base_pipeline.BasePipeline], Callable[[~F], Type[zenml.pipelines.base_pipeline.BasePipeline]]]: The inner decorator which creates the pipeline class based on the ZenML BasePipeline.

Source code in zenml/pipelines/pipeline_decorator.py
def pipeline(
    _func: Optional[F] = None,
    *,
    name: Optional[str] = None,
    enable_cache: bool = True,
    required_integrations: Sequence[str] = (),
    requirements_file: Optional[str] = None,
    requirements: Optional[Union[str, List[str]]] = None,
    dockerignore_file: Optional[str] = None,
    secrets: Optional[List[str]] = [],
) -> Union[Type[BasePipeline], Callable[[F], Type[BasePipeline]]]:
    """Outer decorator function for the creation of a ZenML pipeline.

    In order to be able to work with parameters such as "name", it features a
    nested decorator structure.

    Args:
        _func: The decorated function.
        name: The name of the pipeline. If left empty, the name of the
            decorated function will be used as a fallback.
        enable_cache: Whether to use caching or not.
        required_integrations: Optional list of ZenML integrations that are
            required to run this pipeline. Run `zenml integration list` for
            a full list of available integrations.
        requirements_file: DEPRECATED: Optional path to a pip requirements file
            that contains requirements to run the pipeline. Please use
            'requirements' instead.
        requirements: Optional path to a requirements file or a list of requirements.
        dockerignore_file: Optional path to a dockerignore file to use when
            building docker images for running this pipeline.
            **Note**: If you pass a file, make sure it does not include the
            `.zen` directory as it is needed to run ZenML inside the container.
        secrets: Optional list of secrets that are required to run this pipeline.

    Returns:
        the inner decorator which creates the pipeline class based on the
        ZenML BasePipeline
    """

    def inner_decorator(func: F) -> Type[BasePipeline]:
        """Inner decorator function for the creation of a ZenML pipeline.

        Args:
            func: types.FunctionType, this function will be used as the
                "connect" method of the generated Pipeline

        Returns:
            the class of a newly generated ZenML Pipeline
        """
        return type(  # noqa
            name if name else func.__name__,
            (BasePipeline,),
            {
                PIPELINE_INNER_FUNC_NAME: staticmethod(func),  # type: ignore[arg-type] # noqa
                INSTANCE_CONFIGURATION: {
                    PARAM_ENABLE_CACHE: enable_cache,
                    PARAM_REQUIRED_INTEGRATIONS: required_integrations,
                    PARAM_REQUIREMENTS_FILE: requirements_file,
                    PARAM_REQUIREMENTS: requirements,
                    PARAM_DOCKERIGNORE_FILE: dockerignore_file,
                    PARAM_SECRETS: secrets,
                },
                "__module__": func.__module__,
                "__doc__": func.__doc__,
            },
        )

    if _func is None:
        return inner_decorator
    else:
        return inner_decorator(_func)

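A hedged sketch of the decorator used with its keyword parameters; the step name, integration name, requirement pin, file paths, and secret name below are placeholders:

from zenml.pipelines import pipeline
from zenml.steps import step


@step
def trainer() -> int:
    return 1


@pipeline(
    name="custom_pipeline_name",
    enable_cache=False,
    required_integrations=["sklearn"],
    requirements=["pandas>=1.3"],       # or a path such as "requirements.txt"
    dockerignore_file=".dockerignore",  # must not exclude the .zen directory
    secrets=["my_secret"],
)
def train_pipeline(trainer):
    trainer()


train_pipeline(trainer=trainer()).run()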
schedule

Class for defining a pipeline schedule.

Schedule (BaseModel) pydantic-model

Class for defining a pipeline schedule.

Attributes:

    cron_expression (Optional[str]): Cron expression for the pipeline schedule. If a value for this is set it takes precedence over the start time + interval.
    start_time (Optional[datetime.datetime]): datetime object to indicate when to start the schedule.
    end_time (Optional[datetime.datetime]): datetime object to indicate when to end the schedule.
    interval_second (Optional[datetime.timedelta]): datetime timedelta indicating the seconds between two recurring runs for a periodic schedule.
    catchup (bool): Whether the recurring run should catch up if behind schedule. For example, if the recurring run is paused for a while and re-enabled afterwards. If catchup=True, the scheduler will catch up on (backfill) each missed interval. Otherwise, it only schedules the latest interval if more than one interval is ready to be scheduled. Usually, if your pipeline handles backfill internally, you should turn catchup off to avoid duplicate backfill.

Source code in zenml/pipelines/schedule.py
class Schedule(BaseModel):
    """Class for defining a pipeline schedule.

    Attributes:
        cron_expression: Cron expression for the pipeline schedule. If a value
            for this is set it takes precedence over the start time + interval.
        start_time: datetime object to indicate when to start the schedule.
        end_time: datetime object to indicate when to end the schedule.
        interval_second: datetime timedelta indicating the seconds between two
            recurring runs for a periodic schedule.
        catchup: Whether the recurring run should catch up if behind schedule.
            For example, if the recurring run is paused for a while and
            re-enabled afterwards. If catchup=True, the scheduler will catch
            up on (backfill) each missed interval. Otherwise, it only
            schedules the latest interval if more than one interval is ready to
            be scheduled. Usually, if your pipeline handles backfill
            internally, you should turn catchup off to avoid duplicate backfill.
    """

    cron_expression: Optional[str] = None
    start_time: Optional[datetime.datetime] = None
    end_time: Optional[datetime.datetime] = None
    interval_second: Optional[datetime.timedelta] = None
    catchup: bool = False

    @root_validator
    def _ensure_cron_or_periodic_schedule_configured(
        cls, values: Dict[str, Any]
    ) -> Dict[str, Any]:
        """Ensures that the cron expression or start time + interval are set.

        Args:
            values: All attributes of the schedule.

        Returns:
            All schedule attributes.

        Raises:
            ValueError: If no cron expression or start time + interval were
                provided.
        """
        cron_expression = values.get("cron_expression")
        periodic_schedule = values.get("start_time") and values.get(
            "interval_second"
        )

        if cron_expression and periodic_schedule:
            logger.warning(
                "This schedule was created with a cron expression as well as "
                "values for `start_time` and `interval_seconds`. The resulting "
                "behaviour depends on the concrete orchestrator implementation "
                "but will usually ignore the interval and use the cron "
                "expression."
            )
            return values
        elif cron_expression or periodic_schedule:
            return values
        else:
            raise ValueError(
                "Either a cron expression or start time and interval seconds "
                "need to be set for a valid schedule."
            )

    @property
    def utc_start_time(self) -> Optional[str]:
        """Optional ISO-formatted string of the UTC start time.

        Returns:
            Optional ISO-formatted string of the UTC start time.
        """
        if not self.start_time:
            return None

        return self.start_time.astimezone(datetime.timezone.utc).isoformat()

    @property
    def utc_end_time(self) -> Optional[str]:
        """Optional ISO-formatted string of the UTC end time.

        Returns:
            Optional ISO-formatted string of the UTC end time.
        """
        if not self.end_time:
            return None

        return self.end_time.astimezone(datetime.timezone.utc).isoformat()
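A hedged usage sketch showing both ways to define a schedule; `pipeline_instance` stands in for any instantiated ZenML pipeline, the timing values are placeholders, and the active orchestrator must support scheduled runs:

import datetime

from zenml.pipelines.schedule import Schedule

# Cron-based schedule: takes precedence over start time + interval.
cron_schedule = Schedule(cron_expression="*/15 * * * *")

# Periodic schedule: requires both a start time and an interval.
periodic_schedule = Schedule(
    start_time=datetime.datetime.now(datetime.timezone.utc),
    interval_second=datetime.timedelta(minutes=15),
    catchup=False,
)

# `pipeline_instance` is a placeholder for an instantiated pipeline.
pipeline_instance.run(schedule=cron_schedule)
pipeline_instance.run(run_name="nightly_run", schedule=periodic_schedule)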
utc_end_time: Optional[str] property readonly

Optional ISO-formatted string of the UTC end time.

Returns:

    Optional[str]: Optional ISO-formatted string of the UTC end time.

utc_start_time: Optional[str] property readonly

Optional ISO-formatted string of the UTC start time.

Returns:

    Optional[str]: Optional ISO-formatted string of the UTC start time.