Skip to content

New

zenml.new special

pipelines special

build_utils

Pipeline build utilities.

compute_build_checksum(items, stack, code_repository=None)

Compute an overall checksum for a pipeline build.

Parameters:

Name Type Description Default
items List[BuildConfiguration]

Items of the build.

required
stack Stack

The stack associated with the build. Will be used to gather its requirements.

required
code_repository Optional[BaseCodeRepository]

The code repository that will be used to download files inside the build. Will be used for its dependency specification.

None

Returns:

Type Description
str

The build checksum.

Source code in zenml/new/pipelines/build_utils.py
def compute_build_checksum(
    items: List["BuildConfiguration"],
    stack: "Stack",
    code_repository: Optional["BaseCodeRepository"] = None,
) -> str:
    """Compute a single checksum covering all items of a pipeline build.

    For every item, the image key followed by the item's settings checksum
    is fed into one running MD5 hash. The result therefore depends on the
    order in which the items are passed.

    Args:
        items: Items of the build.
        stack: The stack associated with the build. It is forwarded to each
            item's settings checksum computation.
        code_repository: The code repository that will be used to download
            files inside the build. It is forwarded to each item's settings
            checksum computation.

    Returns:
        The hexadecimal digest of the combined hash.
    """
    digest = hashlib.md5()

    for build_item in items:
        image_key = PipelineBuildBaseModel.get_image_key(
            component_key=build_item.key, step=build_item.step_name
        )
        item_checksum = build_item.compute_settings_checksum(
            stack=stack, code_repository=code_repository
        )

        # Key first, then settings checksum: the order is part of the hash.
        for part in (image_key, item_checksum):
            digest.update(part.encode())

    return digest.hexdigest()
create_pipeline_build(deployment, pipeline_id=None, code_repository=None)

Builds images and registers the output in the server.

Parameters:

Name Type Description Default
deployment PipelineDeploymentBaseModel

The pipeline deployment.

required
pipeline_id Optional[uuid.UUID]

The ID of the pipeline.

None
code_repository Optional[BaseCodeRepository]

If provided, this code repository will be used to download files inside the build images.

None

Returns:

Type Description
Optional[PipelineBuildResponseModel]

The build output.

Exceptions:

Type Description
RuntimeError

If multiple builds with the same key but different settings were specified.

Source code in zenml/new/pipelines/build_utils.py
def create_pipeline_build(
    deployment: "PipelineDeploymentBaseModel",
    pipeline_id: Optional[UUID] = None,
    code_repository: Optional["BaseCodeRepository"] = None,
) -> Optional["PipelineBuildResponseModel"]:
    """Builds images and registers the output in the server.

    Every build configuration required by the active stack is identified by
    its image key (component key combined with the step name). If a
    configuration has the same settings checksum as an image that was
    already built during this call, that image is reused instead of running
    another Docker build.

    Args:
        deployment: The pipeline deployment.
        pipeline_id: The ID of the pipeline.
        code_repository: If provided, this code repository will be used to
            download files inside the build images.

    Returns:
        The build output, or None if the active stack requires no Docker
        builds for the deployment.

    Raises:
        RuntimeError: If multiple builds with the same key but different
            settings were specified.
    """
    client = Client()
    stack = client.active_stack
    required_builds = stack.get_docker_builds(deployment=deployment)

    if not required_builds:
        logger.debug("No docker builds required.")
        return None

    logger.info(
        "Building Docker image(s) for pipeline `%s`.",
        deployment.pipeline_configuration.name,
    )

    docker_image_builder = PipelineDockerImageBuilder()
    # Built items by image key, and the image key that first produced each
    # settings checksum (used to reuse identical images).
    images: Dict[str, BuildItem] = {}
    checksums: Dict[str, str] = {}

    for build_config in required_builds:
        combined_key = PipelineBuildBaseModel.get_image_key(
            component_key=build_config.key, step=build_config.step_name
        )
        checksum = build_config.compute_settings_checksum(
            stack=stack, code_repository=code_repository
        )

        if combined_key in images:
            previous_checksum = images[combined_key].settings_checksum

            if previous_checksum != checksum:
                raise RuntimeError(
                    f"Trying to build image for key `{combined_key}` but "
                    "an image for this key was already built with a "
                    "different configuration. This happens if multiple "
                    "stack components specified Docker builds for the same "
                    "key in the `StackComponent.get_docker_builds(...)` "
                    "method. If you're using custom components, make sure "
                    "to provide unique keys when returning your build "
                    "configurations to avoid this error."
                )

            # Same key and same settings: nothing new to build.
            continue

        if checksum in checksums:
            # An identical image was already built under a different key.
            # Copy ALL of its properties, including whether it requires a
            # code download; otherwise that flag would silently leak from
            # whichever unrelated image was built in an earlier iteration.
            reused_item = images[checksums[checksum]]
            image_name_or_digest = reused_item.image
            contains_code = reused_item.contains_code
            requires_code_download = reused_item.requires_code_download
            dockerfile = reused_item.dockerfile
            requirements = reused_item.requirements
        else:
            tag = deployment.pipeline_configuration.name
            if build_config.step_name:
                tag += f"-{build_config.step_name}"
            tag += f"-{build_config.key}"

            include_files = build_config.should_include_files(
                code_repository=code_repository,
            )
            download_files = build_config.should_download_files(
                code_repository=code_repository,
            )

            (
                image_name_or_digest,
                dockerfile,
                requirements,
            ) = docker_image_builder.build_docker_image(
                docker_settings=build_config.settings,
                tag=tag,
                stack=stack,
                include_files=include_files,
                download_files=download_files,
                entrypoint=build_config.entrypoint,
                extra_files=build_config.extra_files,
                code_repository=code_repository,
            )
            contains_code = include_files
            requires_code_download = download_files

        images[combined_key] = BuildItem(
            image=image_name_or_digest,
            dockerfile=dockerfile,
            requirements=requirements,
            settings_checksum=checksum,
            contains_code=contains_code,
            requires_code_download=requires_code_download,
        )
        checksums[checksum] = combined_key

    logger.info("Finished building Docker image(s).")

    # Without a container registry in the stack the build is marked local.
    is_local = stack.container_registry is None
    build_contains_code = any(item.contains_code for item in images.values())
    build_checksum = compute_build_checksum(
        required_builds, stack=stack, code_repository=code_repository
    )

    build_request = PipelineBuildRequestModel(
        user=client.active_user.id,
        workspace=client.active_workspace.id,
        stack=client.active_stack_model.id,
        pipeline=pipeline_id,
        is_local=is_local,
        contains_code=build_contains_code,
        images=images,
        zenml_version=zenml.__version__,
        python_version=platform.python_version(),
        checksum=build_checksum,
    )
    return client.zen_store.create_build(build_request)
find_existing_build(deployment, code_repository)

Find an existing build for a deployment.

Parameters:

Name Type Description Default
deployment PipelineDeploymentBaseModel

The deployment for which to find an existing build.

required
code_repository BaseCodeRepository

The code repository that will be used to download files in the images.

required

Returns:

Type Description
Optional[PipelineBuildResponseModel]

The existing build to reuse if found.

Source code in zenml/new/pipelines/build_utils.py
def find_existing_build(
    deployment: "PipelineDeploymentBaseModel",
    code_repository: "BaseCodeRepository",
) -> Optional["PipelineBuildResponseModel"]:
    """Look up a previously registered build that the deployment can reuse.

    Only the most recently created build is requested. It must be non-local,
    contain no code, and match the active stack, the current ZenML version,
    the current Python major and minor version, and the build checksum.

    Args:
        deployment: The deployment for which to find an existing build.
        code_repository: The code repository that will be used to download
            files in the images.

    Returns:
        The existing build to reuse if found, otherwise None. None is also
        returned if the active stack requires no Docker builds.
    """
    client = Client()
    stack = client.active_stack

    required_builds = stack.get_docker_builds(deployment=deployment)
    if not required_builds:
        return None

    major_minor = ".".join(platform.python_version_tuple()[:2])
    build_checksum = compute_build_checksum(
        required_builds, stack=stack, code_repository=code_repository
    )

    filters = {
        "sort_by": "desc:created",
        "size": 1,
        "stack_id": stack.id,
        # The build is local and it's not clear whether the images
        # exist on the current machine or if they've been overwritten.
        # TODO: Should we support this by storing the unique Docker ID for
        # the image and checking if an image with that ID exists locally?
        "is_local": False,
        # The build contains some code which might be different than the
        # local code the user is expecting to run
        "contains_code": False,
        "zenml_version": zenml.__version__,
        # Match all patch versions of the same Python major + minor
        "python_version": f"startswith:{major_minor}",
        "checksum": build_checksum,
    }
    matches = client.list_builds(**filters)

    return matches[0] if matches.items else None
reuse_or_create_pipeline_build(deployment, allow_build_reuse, pipeline_id=None, build=None, code_repository=None)

Loads or creates a pipeline build.

Parameters:

Name Type Description Default
deployment PipelineDeploymentBaseModel

The pipeline deployment for which to load or create the build.

required
allow_build_reuse bool

If True, the build is allowed to reuse an existing build.

required
pipeline_id Optional[uuid.UUID]

Optional ID of the pipeline to reference in the build.

None
build Union[UUID, PipelineBuildBaseModel]

Optional existing build. If given, the build will be fetched (or registered) in the database. If not given, a new build will be created.

None
code_repository Optional[BaseCodeRepository]

If provided, this code repository will be used to download files inside the build images.

None

Returns:

Type Description
Optional[PipelineBuildResponseModel]

The build response.

Source code in zenml/new/pipelines/build_utils.py
def reuse_or_create_pipeline_build(
    deployment: "PipelineDeploymentBaseModel",
    allow_build_reuse: bool,
    pipeline_id: Optional[UUID] = None,
    build: Union["UUID", "PipelineBuildBaseModel", None] = None,
    code_repository: Optional["BaseCodeRepository"] = None,
) -> Optional["PipelineBuildResponseModel"]:
    """Loads or creates a pipeline build.

    If `build` is given, it is fetched by ID or registered from the given
    model, and then verified against the deployment. Otherwise an existing
    build is reused when permitted, and a new build is created as the
    fallback.

    Args:
        deployment: The pipeline deployment for which to load or create the
            build.
        allow_build_reuse: If True, the build is allowed to reuse an
            existing build.
        pipeline_id: Optional ID of the pipeline to reference in the build.
        build: Optional existing build. If given, the build will be fetched
            (or registered) in the database. If not given, a new build will
            be created.
        code_repository: If provided, this code repository will be used to
            download inside the build images.

    Returns:
        The build response.
    """
    if build:
        if isinstance(build, UUID):
            # An ID refers to a build that is already registered.
            custom_build = Client().zen_store.get_build(build_id=build)
        else:
            # A build model still has to be registered with the server.
            request = PipelineBuildRequestModel(
                user=Client().active_user.id,
                workspace=Client().active_workspace.id,
                stack=Client().active_stack_model.id,
                pipeline=pipeline_id,
                **build.dict(),
            )
            custom_build = Client().zen_store.create_build(build=request)

        verify_custom_build(
            build=custom_build,
            deployment=deployment,
            code_repository=code_repository,
        )
        return custom_build

    # Reuse is only attempted with a code repository and when the
    # deployment does not require files to be included in the images.
    may_reuse = (
        allow_build_reuse
        and code_repository
        and not deployment.requires_included_files
    )
    if may_reuse:
        existing_build = find_existing_build(
            deployment=deployment, code_repository=code_repository
        )
        if existing_build:
            logger.info(
                "Reusing existing build `%s` for stack `%s`.",
                existing_build.id,
                Client().active_stack.name,
            )
            return existing_build

    return create_pipeline_build(
        deployment=deployment,
        pipeline_id=pipeline_id,
        code_repository=code_repository,
    )
verify_custom_build(build, deployment, code_repository=None)

Verify a custom build for a pipeline deployment.

Parameters:

Name Type Description Default
build PipelineBuildResponseModel

The build to verify.

required
deployment PipelineDeploymentBaseModel

The deployment for which to verify the build.

required
code_repository Optional[BaseCodeRepository]

Code repository that will be used to download files for the deployment.

None

Exceptions:

Type Description
RuntimeError

If the build can't be used for the deployment.

Source code in zenml/new/pipelines/build_utils.py
def verify_custom_build(
    build: "PipelineBuildResponseModel",
    deployment: "PipelineDeploymentBaseModel",
    code_repository: Optional["BaseCodeRepository"] = None,
) -> None:
    """Check whether a user-specified build fits a pipeline deployment.

    Most mismatches (different stack, included code, changed Docker
    settings, local build) only produce warnings. An error is raised if the
    build needs a code download but no code repository is available, or if
    the build has no overall checksum and lacks an image for a required key.

    Args:
        build: The build to verify.
        deployment: The deployment for which to verify the build.
        code_repository: Code repository that will be used to download files
            for the deployment.

    Raises:
        RuntimeError: If the build can't be used for the deployment.
    """
    active_stack = Client().active_stack
    required_builds = active_stack.get_docker_builds(deployment=deployment)

    build_stack = build.stack
    if build_stack and build_stack.id != active_stack.id:
        logger.warning(
            "The stack `%s` used for the build `%s` is not the same as the "
            "stack `%s` that the pipeline will run on. This could lead "
            "to issues if the stacks have different build requirements.",
            build.stack.name,
            build.id,
            active_stack.name,
        )

    if build.contains_code:
        logger.warning(
            "The build you specified for this run contains code and will run "
            "with the step code that was included in the Docker images which "
            "might differ from the local code in your client environment."
        )

    if build.requires_code_download and not code_repository:
        raise RuntimeError(
            "The build you specified does not include code but code download "
            "not possible. This might be because you don't have a code "
            "repository registered or the code repository contains local "
            "changes."
        )

    if not build.checksum:
        # No checksum given for the entire build, we manually check that
        # all the images exist and the setting match
        for config in required_builds:
            try:
                image = build.get_image(
                    component_key=config.key,
                    step=config.step_name,
                )
            except KeyError:
                raise RuntimeError(
                    "The build you specified is missing an image for key: "
                    f"{config.key}."
                )

            current_checksum = config.compute_settings_checksum(
                stack=active_stack, code_repository=code_repository
            )
            stored_checksum = build.get_settings_checksum(
                component_key=config.key, step=config.step_name
            )
            if current_checksum != stored_checksum:
                logger.warning(
                    "The Docker settings used to build the image `%s` are "
                    "not the same as currently specified for your pipeline. "
                    "This means that the build you specified to run this "
                    "pipeline might be outdated and most likely contains "
                    "outdated code or requirements.",
                    image,
                )
    else:
        # A single comparison of the overall checksum covers all images.
        current_build_checksum = compute_build_checksum(
            required_builds,
            stack=active_stack,
            code_repository=code_repository,
        )
        if current_build_checksum != build.checksum:
            logger.warning(
                "The Docker settings used for the build `%s` are "
                "not the same as currently specified for your pipeline. "
                "This means that the build you specified to run this "
                "pipeline might be outdated and most likely contains "
                "outdated requirements.",
                build.id,
            )

    if build.is_local:
        logger.warning(
            "You manually specified a local build to run your pipeline. "
            "This might lead to errors if the images don't exist on "
            "your local machine or the image tags have been "
            "overwritten since the original build happened."
        )
verify_local_repository_context(deployment, local_repo_context)

Verifies the local repository.

If the local repository exists and has no local changes, code download inside the images is possible.

Parameters:

Name Type Description Default
deployment PipelineDeploymentBaseModel

The pipeline deployment.

required
local_repo_context Optional[LocalRepositoryContext]

The local repository active at the source root.

required

Exceptions:

Type Description
RuntimeError

If the deployment requires code download but code download is not possible.

Returns:

Type Description
Optional[zenml.code_repositories.base_code_repository.BaseCodeRepository]

The code repository from which to download files for the runs of the deployment, or None if code download is not possible.

Source code in zenml/new/pipelines/build_utils.py
def verify_local_repository_context(
    deployment: "PipelineDeploymentBaseModel",
    local_repo_context: Optional["LocalRepositoryContext"],
) -> Optional[BaseCodeRepository]:
    """Verifies the local repository.

    Code download inside the images is only possible if a local repository
    exists and has no local changes. If the deployment requires code
    download and that is not the case, an error is raised.

    Args:
        deployment: The pipeline deployment.
        local_repo_context: The local repository active at the source root.

    Raises:
        RuntimeError: If the deployment requires code download but code download
            is not possible.

    Returns:
        The code repository from which to download files for the runs of the
        deployment, or None if code download is not possible.
    """
    if deployment.requires_code_download:
        # Each failing precondition gets its own error message.
        if not local_repo_context:
            raise RuntimeError(
                "The `DockerSettings` of the pipeline or one of its "
                "steps specify that code should be included in the "
                "Docker image (`source_files='download'`), but there is no "
                "code repository active at your current source root "
                f"`{source_utils.get_source_root()}`."
            )
        if local_repo_context.is_dirty:
            raise RuntimeError(
                "The `DockerSettings` of the pipeline or one of its "
                "steps specify that code should be included in the "
                "Docker image (`source_files='download'`), but the code "
                "repository active at your current source root "
                f"`{source_utils.get_source_root()}` has uncommitted "
                "changes."
            )
        if local_repo_context.has_local_changes:
            raise RuntimeError(
                "The `DockerSettings` of the pipeline or one of its "
                "steps specify that code should be included in the "
                "Docker image (`source_files='download'`), but the code "
                "repository active at your current source root "
                f"`{source_utils.get_source_root()}` has unpushed "
                "changes."
            )

    # Without a repository, or with local changes, nothing can be downloaded.
    if not local_repo_context or local_repo_context.has_local_changes:
        return None

    repository_model = Client().get_code_repository(
        local_repo_context.code_repository_id
    )
    return BaseCodeRepository.from_model(repository_model)

deserialization_utils

Pipeline deserialization utilities.

load_pipeline(model)

Load a pipeline from a model.

Parameters:

Name Type Description Default
model PipelineResponseModel

The pipeline model to load.

required

Exceptions:

Type Description
ValueError

If the pipeline can't be loaded due to an old model spec (version <0.2).

Returns:

Type Description
Pipeline

The loaded pipeline.

Source code in zenml/new/pipelines/deserialization_utils.py
def load_pipeline(model: "PipelineResponseModel") -> "Pipeline":
    """Load a pipeline from a model.

    The loader is chosen by the spec version of the model: 0.2 and 0.3 have
    dedicated loaders, and every other supported version uses the 0.4
    loader. After loading, a warning is logged if the identifier computed
    for the loaded pipeline differs from the model's version hash.

    Args:
        model: The pipeline model to load.

    Raises:
        ValueError: If the pipeline can't be loaded due to an old model spec
            (version <0.2).

    Returns:
        The loaded pipeline.
    """
    spec_version = version.parse(model.spec.version)

    if spec_version < version.parse("0.2"):
        raise ValueError(
            "Loading a pipeline is only possible for pipeline specs with "
            "version 0.2 or higher."
        )

    if spec_version == version.parse("0.2"):
        loader = load_pipeline_v_0_2
    elif spec_version == version.parse("0.3"):
        loader = load_pipeline_v_0_3
    else:
        loader = load_pipeline_v_0_4

    pipeline_instance = loader(model=model)

    computed_hash = pipeline_instance._compute_unique_identifier(
        pipeline_spec=model.spec
    )
    if computed_hash != model.version_hash:
        logger.warning(
            "Trying to load pipeline version `%s`, but the local step code "
            "changed since this pipeline version was registered. Using "
            "this pipeline instance will result in a different pipeline "
            "version being registered or reused.",
            model.version,
        )

    return pipeline_instance
load_pipeline_v_0_2(model)

Load a pipeline from a model with spec version 0.2.

Parameters:

Name Type Description Default
model PipelineResponseModel

The pipeline model to load.

required

Returns:

Type Description
Pipeline

The loaded pipeline.

Source code in zenml/new/pipelines/deserialization_utils.py
def load_pipeline_v_0_2(model: "PipelineResponseModel") -> "Pipeline":
    """Load a pipeline from a model with spec version 0.2.

    Delegates to the legacy loader with `use_pipeline_parameter_name`
    disabled.

    Args:
        model: The pipeline model to load.

    Returns:
        The loaded pipeline.
    """
    pipeline_instance = _load_legacy_pipeline(
        model=model,
        use_pipeline_parameter_name=False,
    )
    return pipeline_instance
load_pipeline_v_0_3(model)

Load a pipeline from a model with spec version 0.3.

Parameters:

Name Type Description Default
model PipelineResponseModel

The pipeline model to load.

required

Returns:

Type Description
Pipeline

The loaded pipeline.

Source code in zenml/new/pipelines/deserialization_utils.py
def load_pipeline_v_0_3(model: "PipelineResponseModel") -> "Pipeline":
    """Load a pipeline from a model with spec version 0.3.

    Delegates to the legacy loader with `use_pipeline_parameter_name`
    enabled.

    Args:
        model: The pipeline model to load.

    Returns:
        The loaded pipeline.
    """
    pipeline_instance = _load_legacy_pipeline(
        model=model,
        use_pipeline_parameter_name=True,
    )
    return pipeline_instance
load_pipeline_v_0_4(model)

Load a pipeline from a model with spec version 0.4.

Parameters:

Name Type Description Default
model PipelineResponseModel

The pipeline model to load.

required

Exceptions:

Type Description
TypeError

If the pipeline source does not refer to a pipeline instance.

Returns:

Type Description
Pipeline

The loaded pipeline.

Source code in zenml/new/pipelines/deserialization_utils.py
def load_pipeline_v_0_4(model: "PipelineResponseModel") -> "Pipeline":
    """Load a pipeline from a model with spec version 0.4.

    The object referenced by the spec's source is loaded, and if it is a
    pipeline instance it is prepared with the parameters stored in the spec.

    Args:
        model: The pipeline model to load.

    Raises:
        TypeError: If the pipeline source does not refer to a pipeline instance.

    Returns:
        The loaded pipeline.
    """
    source = model.spec.source
    assert source

    loaded_object = source_utils.load(source)

    if isinstance(loaded_object, Pipeline):
        loaded_object.prepare(**model.spec.parameters)
        return loaded_object

    raise TypeError("Not a pipeline")

pipeline

Definition of a ZenML pipeline.

GetRunsDescriptor

Descriptor to define the BasePipeline.get_runs.

Descriptors (https://docs.python.org/3/reference/datamodel.html#implementing-descriptors) allow us to define different behaviors for pipeline classes and instances.

Source code in zenml/new/pipelines/pipeline.py
class GetRunsDescriptor:
    """Descriptor to define the `BasePipeline.get_runs`.

    Descriptors (https://docs.python.org/3/reference/datamodel.html#implementing-descriptors)
    allow us to define different behaviors for pipeline classes and instances.
    """

    def __get__(
        self, instance: Optional["Pipeline"], cls: Type["Pipeline"]
    ) -> Callable[[], List["PipelineRunView"]]:
        """Get a callable returning all runs of this pipeline instance or class.

        Args:
            instance: The pipeline instance if called on an instance else None.
            cls: The pipeline class.

        Returns:
            A zero-argument callable that returns the runs of the pipeline
            view found for this pipeline instance or class.

        Raises:
            RuntimeError: If no pipeline view was found for the pipeline
                instance or class.
        """
        from zenml.post_execution import get_pipeline

        # Instance access looks up the instance, class access the class.
        if instance is not None:
            instance._prepare_if_possible()
            lookup_target = instance
        else:
            lookup_target = cls

        pipeline_view: Union[
            "PipelineVersionView", "PipelineView"
        ] = get_pipeline(lookup_target)

        if not pipeline_view:
            raise RuntimeError(
                "The pipeline view for this pipeline was not found. Please check "
                "that the pipeline has been run already."
            )

        return lambda: pipeline_view.runs
__get__(self, instance, cls) special

Get all runs of this pipeline instance or class.

Parameters:

Name Type Description Default
instance Optional[Pipeline]

The pipeline instance if called on an instance else None.

required
cls Type[Pipeline]

The pipeline class.

required

Returns:

Type Description
Callable[[], List[PipelineRunView]]

A callable that returns the list of all runs of this pipeline instance or class.

Exceptions:

Type Description
RuntimeError

If the method is called on a pipeline instance that has not been run yet.

Source code in zenml/new/pipelines/pipeline.py
def __get__(
    self, instance: Optional["Pipeline"], cls: Type["Pipeline"]
) -> Callable[[], List["PipelineRunView"]]:
    """Get a callable returning all runs of this pipeline instance or class.

    Args:
        instance: The pipeline instance if called on an instance else None.
        cls: The pipeline class.

    Returns:
        A zero-argument callable that returns the runs of the pipeline view
        found for this pipeline instance or class.

    Raises:
        RuntimeError: If no pipeline view was found for the pipeline
            instance or class.
    """
    from zenml.post_execution import get_pipeline

    # Instance access looks up the instance, class access the class.
    if instance is not None:
        instance._prepare_if_possible()
        lookup_target = instance
    else:
        lookup_target = cls

    pipeline_view: Union[
        "PipelineVersionView", "PipelineView"
    ] = get_pipeline(lookup_target)

    if not pipeline_view:
        raise RuntimeError(
            "The pipeline view for this pipeline was not found. Please check "
            "that the pipeline has been run already."
        )

    return lambda: pipeline_view.runs
Pipeline

ZenML pipeline class.

Source code in zenml/new/pipelines/pipeline.py
class Pipeline:
    """ZenML pipeline class."""

    # The active pipeline is the pipeline to which step invocations will be
    # added when a step is called. It is set using a context manager when a
    # pipeline is called (see Pipeline.__call__ for more context)
    ACTIVE_PIPELINE: ClassVar[Optional["Pipeline"]] = None

    def __init__(
        self,
        name: str,
        entrypoint: F,
        enable_cache: Optional[bool] = None,
        enable_artifact_metadata: Optional[bool] = None,
        enable_artifact_visualization: Optional[bool] = None,
        settings: Optional[Mapping[str, "SettingsOrDict"]] = None,
        extra: Optional[Dict[str, Any]] = None,
        on_failure: Optional["HookSpecification"] = None,
        on_success: Optional["HookSpecification"] = None,
    ) -> None:
        """Create a pipeline from a name and an entrypoint function.

        Args:
            name: The name of the pipeline.
            entrypoint: The entrypoint function of the pipeline.
            enable_cache: If caching should be enabled for this pipeline.
            enable_artifact_metadata: If artifact metadata should be enabled
                for this pipeline.
            enable_artifact_visualization: If artifact visualization should
                be enabled for this pipeline.
            settings: Settings for this pipeline.
            extra: Extra configurations for this pipeline.
            on_failure: Callback function in event of failure of the step.
                Can be a function with three possible parameters,
                `StepContext`, `BaseParameters`, and `BaseException`, or a
                source path to a function of the same specifications
                (e.g. `module.my_function`).
            on_success: Callback function in event of success of the step.
                Can be a function with two possible parameters,
                `StepContext` and `BaseParameters`, or a source path to a
                function of the same specifications
                (e.g. `module.my_function`).
        """
        self._invocations: Dict[str, StepInvocation] = {}
        self._run_args: Dict[str, Any] = {}

        # The base configuration only carries the name; every other option is
        # applied through `configure(...)` right below.
        self._configuration = PipelineConfiguration(name=name)
        self.configure(
            enable_cache=enable_cache,
            enable_artifact_metadata=enable_artifact_metadata,
            enable_artifact_visualization=enable_artifact_visualization,
            settings=settings,
            extra=extra,
            on_failure=on_failure,
            on_success=on_success,
        )

        self.entrypoint = entrypoint
        self._parameters: Dict[str, Any] = {}

    @property
    def name(self) -> str:
        """The name of the pipeline.

        Returns:
            The name of the pipeline.
        """
        return self.configuration.name

    @property
    def enable_cache(self) -> Optional[bool]:
        """If caching is enabled for the pipeline.

        Returns:
            If caching is enabled for the pipeline.
        """
        return self.configuration.enable_cache

    @property
    def configuration(self) -> PipelineConfiguration:
        """The configuration of the pipeline.

        Returns:
            The configuration of the pipeline.
        """
        return self._configuration

    @property
    def invocations(self) -> Dict[str, StepInvocation]:
        """Returns the step invocations of this pipeline.

        This dictionary will only be populated once the pipeline has been
        called.

        Returns:
            The step invocations.
        """
        return self._invocations

    def resolve(self) -> "Source":
        """Resolve the pipeline entrypoint into a source.

        Returns:
            The pipeline source.
        """
        entrypoint = self.entrypoint
        # Validation is explicitly skipped when resolving the entrypoint.
        return source_utils.resolve(entrypoint, skip_validation=True)

    @property
    def source_object(self) -> Any:
        """The source object of this pipeline.

        Returns:
            The source object of this pipeline.
        """
        return self.entrypoint

    @property
    def source_code(self) -> str:
        """The source code of this pipeline.

        Returns:
            The source code of this pipeline.
        """
        return inspect.getsource(self.source_object)

    @classmethod
    def from_model(cls, model: "PipelineResponseModel") -> "Pipeline":
        """Load a pipeline instance from a pipeline model.

        Args:
            model: The model to load the pipeline instance from.

        Returns:
            The pipeline instance.
        """
        # Imported inside the method rather than at module level.
        from zenml.new.pipelines import deserialization_utils

        return deserialization_utils.load_pipeline(model=model)

    def configure(
        self: T,
        enable_cache: Optional[bool] = None,
        enable_artifact_metadata: Optional[bool] = None,
        enable_artifact_visualization: Optional[bool] = None,
        settings: Optional[Mapping[str, "SettingsOrDict"]] = None,
        extra: Optional[Dict[str, Any]] = None,
        on_failure: Optional["HookSpecification"] = None,
        on_success: Optional["HookSpecification"] = None,
        merge: bool = True,
    ) -> T:
        """Update the configuration of the pipeline.

        Only arguments that are not `None` are applied.

        Configuration merging example:
        * `merge==True`:
            pipeline.configure(extra={"key1": 1})
            pipeline.configure(extra={"key2": 2}, merge=True)
            pipeline.configuration.extra # {"key1": 1, "key2": 2}
        * `merge==False`:
            pipeline.configure(extra={"key1": 1})
            pipeline.configure(extra={"key2": 2}, merge=False)
            pipeline.configuration.extra # {"key2": 2}

        Args:
            enable_cache: If caching should be enabled for this pipeline.
            enable_artifact_metadata: If artifact metadata should be enabled
                for this pipeline.
            enable_artifact_visualization: If artifact visualization should
                be enabled for this pipeline.
            settings: Settings for this pipeline.
            extra: Extra configurations for this pipeline.
            on_failure: Callback function in event of failure of the step.
                Can be a function with three possible parameters,
                `StepContext`, `BaseParameters`, and `BaseException`, or a
                source path to a function of the same specifications
                (e.g. `module.my_function`).
            on_success: Callback function in event of success of the step.
                Can be a function with two possible parameters,
                `StepContext` and `BaseParameters`, or a source path to a
                function of the same specifications
                (e.g. `module.my_function`).
            merge: If `True`, will merge the given dictionary configurations
                like `extra` and `settings` with existing configurations. If
                `False` the given configurations will overwrite all existing
                ones. See the example above.

        Returns:
            The pipeline instance that this method was called on.
        """
        # Hooks are stored as resolved sources; a falsy hook is left unset.
        failure_hook_source = (
            resolve_and_validate_hook(on_failure) if on_failure else None
        )
        success_hook_source = (
            resolve_and_validate_hook(on_success) if on_success else None
        )

        candidate_values = {
            "enable_cache": enable_cache,
            "enable_artifact_metadata": enable_artifact_metadata,
            "enable_artifact_visualization": enable_artifact_visualization,
            "settings": settings,
            "extra": extra,
            "failure_hook_source": failure_hook_source,
            "success_hook_source": success_hook_source,
        }
        # `None` entries are dropped before the update object is built.
        update = PipelineConfigurationUpdate(
            **dict_utils.remove_none_values(candidate_values)
        )
        self._apply_configuration(update, merge=merge)
        return self

    @property
    def requires_parameters(self) -> bool:
        """If the pipeline entrypoint requires parameters.

        Returns:
            If the pipeline entrypoint requires parameters.
        """
        signature = inspect.signature(self.entrypoint, follow_wrapped=True)
        return any(
            parameter.default is inspect.Parameter.empty
            for parameter in signature.parameters.values()
        )

    @property
    def is_prepared(self) -> bool:
        """If the pipeline is prepared.

        Prepared means that the pipeline entrypoint has been called and the
        pipeline is fully defined.

        Returns:
            If the pipeline is prepared.
        """
        return len(self.invocations) > 0

    def prepare(self, *args: Any, **kwargs: Any) -> None:
        """Prepare the pipeline by calling its entrypoint.

        Args:
            *args: Pipeline entrypoint input arguments.
            **kwargs: Pipeline entrypoint input keyword arguments.
        """
        # Drop parameters and invocations left over from an earlier call.
        self._parameters, self._invocations = {}, {}

        # While inside the context manager this instance is the active
        # pipeline, so steps called by the entrypoint are added to it as
        # invocations.
        with self:
            self._call_entrypoint(*args, **kwargs)

    def register(self) -> "PipelineResponseModel":
        """Register the pipeline in the server.

        A warning is logged when the pipeline configuration holds any
        non-default value besides its name.

        Returns:
            The registered pipeline model.
        """
        # Activating the built-in integrations to load all materializers
        from zenml.integrations.registry import integration_registry

        self._prepare_if_possible()
        integration_registry.activate_integrations()

        non_default_configuration = self.configuration.dict(
            exclude_defaults=True, exclude={"name"}
        )
        if non_default_configuration:
            logger.warning(
                f"The pipeline `{self.name}` that you're registering has "
                "custom configurations applied to it. These will not be "
                "registered with the pipeline and won't be set when you build "
                "images or run the pipeline from the CLI. To provide these "
                "configurations, use the `--config` option of the `zenml "
                "pipeline build/run` commands."
            )

        return self._register(pipeline_spec=Compiler().compile_spec(self))

    def build(
        self,
        settings: Optional[Mapping[str, "SettingsOrDict"]] = None,
        step_configurations: Optional[
            Mapping[str, "StepConfigurationUpdateOrDict"]
        ] = None,
        config_path: Optional[str] = None,
    ) -> Optional["PipelineBuildResponseModel"]:
        """Compile and register the pipeline, then create a build for it.

        Args:
            settings: Settings for the pipeline.
            step_configurations: Configurations for steps of the pipeline.
            config_path: Path to a yaml configuration file. This file will
                be parsed as a
                `zenml.config.pipeline_configurations.PipelineRunConfiguration`
                object. Options provided in this file will be overwritten by
                options provided in code using the other arguments of this
                method.

        Returns:
            The build output.
        """
        with event_handler(event=AnalyticsEvent.BUILD_PIPELINE, v2=True):
            self._prepare_if_possible()

            # The schedule and build entries of the compile result are not
            # needed for building.
            deployment, pipeline_spec, _schedule, _build = self._compile(
                config_path=config_path,
                steps=step_configurations,
                settings=settings,
            )
            pipeline_id = self._register(pipeline_spec=pipeline_spec).id

            repo_context = code_repository_utils.find_active_code_repository()
            verified_repository = build_utils.verify_local_repository_context(
                deployment=deployment, local_repo_context=repo_context
            )

            return build_utils.create_pipeline_build(
                deployment=deployment,
                pipeline_id=pipeline_id,
                code_repository=verified_repository,
            )

    def _run(
        self,
        *,
        run_name: Optional[str] = None,
        enable_cache: Optional[bool] = None,
        enable_artifact_metadata: Optional[bool] = None,
        enable_artifact_visualization: Optional[bool] = None,
        schedule: Optional[Schedule] = None,
        build: Union[str, "UUID", "PipelineBuildBaseModel", None] = None,
        settings: Optional[Mapping[str, "SettingsOrDict"]] = None,
        step_configurations: Optional[
            Mapping[str, "StepConfigurationUpdateOrDict"]
        ] = None,
        extra: Optional[Dict[str, Any]] = None,
        config_path: Optional[str] = None,
        unlisted: bool = False,
        prevent_build_reuse: bool = False,
    ) -> None:
        """Runs the pipeline on the active stack.

        The method compiles the pipeline, optionally registers it and creates
        a schedule, reuses or creates a build, creates the deployment in the
        store and finally hands the deployment to the active stack.

        Args:
            run_name: Name of the pipeline run.
            enable_cache: If caching should be enabled for this pipeline run.
            enable_artifact_metadata: If artifact metadata should be enabled
                for this pipeline run.
            enable_artifact_visualization: If artifact visualization should be
                enabled for this pipeline run.
            schedule: Optional schedule to use for the run.
            build: Optional build to use for the run.
            settings: Settings for this pipeline run.
            step_configurations: Configurations for steps of the pipeline.
            extra: Extra configurations for this pipeline run.
            config_path: Path to a yaml configuration file. This file will
                be parsed as a
                `zenml.config.pipeline_configurations.PipelineRunConfiguration`
                object. Options provided in this file will be overwritten by
                options provided in code using the other arguments of this
                method.
            unlisted: Whether the pipeline run should be unlisted (not assigned
                to any pipeline).
            prevent_build_reuse: Whether to prevent the reuse of a build.
        """
        if constants.SHOULD_PREVENT_PIPELINE_EXECUTION:
            # An environment variable was set to stop the execution of
            # pipelines. This is done to prevent execution of module-level
            # pipeline.run() calls when importing modules needed to run a step.
            logger.info(
                "Preventing execution of pipeline '%s'. If this is not "
                "intended behavior, make sure to unset the environment "
                "variable '%s'.",
                self.name,
                constants.ENV_ZENML_PREVENT_PIPELINE_EXECUTION,
            )
            return

        with event_handler(
            event=AnalyticsEvent.RUN_PIPELINE, v2=True
        ) as analytics_handler:
            # `schedule` and `build` are rebound to the values from the merged
            # run configuration returned by `_compile`.
            deployment, pipeline_spec, schedule, build = self._compile(
                config_path=config_path,
                run_name=run_name,
                enable_cache=enable_cache,
                enable_artifact_metadata=enable_artifact_metadata,
                enable_artifact_visualization=enable_artifact_visualization,
                steps=step_configurations,
                settings=settings,
                schedule=schedule,
                build=build,
                extra=extra,
            )

            skip_pipeline_registration = constants.handle_bool_env_var(
                constants.ENV_ZENML_SKIP_PIPELINE_REGISTRATION,
                default=False,
            )

            # Unlisted runs and runs with the skip flag set are not registered,
            # in which case `pipeline_id` stays `None`.
            register_pipeline = not (skip_pipeline_registration or unlisted)

            pipeline_id = None
            if register_pipeline:
                pipeline_id = self._register(pipeline_spec=pipeline_spec).id

            # TODO: check whether orchestrator even support scheduling before
            # registering the schedule
            schedule_id = None
            if schedule:
                if schedule.name:
                    schedule_name = schedule.name
                else:
                    # Take a single timestamp so that the date and time parts
                    # of the generated name always refer to the same instant.
                    now = datetime.utcnow()
                    date = now.strftime("%Y_%m_%d")
                    time = now.strftime("%H_%M_%S_%f")
                    schedule_name = deployment.run_name_template.format(
                        date=date, time=time
                    )
                components = Client().active_stack_model.components
                orchestrator = components[StackComponentType.ORCHESTRATOR][0]
                schedule_model = ScheduleRequestModel(
                    workspace=Client().active_workspace.id,
                    user=Client().active_user.id,
                    pipeline_id=pipeline_id,
                    orchestrator_id=orchestrator.id,
                    name=schedule_name,
                    active=True,
                    cron_expression=schedule.cron_expression,
                    start_time=schedule.start_time,
                    end_time=schedule.end_time,
                    interval_second=schedule.interval_second,
                    catchup=schedule.catchup,
                )
                schedule_id = (
                    Client().zen_store.create_schedule(schedule_model).id
                )
                logger.info(
                    f"Created schedule `{schedule_name}` for pipeline "
                    f"`{deployment.pipeline_configuration.name}`."
                )

            stack = Client().active_stack

            local_repo_context = (
                code_repository_utils.find_active_code_repository()
            )
            code_repository = build_utils.verify_local_repository_context(
                deployment=deployment, local_repo_context=local_repo_context
            )

            build_model = build_utils.reuse_or_create_pipeline_build(
                deployment=deployment,
                pipeline_id=pipeline_id,
                allow_build_reuse=not prevent_build_reuse,
                build=build,
                code_repository=code_repository,
            )
            build_id = build_model.id if build_model else None

            # A code reference is only attached when a local repository
            # context exists and it is not dirty.
            code_reference = None
            if local_repo_context and not local_repo_context.is_dirty:
                source_root = source_utils.get_source_root()
                subdirectory = (
                    Path(source_root)
                    .resolve()
                    .relative_to(local_repo_context.root)
                )

                code_reference = CodeReferenceRequestModel(
                    commit=local_repo_context.current_commit,
                    subdirectory=subdirectory.as_posix(),
                    code_repository=local_repo_context.code_repository_id,
                )

            deployment_request = PipelineDeploymentRequestModel(
                user=Client().active_user.id,
                workspace=Client().active_workspace.id,
                stack=stack.id,
                pipeline=pipeline_id,
                build=build_id,
                schedule=schedule_id,
                code_reference=code_reference,
                **deployment.dict(),
            )
            deployment_model = Client().zen_store.create_deployment(
                deployment=deployment_request
            )

            analytics_handler.metadata = self._get_pipeline_analytics_metadata(
                deployment=deployment_model, stack=stack
            )
            # Caching is reported as enabled unless it is explicitly `False`.
            caching_status = (
                "enabled"
                if deployment.pipeline_configuration.enable_cache is not False
                else "disabled"
            )
            logger.info(
                "%s %s on stack `%s` (caching %s)",
                "Scheduling" if deployment_model.schedule else "Running",
                f"pipeline `{deployment_model.pipeline_configuration.name}`"
                if register_pipeline
                else "unlisted pipeline",
                stack.name,
                caching_status,
            )

            stack.prepare_pipeline_deployment(deployment=deployment_model)

            # Prevent execution of nested pipelines which might lead to
            # unexpected behavior
            constants.SHOULD_PREVENT_PIPELINE_EXECUTION = True
            try:
                stack.deploy_pipeline(deployment=deployment_model)
            finally:
                # Always reset the flag, even if the deployment fails.
                constants.SHOULD_PREVENT_PIPELINE_EXECUTION = False

            # Log the dashboard URL
            dashboard_utils.print_run_url(
                run_name=deployment.run_name_template,
                pipeline_id=pipeline_id,
            )

    # get_runs = GetRunsDescriptor()

    def write_run_configuration_template(
        self, path: str, stack: Optional["Stack"] = None
    ) -> None:
        """Write a commented-out run configuration yaml template to a file.

        Args:
            path: The path where the template will be written.
            stack: The stack for which the template should be generated. If
                not given, the active stack will be used.
        """
        from zenml.config.base_settings import ConfigurationLevel
        from zenml.config.step_configurations import (
            PartialArtifactConfiguration,
        )

        self._prepare_if_possible()

        target_stack = stack or Client().active_stack

        setting_classes = target_stack.setting_classes
        setting_classes.update(settings_utils.get_general_settings())

        # Sort the generated setting templates into pipeline-level and
        # step-level buckets; a setting class can land in both.
        pipeline_settings = {}
        step_settings = {}
        for key, setting_class in setting_classes.items():
            template_fields = pydantic_utils.TemplateGenerator(
                setting_class
            ).run()
            for level, bucket in (
                (ConfigurationLevel.PIPELINE, pipeline_settings),
                (ConfigurationLevel.STEP, step_settings),
            ):
                if level in setting_class.LEVEL:
                    bucket[key] = template_fields

        # One step template per invocation of the pipeline.
        steps = {}
        for step_name, invocation in self.invocations.items():
            definition = invocation.step.entrypoint_definition

            if definition.legacy_params:
                parameters = pydantic_utils.TemplateGenerator(
                    definition.legacy_params.annotation
                ).run()
            else:
                parameters = {}

            outputs = {
                output_name: PartialArtifactConfiguration()
                for output_name in definition.outputs
            }
            steps[step_name] = StepConfigurationUpdate(
                parameters=parameters,
                settings=step_settings,
                outputs=outputs,
            )

        run_config = PipelineRunConfiguration(
            settings=pipeline_settings, steps=steps
        )
        template = pydantic_utils.TemplateGenerator(run_config).run()
        commented_yaml = yaml_utils.comment_out_yaml(yaml.dump(template))

        with open(path, "w") as template_file:
            template_file.write(commented_yaml)

    def _apply_configuration(
        self,
        config: PipelineConfigurationUpdate,
        merge: bool = True,
    ) -> None:
        """Validate a configuration update and apply it to the pipeline.

        Args:
            config: The configuration update.
            merge: Whether to merge the updates with the existing configuration
                or not. See the `configure(...)` method of this class for a
                detailed explanation.
        """
        self._validate_configuration(config)

        # `merge` is forwarded as the `recursive` flag of the model update.
        updated_configuration = pydantic_utils.update_model(
            self._configuration, update=config, recursive=merge
        )
        self._configuration = updated_configuration

        logger.debug("Updated pipeline configuration:")
        logger.debug(self._configuration)

    @staticmethod
    def _validate_configuration(config: PipelineConfigurationUpdate) -> None:
        """Validate the setting keys of a configuration update.

        Args:
            config: The configuration update to validate.
        """
        setting_keys = list(config.settings)
        settings_utils.validate_setting_keys(setting_keys)

    def _get_pipeline_analytics_metadata(
        self,
        deployment: "PipelineDeploymentResponseModel",
        stack: "Stack",
    ) -> Dict[str, Any]:
        """Collect analytics metadata for a pipeline deployment.

        Args:
            deployment: The pipeline deployment to track.
            stack: The stack on which the pipeline will be deployed.

        Returns:
            The metadata about the pipeline deployment.
        """
        # Gather the `is_internal` flag of every materializer source of every
        # step output; one non-internal source marks the run as custom.
        internal_flags = [
            source.is_internal
            for step in deployment.step_configurations.values()
            for output in step.config.outputs.values()
            for source in output.materializer_source
        ]
        custom_materializer = not all(internal_flags)

        stack_creator = Client().get_stack(stack.id).user
        active_user = Client().active_user
        # Stays falsy when the stack has no creator.
        own_stack = stack_creator and stack_creator.id == active_user.id

        # Flavor of every stack component, keyed by the component type value.
        stack_metadata = {}
        for component_type, component in stack.components.items():
            stack_metadata[component_type.value] = component.flavor

        metadata: Dict[str, Any] = {
            "store_type": Client().zen_store.type.value
        }
        metadata.update(stack_metadata)
        metadata.update(
            {
                "total_steps": len(self.invocations),
                "schedule": bool(deployment.schedule),
                "custom_materializer": custom_materializer,
                "own_stack": own_stack,
            }
        )
        return metadata

    def _compile(
        self, config_path: Optional[str] = None, **run_configuration_args: Any
    ) -> Tuple[
        "PipelineDeploymentBaseModel",
        "PipelineSpec",
        Optional["Schedule"],
        Union["PipelineBuildBaseModel", UUID, None],
    ]:
        """Compile the pipeline for the active stack.

        Args:
            config_path: Path to a config file.
            **run_configuration_args: Configurations for the pipeline run.

        Returns:
            A tuple containing the deployment, spec, schedule and build of
            the compiled pipeline.
        """
        # Activating the built-in integrations to load all materializers
        from zenml.integrations.registry import integration_registry

        integration_registry.activate_integrations()

        # Start from the config file if one was given, else from an empty
        # run configuration.
        run_config = (
            PipelineRunConfiguration.from_yaml(config_path)
            if config_path
            else PipelineRunConfiguration()
        )

        # Values passed in code are applied on top so they take precedence.
        code_overrides = PipelineRunConfiguration.parse_obj(
            dict_utils.remove_none_values(run_configuration_args)
        )
        run_config = pydantic_utils.update_model(
            run_config, update=code_overrides
        )

        deployment, pipeline_spec = Compiler().compile(
            pipeline=self,
            stack=Client().active_stack,
            run_configuration=run_config,
        )

        return deployment, pipeline_spec, run_config.schedule, run_config.build

    def _register(
        self, pipeline_spec: "PipelineSpec"
    ) -> "PipelineResponseModel":
        """Register the pipeline in the server or reuse a matching one.

        Args:
            pipeline_spec: The pipeline spec to register.

        Returns:
            The registered pipeline model.
        """
        version_hash = self._compute_unique_identifier(
            pipeline_spec=pipeline_spec
        )

        client = Client()
        existing_pipelines = client.list_pipelines(
            name=self.name,
            version_hash=version_hash,
            size=1,
            sort_by="desc:created",
        )
        if existing_pipelines.total:
            # A pipeline with the same name and version hash already exists.
            registered_pipeline = existing_pipelines.items[0]
            logger.info(
                "Reusing registered pipeline `%s` (version: %s).",
                registered_pipeline.name,
                registered_pipeline.version,
            )
            return registered_pipeline

        # No match: register a new version, counting up from the latest one
        # (or from 0 when there is none).
        next_version = (self._get_latest_version() or 0) + 1

        request = PipelineRequestModel(
            workspace=client.active_workspace.id,
            user=client.active_user.id,
            name=self.name,
            version=str(next_version),
            version_hash=version_hash,
            spec=pipeline_spec,
            docstring=self.__doc__,
        )

        registered_pipeline = client.zen_store.create_pipeline(
            pipeline=request
        )
        logger.info(
            "Registered pipeline `%s` (version %s).",
            registered_pipeline.name,
            registered_pipeline.version,
        )
        return registered_pipeline

    def _compute_unique_identifier(self, pipeline_spec: PipelineSpec) -> str:
        """Compute an MD5 identifier from the pipeline spec and source code.

        Args:
            pipeline_spec: Compiled spec of the pipeline.

        Returns:
            The unique identifier of the pipeline.
        """
        from packaging import version

        hasher = hashlib.md5()
        hasher.update(pipeline_spec.json_with_string_sources.encode())

        # Only add the pipeline source code for newer spec versions to keep
        # backwards compatibility.
        spec_version = version.parse(pipeline_spec.version)
        if spec_version >= version.parse("0.4"):
            hasher.update(self.source_code.encode())

        # The source code of every step of the spec is hashed as well.
        for step_spec in pipeline_spec.steps:
            step = self.invocations[step_spec.pipeline_parameter_name].step
            hasher.update(step.source_code.encode())

        return hasher.hexdigest()

    def _get_latest_version(self) -> Optional[int]:
        """Look up the version of the most recently created pipeline.

        Returns:
            The latest version or `None` if no version exists.
        """
        newest_first = Client().list_pipelines(
            name=self.name, sort_by="desc:created", size=1
        )
        if not newest_first.total:
            return None

        latest = newest_first.items[0]
        # The "UNVERSIONED" marker is treated like a missing version.
        if latest.version == "UNVERSIONED":
            return None
        return int(latest.version)

    def _get_registered_model(self) -> Optional[PipelineResponseModel]:
        """Find the registered pipeline model matching this instance.

        Returns:
            The registered pipeline model or None if no model is registered yet.
        """
        self._prepare_if_possible()

        compiled_spec = Compiler().compile_spec(self)
        version_hash = self._compute_unique_identifier(
            pipeline_spec=compiled_spec
        )

        matches = Client().list_pipelines(
            name=self.name, version_hash=version_hash
        )
        # Only a single, unambiguous match is returned.
        if len(matches) != 1:
            return None
        return matches.items[0]

    def add_step_invocation(
        self,
        step: "BaseStep",
        input_artifacts: Dict[str, StepArtifact],
        external_artifacts: Dict[str, ExternalArtifact],
        parameters: Dict[str, Any],
        upstream_steps: Set[str],
        custom_id: Optional[str] = None,
        allow_id_suffix: bool = True,
    ) -> str:
        """Store a new step invocation on the pipeline.

        Args:
            step: The step for which to add an invocation.
            input_artifacts: The input artifacts for the invocation.
            external_artifacts: The external artifacts for the invocation.
            parameters: The parameters for the invocation.
            upstream_steps: The upstream steps for the invocation.
            custom_id: Custom ID to use for the invocation.
            allow_id_suffix: Whether a suffix can be appended to the invocation
                ID.

        Raises:
            RuntimeError: If the method is called on an inactive pipeline.
            RuntimeError: If the invocation was called with an artifact from
                a different pipeline.

        Returns:
            The step invocation ID.
        """
        if Pipeline.ACTIVE_PIPELINE != self:
            raise RuntimeError(
                "A step invocation can only be added to an active pipeline."
            )

        # Every input artifact has to come from a step of this very pipeline.
        for artifact in input_artifacts.values():
            if artifact.pipeline is self:
                continue
            raise RuntimeError(
                "Got invalid input artifact for invocation of step "
                f"{step.name}: The input artifact was produced by a step "
                f"inside a different pipeline {artifact.pipeline.name}."
            )

        invocation_id = self._compute_invocation_id(
            step=step, custom_id=custom_id, allow_suffix=allow_id_suffix
        )
        self._invocations[invocation_id] = StepInvocation(
            id=invocation_id,
            step=step,
            input_artifacts=input_artifacts,
            external_artifacts=external_artifacts,
            parameters=parameters,
            upstream_steps=upstream_steps,
            pipeline=self,
        )
        return invocation_id

    def _compute_invocation_id(
        self,
        step: "BaseStep",
        custom_id: Optional[str] = None,
        allow_suffix: bool = True,
    ) -> str:
        """Determines a unique invocation ID for a step.

        The custom ID (or the step name if no custom ID is given) is used
        as-is when it is still free. Otherwise a numeric suffix `_2`, `_3`,
        ... up to `_9999` is appended until an unused ID is found.

        Args:
            step: The step for which an ID is needed.
            custom_id: Optional custom ID to use instead of the step name.
            allow_suffix: Whether a suffix may be appended to make the ID
                unique.

        Raises:
            RuntimeError: If the ID is already taken and no suffix is
                allowed.
            RuntimeError: If all suffixed candidates are taken as well.

        Returns:
            The invocation ID.
        """
        base_id = custom_id or step.name
        taken = self.invocations

        if base_id not in taken:
            return base_id

        if not allow_suffix:
            raise RuntimeError("Duplicate step ID")

        # Lazily walk the suffixed candidates and pick the first free one.
        suffixed = (f"{base_id}_{number}" for number in range(2, 10000))
        free_id = next(
            (candidate for candidate in suffixed if candidate not in taken),
            None,
        )
        if free_id is None:
            raise RuntimeError("Unable to find step ID")

        return free_id

    def __enter__(self: T) -> T:
        """Makes this pipeline the active pipeline.

        Raises:
            RuntimeError: If another pipeline is already active.

        Returns:
            This pipeline instance.
        """
        currently_active = Pipeline.ACTIVE_PIPELINE
        if currently_active:
            # Only a single pipeline can be active at any time.
            raise RuntimeError(
                "Unable to enter pipeline context. A different pipeline "
                f"{currently_active.name} is already active."
            )

        Pipeline.ACTIVE_PIPELINE = self
        return self

    def __exit__(self, *args: Any) -> None:
        """Leaves the pipeline context.

        The active pipeline is reset unconditionally; the exit handler
        arguments are not inspected, so exceptions raised inside the
        context propagate to the caller.

        Args:
            *args: Arguments passed to the context exit handler (unused).
        """
        Pipeline.ACTIVE_PIPELINE = None

    def with_options(
        self,
        run_name: Optional[str] = None,
        schedule: Optional[Schedule] = None,
        build: Union[str, "UUID", "PipelineBuildBaseModel", None] = None,
        step_configurations: Optional[
            Mapping[str, "StepConfigurationUpdateOrDict"]
        ] = None,
        config_path: Optional[str] = None,
        unlisted: bool = False,
        prevent_build_reuse: bool = False,
        **kwargs: Any,
    ) -> "Pipeline":
        """Creates a configured copy of the pipeline.

        The original pipeline instance is left untouched: all options are
        applied to a copy, which is then returned.

        Args:
            run_name: Name of the pipeline run.
            schedule: Optional schedule to use for the run.
            build: Optional build to use for the run.
            step_configurations: Configurations for steps of the pipeline.
            config_path: Path to a yaml configuration file. This file will
                be parsed as a
                `zenml.config.pipeline_configurations.PipelineRunConfiguration`
                object. Options provided in this file will be overwritten by
                options provided in code using the other arguments of this
                method.
            unlisted: Whether the pipeline run should be unlisted (not
                assigned to any pipeline).
            prevent_build_reuse: Whether to prevent the reuse of a build.
            **kwargs: Pipeline configuration options. These will be passed
                to the `pipeline.configure(...)` method.

        Returns:
            The configured pipeline copy.
        """
        clone = self.copy()
        clone.configure(**kwargs)

        # NOTE(review): `remove_none_values` presumably drops entries whose
        # value is `None`, so only explicitly passed options end up in the
        # run arguments. `unlisted` and `prevent_build_reuse` default to
        # `False` rather than `None` and are therefore always included.
        options = dict_utils.remove_none_values(
            {
                "run_name": run_name,
                "schedule": schedule,
                "build": build,
                "step_configurations": step_configurations,
                "config_path": config_path,
                "unlisted": unlisted,
                "prevent_build_reuse": prevent_build_reuse,
            }
        )
        clone._run_args.update(options)
        return clone

    def copy(self) -> "Pipeline":
        """Creates a deep copy of the pipeline.

        Returns:
            A new pipeline instance produced by `copy.deepcopy`.
        """
        duplicate = copy.deepcopy(self)
        return duplicate

    def __call__(self, *args: Any, **kwargs: Any) -> Any:
        """Handles a call of the pipeline.

        Depending on whether a pipeline is currently active, this either:
        * prepares this pipeline with the given arguments and runs it using
          the stored run arguments (no pipeline active), or
        * calls the entrypoint function directly, so that the step
          invocations are added to the already active pipeline.

        Args:
            *args: Entrypoint function arguments.
            **kwargs: Entrypoint function keyword arguments.

        Returns:
            The result of the run if no pipeline was active, otherwise the
            outputs of the entrypoint function call.
        """
        if not Pipeline.ACTIVE_PIPELINE:
            # Top-level call: define the pipeline, then run it.
            self.prepare(*args, **kwargs)
            return self._run(**self._run_args)

        # Calling a pipeline inside a pipeline, we return the potential
        # outputs of the entrypoint function

        # TODO: This currently ignores the configuration of the pipeline
        # and instead applies the configuration of the previously active
        # pipeline. Is this what we want?
        return self.entrypoint(*args, **kwargs)

    def _call_entrypoint(self, *args: Any, **kwargs: Any) -> None:
        """Validates the given arguments and calls the entrypoint function.

        The validated arguments are stored as the pipeline parameters
        before the entrypoint function is called with them.

        Args:
            *args: Entrypoint function arguments.
            **kwargs: Entrypoint function keyword arguments.

        Raises:
            ValueError: If an input argument is missing or not JSON
                serializable.
        """
        try:
            checked_inputs = pydantic_utils.validate_function_args(
                self.entrypoint,
                {"arbitrary_types_allowed": False, "smart_union": True},
                *args,
                **kwargs,
            )
        except ValidationError as validation_error:
            raise ValueError(
                "Invalid or missing inputs for pipeline entrypoint function. "
                "Only JSON serializable inputs are allowed as pipeline inputs."
            ) from validation_error
        else:
            # Only reached if validation succeeded; errors raised by the
            # entrypoint itself are not converted into a `ValueError`.
            self._parameters = checked_inputs
            self.entrypoint(**checked_inputs)

    def _prepare_if_possible(self) -> None:
        """Prepares the pipeline unless it is prepared already.

        A pipeline whose entrypoint requires parameters cannot be prepared
        here, as no arguments are available to pass to it.

        Raises:
            RuntimeError: If the pipeline is not prepared and the preparation
                requires parameters.
        """
        if self.is_prepared:
            return

        if not self.requires_parameters:
            self.prepare()
            return

        raise RuntimeError(
            f"Failed while trying to prepare pipeline {self.name}. "
            "The entrypoint function of the pipeline requires "
            "arguments. Please prepare the pipeline by calling "
            "`pipeline_instance.prepare(...)` and try again."
        )
configuration: PipelineConfiguration property readonly

The configuration of the pipeline.

Returns:

Type Description
PipelineConfiguration

The configuration of the pipeline.

enable_cache: Optional[bool] property readonly

If caching is enabled for the pipeline.

Returns:

Type Description
Optional[bool]

If caching is enabled for the pipeline.

invocations: Dict[str, zenml.steps.step_invocation.StepInvocation] property readonly

Returns the step invocations of this pipeline.

This dictionary will only be populated once the pipeline has been called.

Returns:

Type Description
Dict[str, zenml.steps.step_invocation.StepInvocation]

The step invocations.

is_prepared: bool property readonly

If the pipeline is prepared.

Prepared means that the pipeline entrypoint has been called and the pipeline is fully defined.

Returns:

Type Description
bool

If the pipeline is prepared.

name: str property readonly

The name of the pipeline.

Returns:

Type Description
str

The name of the pipeline.

requires_parameters: bool property readonly

If the pipeline entrypoint requires parameters.

Returns:

Type Description
bool

If the pipeline entrypoint requires parameters.

source_code: str property readonly

The source code of this pipeline.

Returns:

Type Description
str

The source code of this pipeline.

source_object: Any property readonly

The source object of this pipeline.

Returns:

Type Description
Any

The source object of this pipeline.

__call__(self, *args, **kwargs) special

Handle a call of the pipeline.

This method does one of two things: (1) if there is an active pipeline context, it calls the pipeline entrypoint function within that context, and the step invocations are added to the active pipeline; (2) if no pipeline is active, it activates this pipeline before calling the entrypoint function.

Parameters:

Name Type Description Default
*args Any

Entrypoint function arguments.

()
**kwargs Any

Entrypoint function keyword arguments.

{}

Returns:

Type Description
Any

The outputs of the entrypoint function call.

Source code in zenml/new/pipelines/pipeline.py
def __call__(self, *args: Any, **kwargs: Any) -> Any:
    """Handles a call of the pipeline.

    Depending on whether a pipeline is currently active, this either:
    * prepares this pipeline with the given arguments and runs it using
      the stored run arguments (no pipeline active), or
    * calls the entrypoint function directly, so that the step
      invocations are added to the already active pipeline.

    Args:
        *args: Entrypoint function arguments.
        **kwargs: Entrypoint function keyword arguments.

    Returns:
        The result of the run if no pipeline was active, otherwise the
        outputs of the entrypoint function call.
    """
    if not Pipeline.ACTIVE_PIPELINE:
        # Top-level call: define the pipeline, then run it.
        self.prepare(*args, **kwargs)
        return self._run(**self._run_args)

    # Calling a pipeline inside a pipeline, we return the potential
    # outputs of the entrypoint function

    # TODO: This currently ignores the configuration of the pipeline
    # and instead applies the configuration of the previously active
    # pipeline. Is this what we want?
    return self.entrypoint(*args, **kwargs)
__enter__(self) special

Activate the pipeline context.

Exceptions:

Type Description
RuntimeError

If a different pipeline is already active.

Returns:

Type Description
~T

The pipeline instance.

Source code in zenml/new/pipelines/pipeline.py
def __enter__(self: T) -> T:
    """Makes this pipeline the active pipeline.

    Raises:
        RuntimeError: If another pipeline is already active.

    Returns:
        This pipeline instance.
    """
    currently_active = Pipeline.ACTIVE_PIPELINE
    if currently_active:
        # Only a single pipeline can be active at any time.
        raise RuntimeError(
            "Unable to enter pipeline context. A different pipeline "
            f"{currently_active.name} is already active."
        )

    Pipeline.ACTIVE_PIPELINE = self
    return self
__exit__(self, *args) special

Deactivates the pipeline context.

Parameters:

Name Type Description Default
*args Any

The arguments passed to the context exit handler.

()
Source code in zenml/new/pipelines/pipeline.py
def __exit__(self, *args: Any) -> None:
    """Leaves the pipeline context.

    The active pipeline is reset unconditionally; the exit handler
    arguments are not inspected, so exceptions raised inside the context
    propagate to the caller.

    Args:
        *args: Arguments passed to the context exit handler (unused).
    """
    Pipeline.ACTIVE_PIPELINE = None
__init__(self, name, entrypoint, enable_cache=None, enable_artifact_metadata=None, enable_artifact_visualization=None, settings=None, extra=None, on_failure=None, on_success=None) special

Initializes a pipeline.

Parameters:

Name Type Description Default
name str

The name of the pipeline.

required
entrypoint ~F

The entrypoint function of the pipeline.

required
enable_cache Optional[bool]

If caching should be enabled for this pipeline.

None
enable_artifact_metadata Optional[bool]

If artifact metadata should be enabled for this pipeline.

None
enable_artifact_visualization Optional[bool]

If artifact visualization should be enabled for this pipeline.

None
settings Optional[Mapping[str, SettingsOrDict]]

settings for this pipeline.

None
extra Optional[Dict[str, Any]]

Extra configurations for this pipeline.

None
on_failure Optional[HookSpecification]

Callback function in event of failure of the step. Can be a function with three possible parameters, StepContext, BaseParameters, and BaseException, or a source path to a function of the same specifications (e.g. module.my_function).

None
on_success Optional[HookSpecification]

Callback function in event of success of the step. Can be a function with two possible parameters, StepContext and BaseParameters, or a source path to a function of the same specifications (e.g. module.my_function).

None
Source code in zenml/new/pipelines/pipeline.py
def __init__(
    self,
    name: str,
    entrypoint: F,
    enable_cache: Optional[bool] = None,
    enable_artifact_metadata: Optional[bool] = None,
    enable_artifact_visualization: Optional[bool] = None,
    settings: Optional[Mapping[str, "SettingsOrDict"]] = None,
    extra: Optional[Dict[str, Any]] = None,
    on_failure: Optional["HookSpecification"] = None,
    on_success: Optional["HookSpecification"] = None,
) -> None:
    """Creates a new pipeline instance.

    Args:
        name: The name of the pipeline.
        entrypoint: The entrypoint function of the pipeline.
        enable_cache: If caching should be enabled for this pipeline.
        enable_artifact_metadata: If artifact metadata should be enabled for
            this pipeline.
        enable_artifact_visualization: If artifact visualization should be
            enabled for this pipeline.
        settings: Settings for this pipeline.
        extra: Extra configurations for this pipeline.
        on_failure: Callback function in event of failure of the step. Can
            be a function with three possible parameters, `StepContext`,
            `BaseParameters`, and `BaseException`, or a source path to a
            function of the same specifications (e.g. `module.my_function`).
        on_success: Callback function in event of success of the step. Can
            be a function with two possible parameters, `StepContext` and
            `BaseParameters`, or a source path to a function of the same
            specifications (e.g. `module.my_function`).
    """
    # Bookkeeping that has to exist before any configuration is applied.
    self._invocations: Dict[str, StepInvocation] = {}
    self._run_args: Dict[str, Any] = {}

    # Start from a configuration that only carries the name; everything
    # else is applied through the regular `configure(...)` path.
    self._configuration = PipelineConfiguration(name=name)
    self.configure(
        enable_cache=enable_cache,
        enable_artifact_metadata=enable_artifact_metadata,
        enable_artifact_visualization=enable_artifact_visualization,
        settings=settings,
        extra=extra,
        on_failure=on_failure,
        on_success=on_success,
    )

    self.entrypoint = entrypoint
    self._parameters: Dict[str, Any] = {}
add_step_invocation(self, step, input_artifacts, external_artifacts, parameters, upstream_steps, custom_id=None, allow_id_suffix=True)

Adds a step invocation to the pipeline.

Parameters:

Name Type Description Default
step BaseStep

The step for which to add an invocation.

required
input_artifacts Dict[str, zenml.steps.entrypoint_function_utils.StepArtifact]

The input artifacts for the invocation.

required
external_artifacts Dict[str, zenml.steps.external_artifact.ExternalArtifact]

The external artifacts for the invocation.

required
parameters Dict[str, Any]

The parameters for the invocation.

required
upstream_steps Set[str]

The upstream steps for the invocation.

required
custom_id Optional[str]

Custom ID to use for the invocation.

None
allow_id_suffix bool

Whether a suffix can be appended to the invocation ID.

True

Exceptions:

Type Description
RuntimeError

If the method is called on an inactive pipeline.

RuntimeError

If the invocation was called with an artifact from a different pipeline.

Returns:

Type Description
str

The step invocation ID.

Source code in zenml/new/pipelines/pipeline.py
def add_step_invocation(
    self,
    step: "BaseStep",
    input_artifacts: Dict[str, StepArtifact],
    external_artifacts: Dict[str, ExternalArtifact],
    parameters: Dict[str, Any],
    upstream_steps: Set[str],
    custom_id: Optional[str] = None,
    allow_id_suffix: bool = True,
) -> str:
    """Registers a new invocation of a step on this pipeline.

    Args:
        step: The step that is being invoked.
        input_artifacts: Input artifacts of the invocation.
        external_artifacts: External artifacts of the invocation.
        parameters: Parameters of the invocation.
        upstream_steps: Upstream steps of the invocation.
        custom_id: Optional custom ID for the invocation.
        allow_id_suffix: Whether the invocation ID may receive a suffix
            to make it unique.

    Raises:
        RuntimeError: If this pipeline is not the active pipeline.
        RuntimeError: If one of the input artifacts belongs to a
            different pipeline.

    Returns:
        The ID under which the invocation was stored.
    """
    if Pipeline.ACTIVE_PIPELINE != self:
        raise RuntimeError(
            "A step invocation can only be added to an active pipeline."
        )

    # Find the first input artifact that was not produced inside this
    # very pipeline instance (identity check, not equality).
    foreign = next(
        (
            candidate
            for candidate in input_artifacts.values()
            if candidate.pipeline is not self
        ),
        None,
    )
    if foreign is not None:
        raise RuntimeError(
            "Got invalid input artifact for invocation of step "
            f"{step.name}: The input artifact was produced by a step "
            f"inside a different pipeline {foreign.pipeline.name}."
        )

    invocation_id = self._compute_invocation_id(
        step=step, custom_id=custom_id, allow_suffix=allow_id_suffix
    )
    self._invocations[invocation_id] = StepInvocation(
        id=invocation_id,
        step=step,
        input_artifacts=input_artifacts,
        external_artifacts=external_artifacts,
        parameters=parameters,
        upstream_steps=upstream_steps,
        pipeline=self,
    )
    return invocation_id
build(self, settings=None, step_configurations=None, config_path=None)

Builds Docker images for the pipeline.

Parameters:

Name Type Description Default
settings Optional[Mapping[str, SettingsOrDict]]

Settings for the pipeline.

None
step_configurations Optional[Mapping[str, StepConfigurationUpdateOrDict]]

Configurations for steps of the pipeline.

None
config_path Optional[str]

Path to a yaml configuration file. This file will be parsed as a zenml.config.pipeline_configurations.PipelineRunConfiguration object. Options provided in this file will be overwritten by options provided in code using the other arguments of this method.

None

Returns:

Type Description
Optional[PipelineBuildResponseModel]

The build output.

Source code in zenml/new/pipelines/pipeline.py
def build(
    self,
    settings: Optional[Mapping[str, "SettingsOrDict"]] = None,
    step_configurations: Optional[
        Mapping[str, "StepConfigurationUpdateOrDict"]
    ] = None,
    config_path: Optional[str] = None,
) -> Optional["PipelineBuildResponseModel"]:
    """Builds Docker images for the pipeline.

    The pipeline is prepared (if possible), compiled and registered, and
    the resulting deployment is handed to the build utilities together
    with the verified local code repository.

    Args:
        settings: Settings for the pipeline.
        step_configurations: Configurations for steps of the pipeline.
        config_path: Path to a yaml configuration file. This file will
            be parsed as a
            `zenml.config.pipeline_configurations.PipelineRunConfiguration`
            object. Options provided in this file will be overwritten by
            options provided in code using the other arguments of this
            method.

    Returns:
        The build output.
    """
    with event_handler(event=AnalyticsEvent.BUILD_PIPELINE, v2=True):
        self._prepare_if_possible()

        # Only the deployment and the spec of the compilation result are
        # needed to create a build.
        deployment, spec, _, _ = self._compile(
            config_path=config_path,
            steps=step_configurations,
            settings=settings,
        )
        registered_pipeline_id = self._register(pipeline_spec=spec).id

        active_repo = code_repository_utils.find_active_code_repository()
        verified_repo = build_utils.verify_local_repository_context(
            deployment=deployment, local_repo_context=active_repo
        )

        return build_utils.create_pipeline_build(
            deployment=deployment,
            pipeline_id=registered_pipeline_id,
            code_repository=verified_repo,
        )
configure(self, enable_cache=None, enable_artifact_metadata=None, enable_artifact_visualization=None, settings=None, extra=None, on_failure=None, on_success=None, merge=True)

Configures the pipeline.

Configuration merging example:

* merge==True: after calling pipeline.configure(extra={"key1": 1}) followed by pipeline.configure(extra={"key2": 2}, merge=True), pipeline.configuration.extra is {"key1": 1, "key2": 2}.
* merge==False: after calling pipeline.configure(extra={"key1": 1}) followed by pipeline.configure(extra={"key2": 2}, merge=False), pipeline.configuration.extra is {"key2": 2}.

Parameters:

Name Type Description Default
enable_cache Optional[bool]

If caching should be enabled for this pipeline.

None
enable_artifact_metadata Optional[bool]

If artifact metadata should be enabled for this pipeline.

None
enable_artifact_visualization Optional[bool]

If artifact visualization should be enabled for this pipeline.

None
settings Optional[Mapping[str, SettingsOrDict]]

settings for this pipeline.

None
extra Optional[Dict[str, Any]]

Extra configurations for this pipeline.

None
on_failure Optional[HookSpecification]

Callback function in event of failure of the step. Can be a function with three possible parameters, StepContext, BaseParameters, and BaseException, or a source path to a function of the same specifications (e.g. module.my_function).

None
on_success Optional[HookSpecification]

Callback function in event of success of the step. Can be a function with two possible parameters, StepContext and BaseParameters, or a source path to a function of the same specifications (e.g. module.my_function).

None
merge bool

If True, will merge the given dictionary configurations like extra and settings with existing configurations. If False the given configurations will overwrite all existing ones. See the general description of this method for an example.

True

Returns:

Type Description
~T

The pipeline instance that this method was called on.

Source code in zenml/new/pipelines/pipeline.py
def configure(
    self: T,
    enable_cache: Optional[bool] = None,
    enable_artifact_metadata: Optional[bool] = None,
    enable_artifact_visualization: Optional[bool] = None,
    settings: Optional[Mapping[str, "SettingsOrDict"]] = None,
    extra: Optional[Dict[str, Any]] = None,
    on_failure: Optional["HookSpecification"] = None,
    on_success: Optional["HookSpecification"] = None,
    merge: bool = True,
) -> T:
    """Applies configuration options to the pipeline.

    Configuration merging example:
    * `merge==True`:
        pipeline.configure(extra={"key1": 1})
        pipeline.configure(extra={"key2": 2}, merge=True)
        pipeline.configuration.extra # {"key1": 1, "key2": 2}
    * `merge==False`:
        pipeline.configure(extra={"key1": 1})
        pipeline.configure(extra={"key2": 2}, merge=False)
        pipeline.configuration.extra # {"key2": 2}

    Args:
        enable_cache: If caching should be enabled for this pipeline.
        enable_artifact_metadata: If artifact metadata should be enabled for
            this pipeline.
        enable_artifact_visualization: If artifact visualization should be
            enabled for this pipeline.
        settings: Settings for this pipeline.
        extra: Extra configurations for this pipeline.
        on_failure: Callback function in event of failure of the step. Can
            be a function with three possible parameters, `StepContext`,
            `BaseParameters`, and `BaseException`, or a source path to a
            function of the same specifications (e.g. `module.my_function`).
        on_success: Callback function in event of success of the step. Can
            be a function with two possible parameters, `StepContext` and
            `BaseParameters`, or a source path to a function of the same
            specifications (e.g. `module.my_function`).
        merge: If `True`, will merge the given dictionary configurations
            like `extra` and `settings` with existing
            configurations. If `False` the given configurations will
            overwrite all existing ones. See the general description of this
            method for an example.

    Returns:
        The pipeline instance that this method was called on.
    """
    # Hooks are only resolved when one was actually passed (truthy);
    # otherwise the corresponding source stays `None`.
    failure_hook_source = (
        resolve_and_validate_hook(on_failure) if on_failure else None
    )
    success_hook_source = (
        resolve_and_validate_hook(on_success) if on_success else None
    )

    # NOTE(review): `remove_none_values` presumably drops the options that
    # were not provided, so that they do not show up in the update.
    provided_options = dict_utils.remove_none_values(
        {
            "enable_cache": enable_cache,
            "enable_artifact_metadata": enable_artifact_metadata,
            "enable_artifact_visualization": enable_artifact_visualization,
            "settings": settings,
            "extra": extra,
            "failure_hook_source": failure_hook_source,
            "success_hook_source": success_hook_source,
        }
    )
    update = PipelineConfigurationUpdate(**provided_options)
    self._apply_configuration(update, merge=merge)
    return self
copy(self)

Copies the pipeline.

Returns:

Type Description
Pipeline

The pipeline copy.

Source code in zenml/new/pipelines/pipeline.py
def copy(self) -> "Pipeline":
    """Creates a deep copy of the pipeline.

    Returns:
        A new pipeline instance produced by `copy.deepcopy`.
    """
    duplicate = copy.deepcopy(self)
    return duplicate
from_model(model) classmethod

Creates a pipeline instance from a model.

Parameters:

Name Type Description Default
model PipelineResponseModel

The model to load the pipeline instance from.

required

Returns:

Type Description
Pipeline

The pipeline instance.

Source code in zenml/new/pipelines/pipeline.py
@classmethod
def from_model(cls, model: "PipelineResponseModel") -> "Pipeline":
    """Loads a pipeline instance from a pipeline model.

    Args:
        model: The model from which to load the pipeline instance.

    Returns:
        The loaded pipeline instance.
    """
    # Imported inside the method rather than at module level
    # (presumably to avoid a circular import -- TODO confirm).
    from zenml.new.pipelines.deserialization_utils import load_pipeline

    loaded_pipeline = load_pipeline(model=model)
    return loaded_pipeline
prepare(self, *args, **kwargs)

Prepares the pipeline.

Parameters:

Name Type Description Default
*args Any

Pipeline entrypoint input arguments.

()
**kwargs Any

Pipeline entrypoint input keyword arguments.

{}
Source code in zenml/new/pipelines/pipeline.py
def prepare(self, *args: Any, **kwargs: Any) -> None:
    """Prepares the pipeline by calling its entrypoint function.

    Previously stored parameters and invocations are discarded first, so
    repeated calls start from a clean state.

    Args:
        *args: Pipeline entrypoint input arguments.
        **kwargs: Pipeline entrypoint input keyword arguments.
    """
    # Drop whatever an earlier preparation left behind.
    self._parameters, self._invocations = {}, {}

    # While the entrypoint function runs, this pipeline has to be the
    # active one: every step called during that time is then added as an
    # invocation to this pipeline instance.
    with self:
        self._call_entrypoint(*args, **kwargs)
register(self)

Register the pipeline in the server.

Returns:

Type Description
PipelineResponseModel

The registered pipeline model.

Source code in zenml/new/pipelines/pipeline.py
def register(self) -> "PipelineResponseModel":
    """Registers the pipeline in the server.

    A warning is logged if the pipeline carries custom configurations,
    since those are not part of the registered pipeline.

    Returns:
        The registered pipeline model.
    """
    # Activating the built-in integrations to load all materializers
    from zenml.integrations.registry import integration_registry

    self._prepare_if_possible()
    integration_registry.activate_integrations()

    # Everything that deviates from the defaults (apart from the name)
    # counts as a custom configuration.
    non_default_config = self.configuration.dict(
        exclude_defaults=True, exclude={"name"}
    )
    if non_default_config:
        logger.warning(
            f"The pipeline `{self.name}` that you're registering has "
            "custom configurations applied to it. These will not be "
            "registered with the pipeline and won't be set when you build "
            "images or run the pipeline from the CLI. To provide these "
            "configurations, use the `--config` option of the `zenml "
            "pipeline build/run` commands."
        )

    spec = Compiler().compile_spec(self)
    return self._register(pipeline_spec=spec)
resolve(self)

Resolves the pipeline.

Returns:

Type Description
Source

The pipeline source.

Source code in zenml/new/pipelines/pipeline.py
def resolve(self) -> "Source":
    """Resolves the source of the pipeline entrypoint function.

    Returns:
        The pipeline source.
    """
    entrypoint_source = source_utils.resolve(
        self.entrypoint, skip_validation=True
    )
    return entrypoint_source
with_options(self, run_name=None, schedule=None, build=None, step_configurations=None, config_path=None, unlisted=False, prevent_build_reuse=False, **kwargs)

Copies the pipeline and applies the given configurations.

Parameters:

Name Type Description Default
run_name Optional[str]

Name of the pipeline run.

None
schedule Optional[zenml.config.schedule.Schedule]

Optional schedule to use for the run.

None
build Union[str, UUID, PipelineBuildBaseModel]

Optional build to use for the run.

None
step_configurations Optional[Mapping[str, StepConfigurationUpdateOrDict]]

Configurations for steps of the pipeline.

None
config_path Optional[str]

Path to a yaml configuration file. This file will be parsed as a zenml.config.pipeline_configurations.PipelineRunConfiguration object. Options provided in this file will be overwritten by options provided in code using the other arguments of this method.

None
unlisted bool

Whether the pipeline run should be unlisted (not assigned to any pipeline).

False
prevent_build_reuse bool

Whether to prevent the reuse of a build.

False
**kwargs Any

Pipeline configuration options. These will be passed to the pipeline.configure(...) method.

{}

Returns:

Type Description
Pipeline

The copied pipeline instance.

Source code in zenml/new/pipelines/pipeline.py
def with_options(
    self,
    run_name: Optional[str] = None,
    schedule: Optional[Schedule] = None,
    build: Union[str, "UUID", "PipelineBuildBaseModel", None] = None,
    step_configurations: Optional[
        Mapping[str, "StepConfigurationUpdateOrDict"]
    ] = None,
    config_path: Optional[str] = None,
    unlisted: bool = False,
    prevent_build_reuse: bool = False,
    **kwargs: Any,
) -> "Pipeline":
    """Create a configured copy of this pipeline.

    The pipeline itself is left untouched: a copy is made, the pipeline
    configuration options are applied to that copy and the run arguments
    are merged into the copy's stored run arguments.

    Args:
        run_name: Name of the pipeline run.
        schedule: Optional schedule to use for the run.
        build: Optional build to use for the run.
        step_configurations: Configurations for steps of the pipeline.
        config_path: Path to a yaml configuration file. This file will
            be parsed as a
            `zenml.config.pipeline_configurations.PipelineRunConfiguration`
            object. Options provided in this file will be overwritten by
            options provided in code using the other arguments of this
            method.
        unlisted: Whether the pipeline run should be unlisted (not assigned
            to any pipeline).
        prevent_build_reuse: Whether to prevent the reuse of a build.
        **kwargs: Pipeline configuration options. These will be passed
            to the `pipeline.configure(...)` method.

    Returns:
        The copied pipeline instance.
    """
    configured_copy = self.copy()
    configured_copy.configure(**kwargs)

    requested_run_args = {
        "run_name": run_name,
        "schedule": schedule,
        "build": build,
        "step_configurations": step_configurations,
        "config_path": config_path,
        "unlisted": unlisted,
        "prevent_build_reuse": prevent_build_reuse,
    }
    # Entries left at `None` are filtered out before the merge (presumably
    # so they do not override previously stored run arguments -- confirm
    # against `dict_utils.remove_none_values`).
    configured_copy._run_args.update(
        dict_utils.remove_none_values(requested_run_args)
    )
    return configured_copy
write_run_configuration_template(self, path, stack=None)

Writes a run configuration yaml template.

Parameters:

Name Type Description Default
path str

The path where the template will be written.

required
stack Optional[Stack]

The stack for which the template should be generated. If not given, the active stack will be used.

None
Source code in zenml/new/pipelines/pipeline.py
def write_run_configuration_template(
    self, path: str, stack: Optional["Stack"] = None
) -> None:
    """Write a yaml template of a run configuration to a file.

    The template is built from the setting classes of the stack (plus the
    general settings) and from the step invocations of this pipeline, then
    dumped as yaml, passed through `yaml_utils.comment_out_yaml` and
    written to `path`.

    Args:
        path: The path where the template will be written.
        stack: The stack for which the template should be generated. If
            not given, the active stack will be used.
    """
    from zenml.config.base_settings import ConfigurationLevel
    from zenml.config.step_configurations import (
        PartialArtifactConfiguration,
    )

    self._prepare_if_possible()

    if not stack:
        stack = Client().active_stack

    available_settings = stack.setting_classes
    available_settings.update(settings_utils.get_general_settings())

    # Sort the generated settings templates by the level(s) they apply to.
    # A settings class may be listed under both levels.
    pipeline_level_settings = {}
    step_level_settings = {}
    for settings_key, settings_cls in available_settings.items():
        generator = pydantic_utils.TemplateGenerator(settings_cls)
        settings_template = generator.run()
        if ConfigurationLevel.PIPELINE in settings_cls.LEVEL:
            pipeline_level_settings[settings_key] = settings_template
        if ConfigurationLevel.STEP in settings_cls.LEVEL:
            step_level_settings[settings_key] = settings_template

    # One configuration template per step invocation of the pipeline.
    step_templates = {}
    for invocation_id, invocation in self.invocations.items():
        current_step = invocation.step

        if current_step.entrypoint_definition.legacy_params:
            parameter_template = pydantic_utils.TemplateGenerator(
                current_step.entrypoint_definition.legacy_params.annotation
            ).run()
        else:
            parameter_template = {}

        output_templates = {}
        for output_name in current_step.entrypoint_definition.outputs:
            output_templates[output_name] = PartialArtifactConfiguration()

        step_templates[invocation_id] = StepConfigurationUpdate(
            parameters=parameter_template,
            settings=step_level_settings,
            outputs=output_templates,
        )

    run_configuration = PipelineRunConfiguration(
        settings=pipeline_level_settings, steps=step_templates
    )
    rendered = yaml.dump(
        pydantic_utils.TemplateGenerator(run_configuration).run()
    )
    rendered = yaml_utils.comment_out_yaml(rendered)

    with open(path, "w") as template_file:
        template_file.write(rendered)

pipeline_decorator

ZenML pipeline decorator definition.

pipeline(_func=None, *, name=None, enable_cache=None, enable_artifact_metadata=None, settings=None, extra=None, on_failure=None, on_success=None)

Decorator to create a pipeline.

Parameters:

Name Type Description Default
_func Optional[F]

The decorated function.

None
name Optional[str]

The name of the pipeline. If left empty, the name of the decorated function will be used as a fallback.

None
enable_cache Optional[bool]

Whether to use caching or not.

None
enable_artifact_metadata Optional[bool]

Whether to enable artifact metadata or not.

None
settings Optional[Dict[str, SettingsOrDict]]

Settings for this pipeline.

None
extra Optional[Dict[str, Any]]

Extra configurations for this pipeline.

None
on_failure Optional[HookSpecification]

Callback function in event of failure of the step. Can be a function with three possible parameters, StepContext, BaseParameters, and BaseException, or a source path to a function of the same specifications (e.g. module.my_function).

None
on_success Optional[HookSpecification]

Callback function in event of success of the step. Can be a function with two possible parameters, StepContext and BaseParameters, or a source path to a function of the same specifications (e.g. module.my_function).

None

Returns:

Type Description
Union[Pipeline, Callable[[F], Pipeline]]

A pipeline instance.

Source code in zenml/new/pipelines/pipeline_decorator.py
def pipeline(
    _func: Optional["F"] = None,
    *,
    name: Optional[str] = None,
    enable_cache: Optional[bool] = None,
    enable_artifact_metadata: Optional[bool] = None,
    settings: Optional[Dict[str, "SettingsOrDict"]] = None,
    extra: Optional[Dict[str, Any]] = None,
    on_failure: Optional["HookSpecification"] = None,
    on_success: Optional["HookSpecification"] = None,
) -> Union["Pipeline", Callable[["F"], "Pipeline"]]:
    """Decorator to create a pipeline.

    Works both as a bare decorator (`@pipeline`) and as a decorator
    factory with keyword options (`@pipeline(name=...)`).

    Args:
        _func: The decorated function.
        name: The name of the pipeline. If left empty, the name of the
            decorated function will be used as a fallback.
        enable_cache: Whether to use caching or not.
        enable_artifact_metadata: Whether to enable artifact metadata or not.
        settings: Settings for this pipeline.
        extra: Extra configurations for this pipeline.
        on_failure: Callback function in event of failure of the step. Can be
            a function with three possible parameters,
            `StepContext`, `BaseParameters`, and `BaseException`,
            or a source path to a function of the same specifications
            (e.g. `module.my_function`).
        on_success: Callback function in event of success of the step. Can be
            a function with two possible parameters, `StepContext` and
            `BaseParameters`, or a source path to a function of the same
            specifications (e.g. `module.my_function`).

    Returns:
        A pipeline instance if the decorated function was passed directly,
        otherwise a decorator that turns a function into a pipeline
        instance.
    """

    def inner_decorator(func: "F") -> "Pipeline":
        # Imported inside the function rather than at module level.
        from zenml.new.pipelines.pipeline import Pipeline

        pipeline_instance = Pipeline(
            name=name or func.__name__,
            enable_cache=enable_cache,
            enable_artifact_metadata=enable_artifact_metadata,
            settings=settings,
            extra=extra,
            on_failure=on_failure,
            on_success=on_success,
            entrypoint=func,
        )

        # The instance takes over the docstring of the wrapped function.
        pipeline_instance.__doc__ = func.__doc__
        return pipeline_instance

    # Called with a function: decorate it right away. Called with keyword
    # options only: hand back the decorator.
    if _func is not None:
        return inner_decorator(_func)
    return inner_decorator

steps special

decorated_step

Internal BaseStep subclass used by the step decorator.

step_decorator

Step decorator function.

step(_func=None, *, name=None, enable_cache=None, enable_artifact_metadata=None, enable_artifact_visualization=None, experiment_tracker=None, step_operator=None, output_materializers=None, settings=None, extra=None, on_failure=None, on_success=None)

Decorator to create a ZenML step.

Parameters:

Name Type Description Default
_func Optional[F]

The decorated function.

None
name Optional[str]

The name of the step. If left empty, the name of the decorated function will be used as a fallback.

None
enable_cache Optional[bool]

Specify whether caching is enabled for this step. If no value is passed, caching is enabled by default unless the step requires a StepContext (see zenml.steps.step_context.StepContext for more information).

None
enable_artifact_metadata Optional[bool]

Specify whether metadata is enabled for this step. If no value is passed, metadata is enabled by default.

None
enable_artifact_visualization Optional[bool]

Specify whether visualization is enabled for this step. If no value is passed, visualization is enabled by default.

None
experiment_tracker Optional[str]

The experiment tracker to use for this step.

None
step_operator Optional[str]

The step operator to use for this step.

None
output_materializers Optional[OutputMaterializersSpecification]

Output materializers for this step. If given as a dict, the keys must be a subset of the output names of this step. If a single value (type or string) is given, the materializer will be used for all outputs.

None
settings Optional[Dict[str, SettingsOrDict]]

Settings for this step.

None
extra Optional[Dict[str, Any]]

Extra configurations for this step.

None
on_failure Optional[HookSpecification]

Callback function in event of failure of the step. Can be a function with three possible parameters, StepContext, BaseParameters, and BaseException, or a source path to a function of the same specifications (e.g. module.my_function).

None
on_success Optional[HookSpecification]

Callback function in event of success of the step. Can be a function with two possible parameters, StepContext and BaseParameters, or a source path to a function of the same specifications (e.g. module.my_function).

None

Returns:

Type Description
Union[BaseStep, Callable[[F], BaseStep]]

The step instance.

Source code in zenml/new/steps/step_decorator.py
def step(
    _func: Optional["F"] = None,
    *,
    name: Optional[str] = None,
    enable_cache: Optional[bool] = None,
    enable_artifact_metadata: Optional[bool] = None,
    enable_artifact_visualization: Optional[bool] = None,
    experiment_tracker: Optional[str] = None,
    step_operator: Optional[str] = None,
    output_materializers: Optional["OutputMaterializersSpecification"] = None,
    settings: Optional[Dict[str, "SettingsOrDict"]] = None,
    extra: Optional[Dict[str, Any]] = None,
    on_failure: Optional["HookSpecification"] = None,
    on_success: Optional["HookSpecification"] = None,
) -> Union["BaseStep", Callable[["F"], "BaseStep"]]:
    """Decorator to create a ZenML step.

    Works both as a bare decorator (`@step`) and as a decorator factory
    with keyword options (`@step(name=...)`).

    Args:
        _func: The decorated function.
        name: The name of the step. If left empty, the name of the decorated
            function will be used as a fallback.
        enable_cache: Specify whether caching is enabled for this step. If no
            value is passed, caching is enabled by default unless the step
            requires a `StepContext` (see
            `zenml.steps.step_context.StepContext` for more information).
        enable_artifact_metadata: Specify whether metadata is enabled for this
            step. If no value is passed, metadata is enabled by default.
        enable_artifact_visualization: Specify whether visualization is enabled
            for this step. If no value is passed, visualization is enabled by
            default.
        experiment_tracker: The experiment tracker to use for this step.
        step_operator: The step operator to use for this step.
        output_materializers: Output materializers for this step. If
            given as a dict, the keys must be a subset of the output names
            of this step. If a single value (type or string) is given, the
            materializer will be used for all outputs.
        settings: Settings for this step.
        extra: Extra configurations for this step.
        on_failure: Callback function in event of failure of the step. Can be
            a function with three possible parameters,
            `StepContext`, `BaseParameters`, and `BaseException`,
            or a source path to a function of the same specifications
            (e.g. `module.my_function`).
        on_success: Callback function in event of success of the step. Can be
            a function with two possible parameters, `StepContext` and
            `BaseParameters`, or a source path to a function of the same
            specifications (e.g. `module.my_function`).

    Returns:
        The step instance if the decorated function was passed directly,
        otherwise a decorator that turns a function into a step instance.
    """

    def inner_decorator(func: "F") -> "BaseStep":
        # Imported inside the function rather than at module level.
        from zenml.new.steps.decorated_step import _DecoratedStep

        # A dedicated subclass is created per decorated function. It is
        # named after the function, reports the function's module and
        # docstring, and exposes the function as its static `entrypoint`.
        class_namespace = {
            "entrypoint": staticmethod(func),
            "__module__": func.__module__,
            "__doc__": func.__doc__,
        }
        class_: Type["BaseStep"] = type(
            func.__name__, (_DecoratedStep,), class_namespace
        )

        return class_(
            name=name or func.__name__,
            enable_cache=enable_cache,
            enable_artifact_metadata=enable_artifact_metadata,
            enable_artifact_visualization=enable_artifact_visualization,
            experiment_tracker=experiment_tracker,
            step_operator=step_operator,
            output_materializers=output_materializers,
            settings=settings,
            extra=extra,
            on_failure=on_failure,
            on_success=on_success,
        )

    # Called with a function: decorate it right away. Called with keyword
    # options only: hand back the decorator.
    if _func is not None:
        return inner_decorator(_func)
    return inner_decorator