
Post Execution

zenml.post_execution special

After executing a pipeline, the user needs to be able to fetch it from history and inspect its results. The post_execution submodule provides a set of views through which the user can interact with pipelines, pipeline runs, steps, and the artifacts they produced.
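
A typical post-execution workflow chains these views together: fetch a pipeline, pick a run, drill down to a step, and read its output artifact. The sketch below assumes a repository that already tracks a pipeline named "my_pipeline" with a step named "trainer"; both names are placeholders used for illustration only.

from zenml.repository import Repository

# Fetch all pipeline views tracked in the local repository.
pipelines = Repository().get_pipelines()

# Pick a pipeline view by name ("my_pipeline" is a placeholder).
pipeline = next(p for p in pipelines if p.name == "my_pipeline")

# Runs are returned chronologically, so the last one is the most recent.
latest_run = pipeline.runs[-1]

# Drill down to a step and materialize its single output artifact.
step = latest_run.get_step("trainer")
model = step.output.read()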

artifact

ArtifactView

Post-execution artifact class which can be used to read artifact data that was created during a pipeline execution.

Source code in zenml/post_execution/artifact.py
class ArtifactView:
    """Post-execution artifact class which can be used to read
    artifact data that was created during a pipeline execution.
    """

    def __init__(
        self,
        id_: int,
        type_: str,
        uri: str,
        materializer: str,
        data_type: str,
        metadata_store: "BaseMetadataStore",
        parent_step_id: int,
    ):
        """Initializes a post-execution artifact object.

        In most cases `ArtifactView` objects should not be created manually but
        retrieved from a `StepView` via the `inputs` or `outputs` properties.

        Args:
            id_: The artifact id.
            type_: The type of this artifact.
            uri: Specifies where the artifact data is stored.
            materializer: Information needed to restore the materializer
                that was used to write this artifact.
            data_type: The type of data that was passed to the materializer
                when writing that artifact. Will be used as a default type
                to read the artifact.
            metadata_store: The metadata store which should be used to fetch
                additional information related to this pipeline.
            parent_step_id: The ID of the parent step.
        """
        self._id = id_
        self._type = type_
        self._uri = uri
        self._materializer = materializer
        self._data_type = data_type
        self._metadata_store = metadata_store
        self._parent_step_id = parent_step_id

    @property
    def id(self) -> int:
        """Returns the artifact id."""
        return self._id

    @property
    def type(self) -> str:
        """Returns the artifact type."""
        return self._type

    @property
    def data_type(self) -> str:
        """Returns the data type of the artifact."""
        return self._data_type

    @property
    def uri(self) -> str:
        """Returns the URI where the artifact data is stored."""
        return self._uri

    @property
    def parent_step_id(self) -> int:
        """Returns the ID of the parent step. This need not be equivalent to
        the ID of the producer step."""
        return self._parent_step_id

    @property
    def producer_step(self) -> "StepView":
        """Returns the original StepView that produced the artifact."""
        # TODO [ENG-174]: Replace with artifact.id instead of passing self if
        #  required.
        return self._metadata_store.get_producer_step_from_artifact(self)

    @property
    def is_cached(self) -> bool:
        """Returns True if artifact was cached in a previous run, else False."""
        # self._metadata_store.
        return self.producer_step.id != self.parent_step_id

    def read(
        self,
        output_data_type: Optional[Type[Any]] = None,
        materializer_class: Optional[Type["BaseMaterializer"]] = None,
    ) -> Any:
        """Materializes the data stored in this artifact.

        Args:
            output_data_type: The datatype to which the materializer should
                read, will be passed to the materializers `handle_input` method.
            materializer_class: The class of the materializer that should be
                used to read the artifact data. If no materializer class is
                given, we use the materializer that was used to write the
                artifact during execution of the pipeline.

        Returns:
              The materialized data.
        """

        if not materializer_class:
            try:
                materializer_class = source_utils.load_source_path_class(
                    self._materializer
                )
            except (ModuleNotFoundError, AttributeError) as e:
                logger.error(
                    f"ZenML can not locate and import the materializer module "
                    f"{self._materializer} which was used to write this "
                    f"artifact. If you want to read from it, please provide "
                    f"a 'materializer_class'."
                )
                raise ModuleNotFoundError(e) from e

        if not output_data_type:
            try:
                output_data_type = source_utils.load_source_path_class(
                    self._data_type
                )
            except (ModuleNotFoundError, AttributeError) as e:
                logger.error(
                    f"ZenML can not locate and import the data type of this "
                    f"artifact {self._data_type}. If you want to read "
                    f"from it, please provide a 'output_data_type'."
                )
                raise ModuleNotFoundError(e) from e

        logger.debug(
            "Using '%s' to read '%s' (uri: %s).",
            materializer_class.__qualname__,
            self._type,
            self._uri,
        )

        # TODO [ENG-162]: passing in `self` to initialize the materializer only
        #  works because materializers only require a `.uri` property at the
        #  moment.
        materializer = materializer_class(self)  # type: ignore[arg-type]
        return materializer.handle_input(output_data_type)

    def __repr__(self) -> str:
        """Returns a string representation of this artifact."""
        return (
            f"{self.__class__.__qualname__}(id={self._id}, "
            f"type='{self._type}', uri='{self._uri}', "
            f"materializer='{self._materializer}')"
        )

    def __eq__(self, other: Any) -> bool:
        """Returns whether the other object is referring to the
        same artifact."""
        if isinstance(other, ArtifactView):
            return self._id == other._id and self._uri == other._uri
        return NotImplemented
data_type: str property readonly

Returns the data type of the artifact.

id: int property readonly

Returns the artifact id.

is_cached: bool property readonly

Returns True if artifact was cached in a previous run, else False.
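
For example, given a StepView obtained from a run as in the earlier snippet, you can check whether its output was reused from cache and find the step that originally wrote it (a minimal sketch):

artifact = step.output
if artifact.is_cached:
    # The artifact was reused from an earlier run; producer_step points
    # at the StepView that originally wrote it.
    original_step = artifact.producer_step
    print(f"Cached artifact originally produced by step id {original_step.id}")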

parent_step_id: int property readonly

Returns the ID of the parent step. This need not be equivalent to the ID of the producer step.

producer_step: StepView property readonly

Returns the original StepView that produced the artifact.

type: str property readonly

Returns the artifact type.

uri: str property readonly

Returns the URI where the artifact data is stored.

__eq__(self, other) special

Returns whether the other object is referring to the same artifact.

Source code in zenml/post_execution/artifact.py
def __eq__(self, other: Any) -> bool:
    """Returns whether the other object is referring to the
    same artifact."""
    if isinstance(other, ArtifactView):
        return self._id == other._id and self._uri == other._uri
    return NotImplemented
__init__(self, id_, type_, uri, materializer, data_type, metadata_store, parent_step_id) special

Initializes a post-execution artifact object.

In most cases ArtifactView objects should not be created manually but retrieved from a StepView via the inputs or outputs properties.

Parameters:

id_ (int, required): The artifact id.
type_ (str, required): The type of this artifact.
uri (str, required): Specifies where the artifact data is stored.
materializer (str, required): Information needed to restore the materializer that was used to write this artifact.
data_type (str, required): The type of data that was passed to the materializer when writing that artifact. Will be used as a default type to read the artifact.
metadata_store (BaseMetadataStore, required): The metadata store which should be used to fetch additional information related to this pipeline.
parent_step_id (int, required): The ID of the parent step.
Source code in zenml/post_execution/artifact.py
def __init__(
    self,
    id_: int,
    type_: str,
    uri: str,
    materializer: str,
    data_type: str,
    metadata_store: "BaseMetadataStore",
    parent_step_id: int,
):
    """Initializes a post-execution artifact object.

    In most cases `ArtifactView` objects should not be created manually but
    retrieved from a `StepView` via the `inputs` or `outputs` properties.

    Args:
        id_: The artifact id.
        type_: The type of this artifact.
        uri: Specifies where the artifact data is stored.
        materializer: Information needed to restore the materializer
            that was used to write this artifact.
        data_type: The type of data that was passed to the materializer
            when writing that artifact. Will be used as a default type
            to read the artifact.
        metadata_store: The metadata store which should be used to fetch
            additional information related to this pipeline.
        parent_step_id: The ID of the parent step.
    """
    self._id = id_
    self._type = type_
    self._uri = uri
    self._materializer = materializer
    self._data_type = data_type
    self._metadata_store = metadata_store
    self._parent_step_id = parent_step_id
__repr__(self) special

Returns a string representation of this artifact.

Source code in zenml/post_execution/artifact.py
def __repr__(self) -> str:
    """Returns a string representation of this artifact."""
    return (
        f"{self.__class__.__qualname__}(id={self._id}, "
        f"type='{self._type}', uri='{self._uri}', "
        f"materializer='{self._materializer}')"
    )
read(self, output_data_type=None, materializer_class=None)

Materializes the data stored in this artifact.

Parameters:

output_data_type (Optional[Type[Any]], default None): The datatype to which the materializer should read; it will be passed to the materializer's handle_input method.
materializer_class (Optional[Type[BaseMaterializer]], default None): The class of the materializer that should be used to read the artifact data. If no materializer class is given, we use the materializer that was used to write the artifact during execution of the pipeline.

Returns:

Any: The materialized data.

Source code in zenml/post_execution/artifact.py
def read(
    self,
    output_data_type: Optional[Type[Any]] = None,
    materializer_class: Optional[Type["BaseMaterializer"]] = None,
) -> Any:
    """Materializes the data stored in this artifact.

    Args:
        output_data_type: The datatype to which the materializer should
            read, will be passed to the materializers `handle_input` method.
        materializer_class: The class of the materializer that should be
            used to read the artifact data. If no materializer class is
            given, we use the materializer that was used to write the
            artifact during execution of the pipeline.

    Returns:
          The materialized data.
    """

    if not materializer_class:
        try:
            materializer_class = source_utils.load_source_path_class(
                self._materializer
            )
        except (ModuleNotFoundError, AttributeError) as e:
            logger.error(
                f"ZenML can not locate and import the materializer module "
                f"{self._materializer} which was used to write this "
                f"artifact. If you want to read from it, please provide "
                f"a 'materializer_class'."
            )
            raise ModuleNotFoundError(e) from e

    if not output_data_type:
        try:
            output_data_type = source_utils.load_source_path_class(
                self._data_type
            )
        except (ModuleNotFoundError, AttributeError) as e:
            logger.error(
                f"ZenML can not locate and import the data type of this "
                f"artifact {self._data_type}. If you want to read "
                f"from it, please provide a 'output_data_type'."
            )
            raise ModuleNotFoundError(e) from e

    logger.debug(
        "Using '%s' to read '%s' (uri: %s).",
        materializer_class.__qualname__,
        self._type,
        self._uri,
    )

    # TODO [ENG-162]: passing in `self` to initialize the materializer only
    #  works because materializers only require a `.uri` property at the
    #  moment.
    materializer = materializer_class(self)  # type: ignore[arg-type]
    return materializer.handle_input(output_data_type)
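
If the materializer or data type recorded with the artifact cannot be imported in the current environment, both can be overridden explicitly. A minimal sketch, assuming the artifact was originally written as a pandas DataFrame (an assumption made purely for illustration):

import pandas as pd

# Default: use the materializer and data type recorded with the artifact.
data = artifact.read()

# Override the data type, e.g. when the recorded data type path is not
# importable in this environment.
df = artifact.read(output_data_type=pd.DataFrame)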

pipeline

PipelineView

Post-execution pipeline class which can be used to query pipeline-related information from the metadata store.

Source code in zenml/post_execution/pipeline.py
class PipelineView:
    """Post-execution pipeline class which can be used to query
    pipeline-related information from the metadata store.
    """

    def __init__(
        self, id_: int, name: str, metadata_store: "BaseMetadataStore"
    ):
        """Initializes a post-execution pipeline object.

        In most cases `PipelineView` objects should not be created manually
        but retrieved using the `get_pipelines()` method of a
        `zenml.repository.Repository` instead.

        Args:
            id_: The context id of this pipeline.
            name: The name of this pipeline.
            metadata_store: The metadata store which should be used to fetch
                additional information related to this pipeline.
        """
        self._id = id_
        self._name = name
        self._metadata_store = metadata_store

    @property
    def name(self) -> str:
        """Returns the name of the pipeline."""
        return self._name

    @property
    def runs(self) -> List["PipelineRunView"]:
        """Returns all stored runs of this pipeline.

        The runs are returned in chronological order, so the latest
        run will be the last element in this list.
        """
        # Do not cache runs as new runs might appear during this object's
        # lifecycle
        runs = list(self._metadata_store.get_pipeline_runs(self).values())

        for run in runs:
            run._run_wrapper = self._get_zenstore_run(run_name=run.name)

        return runs

    def get_run_names(self) -> List[str]:
        """Returns a list of all run names."""
        # Do not cache runs as new runs might appear during this object's
        # lifecycle
        runs = self._metadata_store.get_pipeline_runs(self)
        return list(runs.keys())

    def get_run(self, name: str) -> "PipelineRunView":
        """Returns a run for the given name.

        Args:
            name: The name of the run to return.

        Raises:
            KeyError: If there is no run with the given name.
        """
        run = self._metadata_store.get_pipeline_run(self, name)

        if not run:
            raise KeyError(
                f"No run found for name `{name}`. This pipeline "
                f"only has runs with the following "
                f"names: `{self.get_run_names()}`"
            )

        run._run_wrapper = self._get_zenstore_run(run_name=name)
        return run

    def get_run_for_completed_step(self, step_name: str) -> "PipelineRunView":
        """This method helps you find out which pipeline run produced
        the cached artifact of a given step.

        Args:
            step_name: Name of step at hand
        Return:
            None if no run is found that completed the given step,
             else the original pipeline_run
        """
        orig_pipeline_run = None

        for run in reversed(self.runs):
            try:
                step = run.get_step(step_name)
                if step.is_completed:
                    orig_pipeline_run = run
                    break
            except KeyError:
                pass
        if not orig_pipeline_run:
            raise LookupError(
                "No Pipeline Run could be found, that has"
                f" completed the provided step: [{step_name}]"
            )

        return orig_pipeline_run

    def _get_zenstore_run(self, run_name: str) -> Optional[PipelineRunWrapper]:
        """Gets a ZenStore run for the given run name.

        This will filter all ZenStore runs by the pipeline name of this
        pipeline view, the run name passed in as an argument and the metadata
        store that this pipeline run is associated with."""
        from zenml.repository import Repository

        repo = Repository(skip_repository_check=True)  # type: ignore[call-arg]
        try:
            run_wrapper = repo.zen_store.get_pipeline_run(
                pipeline_name=self.name, run_name=run_name
            )
            metadata_store_wrapper = run_wrapper.stack.get_component_wrapper(
                StackComponentType.METADATA_STORE
            )
            if metadata_store_wrapper and (
                metadata_store_wrapper.uuid == self._metadata_store.uuid
            ):
                return run_wrapper
        except KeyError:
            pass

        return None

    def __repr__(self) -> str:
        """Returns a string representation of this pipeline."""
        return (
            f"{self.__class__.__qualname__}(id={self._id}, "
            f"name='{self._name}')"
        )

    def __eq__(self, other: Any) -> bool:
        """Returns whether the other object is referring to the
        same pipeline."""
        if isinstance(other, PipelineView):
            return (
                self._id == other._id
                and self._metadata_store.uuid == other._metadata_store.uuid
            )
        return NotImplemented
name: str property readonly

Returns the name of the pipeline.

runs: List[PipelineRunView] property readonly

Returns all stored runs of this pipeline.

The runs are returned in chronological order, so the latest run will be the last element in this list.

__eq__(self, other) special

Returns whether the other object is referring to the same pipeline.

Source code in zenml/post_execution/pipeline.py
def __eq__(self, other: Any) -> bool:
    """Returns whether the other object is referring to the
    same pipeline."""
    if isinstance(other, PipelineView):
        return (
            self._id == other._id
            and self._metadata_store.uuid == other._metadata_store.uuid
        )
    return NotImplemented
__init__(self, id_, name, metadata_store) special

Initializes a post-execution pipeline object.

In most cases PipelineView objects should not be created manually but retrieved using the get_pipelines() method of a zenml.repository.Repository instead.

Parameters:

id_ (int, required): The context id of this pipeline.
name (str, required): The name of this pipeline.
metadata_store (BaseMetadataStore, required): The metadata store which should be used to fetch additional information related to this pipeline.
Source code in zenml/post_execution/pipeline.py
def __init__(
    self, id_: int, name: str, metadata_store: "BaseMetadataStore"
):
    """Initializes a post-execution pipeline object.

    In most cases `PipelineView` objects should not be created manually
    but retrieved using the `get_pipelines()` method of a
    `zenml.repository.Repository` instead.

    Args:
        id_: The context id of this pipeline.
        name: The name of this pipeline.
        metadata_store: The metadata store which should be used to fetch
            additional information related to this pipeline.
    """
    self._id = id_
    self._name = name
    self._metadata_store = metadata_store
__repr__(self) special

Returns a string representation of this pipeline.

Source code in zenml/post_execution/pipeline.py
def __repr__(self) -> str:
    """Returns a string representation of this pipeline."""
    return (
        f"{self.__class__.__qualname__}(id={self._id}, "
        f"name='{self._name}')"
    )
get_run(self, name)

Returns a run for the given name.

Parameters:

name (str, required): The name of the run to return.

Exceptions:

KeyError: If there is no run with the given name.

Source code in zenml/post_execution/pipeline.py
def get_run(self, name: str) -> "PipelineRunView":
    """Returns a run for the given name.

    Args:
        name: The name of the run to return.

    Raises:
        KeyError: If there is no run with the given name.
    """
    run = self._metadata_store.get_pipeline_run(self, name)

    if not run:
        raise KeyError(
            f"No run found for name `{name}`. This pipeline "
            f"only has runs with the following "
            f"names: `{self.get_run_names()}`"
        )

    run._run_wrapper = self._get_zenstore_run(run_name=name)
    return run
get_run_for_completed_step(self, step_name)

Finds the pipeline run that produced the cached artifact of a given step.

Parameters:

step_name (str, required): Name of the step at hand.

Returns:

PipelineRunView: The original pipeline run that completed the given step.

Exceptions:

LookupError: If no run is found that completed the given step.

Source code in zenml/post_execution/pipeline.py
def get_run_for_completed_step(self, step_name: str) -> "PipelineRunView":
    """This method helps you find out which pipeline run produced
    the cached artifact of a given step.

    Args:
        step_name: Name of step at hand
    Return:
        None if no run is found that completed the given step,
         else the original pipeline_run
    """
    orig_pipeline_run = None

    for run in reversed(self.runs):
        try:
            step = run.get_step(step_name)
            if step.is_completed:
                orig_pipeline_run = run
                break
        except KeyError:
            pass
    if not orig_pipeline_run:
        raise LookupError(
            "No Pipeline Run could be found, that has"
            f" completed the provided step: [{step_name}]"
        )

    return orig_pipeline_run
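
This is useful when a step was cached in the current run and you want to trace its output back to the run that actually executed it. A minimal sketch ("trainer" is a placeholder step name):

run = pipeline.get_run_for_completed_step("trainer")
print(f"Step 'trainer' was last executed in run '{run.name}'")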
get_run_names(self)

Returns a list of all run names.

Source code in zenml/post_execution/pipeline.py
def get_run_names(self) -> List[str]:
    """Returns a list of all run names."""
    # Do not cache runs as new runs might appear during this object's
    # lifecycle
    runs = self._metadata_store.get_pipeline_runs(self)
    return list(runs.keys())

pipeline_run

PipelineRunView

Post-execution pipeline run class which can be used to query steps and artifact information associated with a pipeline execution.

Source code in zenml/post_execution/pipeline_run.py
class PipelineRunView:
    """Post-execution pipeline run class which can be used to query
    steps and artifact information associated with a pipeline execution.
    """

    def __init__(
        self,
        id_: int,
        name: str,
        executions: List[proto.Execution],
        metadata_store: "BaseMetadataStore",
    ):
        """Initializes a post-execution pipeline run object.
        In most cases `PipelineRunView` objects should not be created manually
        but retrieved from a `PipelineView` object instead.
        Args:
            id_: The context id of this pipeline run.
            name: The name of this pipeline run.
            executions: All executions associated with this pipeline run.
            metadata_store: The metadata store which should be used to fetch
                additional information related to this pipeline run.
        """
        self._id = id_
        self._name = name
        self._metadata_store = metadata_store

        self._executions = executions
        self._steps: Dict[str, StepView] = OrderedDict()

        # This might be set from the parent pipeline view in case this run
        # is also tracked in the ZenStore
        self._run_wrapper: Optional[PipelineRunWrapper] = None

    @property
    def name(self) -> str:
        """Returns the name of the pipeline run."""
        return self._name

    @property
    def zenml_version(self) -> Optional[str]:
        """Version of ZenML that this pipeline run was performed with."""
        if self._run_wrapper:
            return self._run_wrapper.zenml_version
        return None

    @property
    def git_sha(self) -> Optional[str]:
        """Git commit SHA that this pipeline run was performed on.

        This will only be set if the pipeline code is in a git repository and
        there are no dirty files when running the pipeline."""
        if self._run_wrapper:
            return self._run_wrapper.git_sha
        return None

    @property
    def runtime_configuration(self) -> Optional["RuntimeConfiguration"]:
        """Runtime configuration that was used for this pipeline run.

        This will only be set if the pipeline run was tracked in a ZenStore.
        """
        if self._run_wrapper:
            return RuntimeConfiguration(
                **self._run_wrapper.runtime_configuration
            )

        return None

    @property
    def status(self) -> ExecutionStatus:
        """Returns the current status of the pipeline run."""
        step_statuses = (step.status for step in self.steps)

        if any(status == ExecutionStatus.FAILED for status in step_statuses):
            return ExecutionStatus.FAILED
        elif all(
            status == ExecutionStatus.COMPLETED
            or status == ExecutionStatus.CACHED
            for status in step_statuses
        ):
            return ExecutionStatus.COMPLETED
        else:
            return ExecutionStatus.RUNNING

    @property
    def steps(self) -> List[StepView]:
        """Returns all steps that were executed as part of this pipeline run."""
        self._ensure_steps_fetched()
        return list(self._steps.values())

    def get_step_names(self) -> List[str]:
        """Returns a list of all step names."""
        self._ensure_steps_fetched()
        return list(self._steps.keys())

    def get_step(self, name: str) -> StepView:
        """Returns a step for the given name.

        Args:
            name: The name of the step to return.

        Raises:
            KeyError: If there is no step with the given name.
        """
        self._ensure_steps_fetched()
        try:
            return self._steps[name]
        except KeyError:
            raise KeyError(
                f"No step found for name `{name}`. This pipeline "
                f"run only has steps with the following "
                f"names: `{self.get_step_names()}`"
            )

    def _ensure_steps_fetched(self) -> None:
        """Fetches all steps for this pipeline run from the metadata store."""
        if self._steps:
            # we already fetched the steps, no need to do anything
            return

        self._steps = self._metadata_store.get_pipeline_run_steps(self)

        if self._run_wrapper:
            # If we have the run wrapper from the ZenStore, pass on the step
            # wrapper so users can access additional information about the step.
            for step_wrapper in self._run_wrapper.pipeline.steps:
                if step_wrapper.name in self._steps:
                    self._steps[step_wrapper.name]._step_wrapper = step_wrapper

    def __repr__(self) -> str:
        """Returns a string representation of this pipeline run."""
        return (
            f"{self.__class__.__qualname__}(id={self._id}, "
            f"name='{self._name}')"
        )

    def __eq__(self, other: Any) -> bool:
        """Returns whether the other object is referring to the same
        pipeline run."""
        if isinstance(other, PipelineRunView):
            return (
                self._id == other._id
                and self._metadata_store.uuid == other._metadata_store.uuid
            )
        return NotImplemented
git_sha: Optional[str] property readonly

Git commit SHA that this pipeline run was performed on.

This will only be set if the pipeline code is in a git repository and there are no dirty files when running the pipeline.

name: str property readonly

Returns the name of the pipeline run.

runtime_configuration: Optional[RuntimeConfiguration] property readonly

Runtime configuration that was used for this pipeline run.

This will only be set if the pipeline run was tracked in a ZenStore.

status: ExecutionStatus property readonly

Returns the current status of the pipeline run.
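
For example, to check how a run ended (a minimal sketch; it assumes ExecutionStatus is importable from zenml.enums):

from zenml.enums import ExecutionStatus

run = pipeline.runs[-1]
if run.status == ExecutionStatus.COMPLETED:
    print(f"Run '{run.name}' finished successfully.")
elif run.status == ExecutionStatus.FAILED:
    print(f"Run '{run.name}' has at least one failed step.")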

steps: List[zenml.post_execution.step.StepView] property readonly

Returns all steps that were executed as part of this pipeline run.

zenml_version: Optional[str] property readonly

Version of ZenML that this pipeline run was performed with.

__eq__(self, other) special

Returns whether the other object is referring to the same pipeline run.

Source code in zenml/post_execution/pipeline_run.py
def __eq__(self, other: Any) -> bool:
    """Returns whether the other object is referring to the same
    pipeline run."""
    if isinstance(other, PipelineRunView):
        return (
            self._id == other._id
            and self._metadata_store.uuid == other._metadata_store.uuid
        )
    return NotImplemented
__init__(self, id_, name, executions, metadata_store) special

Initializes a post-execution pipeline run object. In most cases PipelineRunView objects should not be created manually but retrieved from a PipelineView object instead.

Parameters:

id_ (int, required): The context id of this pipeline run.
name (str, required): The name of this pipeline run.
executions (List[ml_metadata.proto.metadata_store_pb2.Execution], required): All executions associated with this pipeline run.
metadata_store (BaseMetadataStore, required): The metadata store which should be used to fetch additional information related to this pipeline run.
Source code in zenml/post_execution/pipeline_run.py
def __init__(
    self,
    id_: int,
    name: str,
    executions: List[proto.Execution],
    metadata_store: "BaseMetadataStore",
):
    """Initializes a post-execution pipeline run object.
    In most cases `PipelineRunView` objects should not be created manually
    but retrieved from a `PipelineView` object instead.
    Args:
        id_: The context id of this pipeline run.
        name: The name of this pipeline run.
        executions: All executions associated with this pipeline run.
        metadata_store: The metadata store which should be used to fetch
            additional information related to this pipeline run.
    """
    self._id = id_
    self._name = name
    self._metadata_store = metadata_store

    self._executions = executions
    self._steps: Dict[str, StepView] = OrderedDict()

    # This might be set from the parent pipeline view in case this run
    # is also tracked in the ZenStore
    self._run_wrapper: Optional[PipelineRunWrapper] = None
__repr__(self) special

Returns a string representation of this pipeline run.

Source code in zenml/post_execution/pipeline_run.py
def __repr__(self) -> str:
    """Returns a string representation of this pipeline run."""
    return (
        f"{self.__class__.__qualname__}(id={self._id}, "
        f"name='{self._name}')"
    )
get_step(self, name)

Returns a step for the given name.

Parameters:

name (str, required): The name of the step to return.

Exceptions:

KeyError: If there is no step with the given name.

Source code in zenml/post_execution/pipeline_run.py
def get_step(self, name: str) -> StepView:
    """Returns a step for the given name.

    Args:
        name: The name of the step to return.

    Raises:
        KeyError: If there is no step with the given name.
    """
    self._ensure_steps_fetched()
    try:
        return self._steps[name]
    except KeyError:
        raise KeyError(
            f"No step found for name `{name}`. This pipeline "
            f"run only has steps with the following "
            f"names: `{self.get_step_names()}`"
        )
get_step_names(self)

Returns a list of all step names.

Source code in zenml/post_execution/pipeline_run.py
def get_step_names(self) -> List[str]:
    """Returns a list of all step names."""
    self._ensure_steps_fetched()
    return list(self._steps.keys())
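
Together, these methods make it easy to walk through a run (a minimal sketch; "trainer" is a placeholder step name):

run = pipeline.runs[-1]

# List the step names of this run.
print(run.get_step_names())

# Inspect a specific step and the names of its output artifacts.
trainer_step = run.get_step("trainer")
print(list(trainer_step.outputs.keys()))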

step

StepView

Post-execution step class which can be used to query artifact information associated with a pipeline step.

Source code in zenml/post_execution/step.py
class StepView:
    """Post-execution step class which can be used to query
    artifact information associated with a pipeline step.
    """

    def __init__(
        self,
        id_: int,
        parents_step_ids: List[int],
        entrypoint_name: str,
        name: str,
        parameters: Dict[str, Any],
        metadata_store: "BaseMetadataStore",
    ):
        """Initializes a post-execution step object.

        In most cases `StepView` objects should not be created manually
        but retrieved from a `PipelineRunView` object instead.

        Args:
            id_: The execution id of this step.
            parents_step_ids: The execution ids of the parents of this step.
            entrypoint_name: The name of this step.
            name: The name of this step within the pipeline
            parameters: Parameters that were used to run this step.
            metadata_store: The metadata store which should be used to fetch
                additional information related to this step.
        """
        self._id = id_
        self._parents_step_ids = parents_step_ids
        self._entrypoint_name = entrypoint_name
        self._name = name
        self._parameters = parameters
        self._metadata_store = metadata_store

        self._inputs: Dict[str, ArtifactView] = {}
        self._outputs: Dict[str, ArtifactView] = {}

        # This might be set from the parent pipeline run view in case the run
        # is also tracked in the ZenStore
        self._step_wrapper: Optional[StepWrapper] = None

    @property
    def id(self) -> int:
        """Returns the step id."""
        return self._id

    @property
    def parents_step_ids(self) -> List[int]:
        """Returns a list of ID's of all parents of this step."""
        return self._parents_step_ids

    @property
    def parent_steps(self) -> List["StepView"]:
        """Returns a list of all parent steps of this step."""
        steps = [
            self._metadata_store.get_step_by_id(s)
            for s in self.parents_step_ids
        ]
        return steps

    @property
    def entrypoint_name(self) -> str:
        """Returns the step entrypoint_name.

        This name is equal to the name argument passed to the @step decorator
        or the actual function name if no explicit name was given.

        Examples:
            # the step entrypoint_name will be "my_step"
            @step(name="my_step")
            def my_step_function(...)

            # the step entrypoint_name will be "my_step_function"
            @step
            def my_step_function(...)
        """
        return self._entrypoint_name

    @property
    def name(self) -> str:
        """Returns the name as it is defined in the pipeline.

        This name is equal to the name given to the step within the pipeline
        context

        Examples:
            @step()
            def my_step_function(...)

            @pipeline
            def my_pipeline_function(step_a)

            p = my_pipeline_function(
                    step_a = my_step_function()
                )

            The name will be `step_a`
        """
        return self._name

    @property
    def docstring(self) -> Optional[str]:
        """Docstring of the step function or class."""
        if self._step_wrapper:
            return self._step_wrapper.docstring
        return None

    @property
    def parameters(self) -> Dict[str, Any]:
        """The parameters used to run this step."""
        return self._parameters

    @property
    def status(self) -> ExecutionStatus:
        """Returns the current status of the step."""
        return self._metadata_store.get_step_status(self)

    @property
    def is_cached(self) -> bool:
        """Returns whether the step is cached or not."""
        return self.status == ExecutionStatus.CACHED

    @property
    def is_completed(self) -> bool:
        """Returns whether the step is cached or not."""
        return self.status == ExecutionStatus.COMPLETED

    @property
    def inputs(self) -> Dict[str, ArtifactView]:
        """Returns all input artifacts that were used to run this step."""
        self._ensure_inputs_outputs_fetched()
        return self._inputs

    @property
    def input(self) -> ArtifactView:
        """Returns the input artifact that was used to run this step.

        Raises:
            ValueError: If there were zero or multiple inputs to this step.
        """
        if len(self.inputs) != 1:
            raise ValueError(
                "Can't use the `StepView.input` property for steps with zero "
                "or multiple inputs, use `StepView.inputs` instead."
            )
        return next(iter(self.inputs.values()))

    @property
    def outputs(self) -> Dict[str, ArtifactView]:
        """Returns all output artifacts that were written by this step."""
        self._ensure_inputs_outputs_fetched()
        return self._outputs

    @property
    def output(self) -> ArtifactView:
        """Returns the output artifact that was written by this step.

        Raises:
            ValueError: If there were zero or multiple step outputs.
        """
        if len(self.outputs) != 1:
            raise ValueError(
                "Can't use the `StepView.output` property for steps with zero "
                "or multiple outputs, use `StepView.outputs` instead."
            )
        return next(iter(self.outputs.values()))

    def _ensure_inputs_outputs_fetched(self) -> None:
        """Fetches all step inputs and outputs from the metadata store."""
        if self._inputs or self._outputs:
            # we already fetched inputs/outputs, no need to do anything
            return

        self._inputs, self._outputs = self._metadata_store.get_step_artifacts(
            self
        )

    def __repr__(self) -> str:
        """Returns a string representation of this step."""
        return (
            f"{self.__class__.__qualname__}(id={self._id}, "
            f"name='{self.name}', entrypoint_name='{self.entrypoint_name}'"
            f"parameters={self._parameters})"
        )

    def __eq__(self, other: Any) -> bool:
        """Returns whether the other object is referring to the same step."""
        if isinstance(other, StepView):
            return (
                self._id == other._id
                and self._metadata_store.uuid == other._metadata_store.uuid
            )
        return NotImplemented
docstring: Optional[str] property readonly

Docstring of the step function or class.

entrypoint_name: str property readonly

Returns the step entrypoint_name.

This name is equal to the name argument passed to the @step decorator or the actual function name if no explicit name was given.

Examples:

    # the step entrypoint_name will be "my_step"
    @step(name="my_step")
    def my_step_function(...)

    # the step entrypoint_name will be "my_step_function"
    @step
    def my_step_function(...)

id: int property readonly

Returns the step id.

input: ArtifactView property readonly

Returns the input artifact that was used to run this step.

Exceptions:

ValueError: If there were zero or multiple inputs to this step.

inputs: Dict[str, zenml.post_execution.artifact.ArtifactView] property readonly

Returns all input artifacts that were used to run this step.

is_cached: bool property readonly

Returns whether the step is cached or not.

is_completed: bool property readonly

Returns whether the step is completed or not.

name: str property readonly

Returns the name as it is defined in the pipeline.

This name is equal to the name given to the step within the pipeline context

Examples:

    @step()
    def my_step_function(...)

    @pipeline
    def my_pipeline_function(step_a)

    p = my_pipeline_function(
            step_a = my_step_function()
        )

    The name will be `step_a`

output: ArtifactView property readonly

Returns the output artifact that was written by this step.

Exceptions:

ValueError: If there were zero or multiple step outputs.

outputs: Dict[str, zenml.post_execution.artifact.ArtifactView] property readonly

Returns all output artifacts that were written by this step.

parameters: Dict[str, Any] property readonly

The parameters used to run this step.

parent_steps: List[StepView] property readonly

Returns a list of all parent steps of this step.

parents_step_ids: List[int] property readonly

Returns a list of ID's of all parents of this step.

status: ExecutionStatus property readonly

Returns the current status of the step.

__eq__(self, other) special

Returns whether the other object is referring to the same step.

Source code in zenml/post_execution/step.py
def __eq__(self, other: Any) -> bool:
    """Returns whether the other object is referring to the same step."""
    if isinstance(other, StepView):
        return (
            self._id == other._id
            and self._metadata_store.uuid == other._metadata_store.uuid
        )
    return NotImplemented
__init__(self, id_, parents_step_ids, entrypoint_name, name, parameters, metadata_store) special

Initializes a post-execution step object.

In most cases StepView objects should not be created manually but retrieved from a PipelineRunView object instead.

Parameters:

id_ (int, required): The execution id of this step.
parents_step_ids (List[int], required): The execution ids of the parents of this step.
entrypoint_name (str, required): The name of this step.
name (str, required): The name of this step within the pipeline.
parameters (Dict[str, Any], required): Parameters that were used to run this step.
metadata_store (BaseMetadataStore, required): The metadata store which should be used to fetch additional information related to this step.
Source code in zenml/post_execution/step.py
def __init__(
    self,
    id_: int,
    parents_step_ids: List[int],
    entrypoint_name: str,
    name: str,
    parameters: Dict[str, Any],
    metadata_store: "BaseMetadataStore",
):
    """Initializes a post-execution step object.

    In most cases `StepView` objects should not be created manually
    but retrieved from a `PipelineRunView` object instead.

    Args:
        id_: The execution id of this step.
        parents_step_ids: The execution ids of the parents of this step.
        entrypoint_name: The name of this step.
        name: The name of this step within the pipeline
        parameters: Parameters that were used to run this step.
        metadata_store: The metadata store which should be used to fetch
            additional information related to this step.
    """
    self._id = id_
    self._parents_step_ids = parents_step_ids
    self._entrypoint_name = entrypoint_name
    self._name = name
    self._parameters = parameters
    self._metadata_store = metadata_store

    self._inputs: Dict[str, ArtifactView] = {}
    self._outputs: Dict[str, ArtifactView] = {}

    # This might be set from the parent pipeline run view in case the run
    # is also tracked in the ZenStore
    self._step_wrapper: Optional[StepWrapper] = None
__repr__(self) special

Returns a string representation of this step.

Source code in zenml/post_execution/step.py
def __repr__(self) -> str:
    """Returns a string representation of this step."""
    return (
        f"{self.__class__.__qualname__}(id={self._id}, "
        f"name='{self.name}', entrypoint_name='{self.entrypoint_name}'"
        f"parameters={self._parameters})"
    )
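
As a closing sketch, the properties above can be combined to inspect a single step in detail ("trainer" and the run variable are placeholders carried over from the earlier snippets):

step = run.get_step("trainer")

print(step.parameters)   # parameters the step was run with
print(step.status)       # ExecutionStatus of the step
print(step.is_cached)    # True if the step result was reused from cache

# Materialize every output artifact produced by the step.
outputs = {name: artifact.read() for name, artifact in step.outputs.items()}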