Skip to content

Post Execution

zenml.post_execution special

Initialization for the post-execution module.

After executing a pipeline, the user needs to be able to fetch it from history and perform certain tasks. The post_execution submodule provides a set of interfaces with which the user can interact with artifacts, the pipeline, steps, and the post-run pipeline object.

artifact

Initialization for the post-execution artifact class.

ArtifactView (BaseView)

Post-execution artifact class.

This can be used to read artifact data that was created during a pipeline execution.

Source code in zenml/post_execution/artifact.py
class ArtifactView(BaseView):
    """View on an artifact that was produced by a pipeline run.

    Wraps an `ArtifactResponseModel` and allows loading the data that was
    stored for the artifact during the pipeline execution.
    """

    MODEL_CLASS = ArtifactResponseModel
    REPR_KEYS = ["id", "name", "uri"]

    @property
    def model(self) -> ArtifactResponseModel:
        """Gives typed access to the wrapped model.

        Returns:
            The wrapped model, cast to `ArtifactResponseModel`.
        """
        return cast(ArtifactResponseModel, self._model)

    def read(
        self,
        output_data_type: Optional[Type[Any]] = None,
        materializer_class: Optional[Type["BaseMaterializer"]] = None,
    ) -> Any:
        """Loads the data stored in this artifact.

        Both arguments are deprecated: passing a value only logs a warning and
        has no influence on how the artifact is loaded.

        Args:
            output_data_type: Deprecated; will be ignored.
            materializer_class: Deprecated; will be ignored.

        Returns:
            The materialized data.
        """
        # Pair every deprecated argument with the warning to emit for it.
        deprecated_arguments = (
            (
                output_data_type,
                "The `output_data_type` argument is deprecated and will be "
                "removed in a future release.",
            ),
            (
                materializer_class,
                "The `materializer_class` argument is deprecated and will be "
                "removed in a future release.",
            ),
        )
        for value, notice in deprecated_arguments:
            if value is not None:
                logger.warning(notice)

        from zenml.utils.materializer_utils import load_artifact

        return load_artifact(self.model)
model: ArtifactResponseModel property readonly

Returns the underlying ArtifactResponseModel.

Returns:

Type Description
ArtifactResponseModel

The underlying ArtifactResponseModel.

MODEL_CLASS (ArtifactBaseModel, WorkspaceScopedResponseModel) pydantic-model

Response model for artifacts.

Source code in zenml/post_execution/artifact.py
class ArtifactResponseModel(ArtifactBaseModel, WorkspaceScopedResponseModel):
    """Response model for artifacts.

    Adds the producer step run reference and the artifact metadata to the
    fields inherited from `ArtifactBaseModel` and
    `WorkspaceScopedResponseModel`.
    """

    # Optional UUID; presumably the step run that produced the artifact
    # (inferred from the field name -- confirm against the schema).
    producer_step_run_id: Optional[UUID]
    # Mapping of string keys to run metadata entries; empty by default.
    metadata: Dict[str, "RunMetadataResponseModel"] = Field(
        default={}, title="Metadata of the artifact."
    )
__json_encoder__(obj) special staticmethod

partial(func, *args, **keywords) - new function with partial application of the given arguments and keywords.

read(self, output_data_type=None, materializer_class=None)

Materializes the data stored in this artifact.

Parameters:

Name Type Description Default
output_data_type Optional[Type[Any]]

Deprecated; will be ignored.

None
materializer_class Optional[Type[BaseMaterializer]]

Deprecated; will be ignored.

None

Returns:

Type Description
Any

The materialized data.

Source code in zenml/post_execution/artifact.py
def read(
    self,
    output_data_type: Optional[Type[Any]] = None,
    materializer_class: Optional[Type["BaseMaterializer"]] = None,
) -> Any:
    """Loads the data stored in this artifact.

    Both arguments are deprecated: passing a value only logs a warning and
    has no influence on how the artifact is loaded.

    Args:
        output_data_type: Deprecated; will be ignored.
        materializer_class: Deprecated; will be ignored.

    Returns:
        The materialized data.
    """
    # Pair every deprecated argument with the warning to emit for it.
    deprecated_arguments = (
        (
            output_data_type,
            "The `output_data_type` argument is deprecated and will be "
            "removed in a future release.",
        ),
        (
            materializer_class,
            "The `materializer_class` argument is deprecated and will be "
            "removed in a future release.",
        ),
    )
    for value, notice in deprecated_arguments:
        if value is not None:
            logger.warning(notice)

    from zenml.utils.materializer_utils import load_artifact

    return load_artifact(self.model)

base_view

Base classes for post-execution views.

BaseView (ABC)

Base class for post-execution views.

Subclasses should override the following attributes/methods:

- MODEL_CLASS should be set to the model class the subclass is wrapping.
- The model property should return self._model cast to the correct type.
- (Optionally) REPR_KEYS can be set to include a list of custom attributes in the __repr__ method.

Source code in zenml/post_execution/base_view.py
class BaseView(ABC):
    """Base class for post-execution views.

    A view wraps a response model and exposes the model fields as if they were
    attributes of the view itself.

    Subclasses should override the following attributes/methods:
    - `MODEL_CLASS` should be set to the model class the subclass is wrapping.
    - The `model` property should return `self._model` cast to the correct type.
    - (Optionally) `REPR_KEYS` can be set to include a list of custom attributes
        in the `__repr__` method.
    """

    MODEL_CLASS = BaseResponseModel  # The model class to wrap.
    REPR_KEYS = ["id"]  # Keys to include in the `__repr__` method.

    def __init__(self, model: BaseResponseModel):
        """Initializes a view for the given model.

        Args:
            model: The model to create a view for.

        Raises:
            TypeError: If the model is not of the correct type.
            ValueError: If any of the `REPR_KEYS` are not valid.
        """
        # Check that the model is of the correct type.
        if not isinstance(model, self.MODEL_CLASS):
            raise TypeError(
                f"Model of {self.__class__.__name__} must be of type "
                f"{self.MODEL_CLASS.__name__} but is {type(model).__name__}."
            )

        # Validate that all `REPR_KEYS` are valid.
        for key in self.REPR_KEYS:
            if (
                key not in model.__fields__
                and key not in self._custom_view_properties
            ):
                raise ValueError(
                    f"Key {key} in {self.__class__.__name__}.REPR_KEYS is "
                    f"neither a field of {self.MODEL_CLASS.__name__} nor a "
                    f"custom property of {self.__class__.__name__}."
                )

        self._model = model

    @property
    def _custom_view_properties(self) -> List[str]:
        """Returns a list of custom view properties.

        Returns:
            All names listed by `dir(self)` that do not start with a double
            underscore.
        """
        return [attr for attr in dir(self) if not attr.startswith("__")]

    @property
    @abstractmethod
    def model(self) -> BaseResponseModel:
        """Returns the underlying model.

        Subclasses should override this property to return `self._model` with
        the correct model type.

        E.g. `return cast(ArtifactResponseModel, self._model)`

        Returns:
            The underlying model.
        """

    def __getattribute__(self, __name: str) -> Any:
        """Returns the attribute with the given name.

        This method is overridden so we can access the model fields as if they
        were attributes of this class. Attributes defined on the view (instance
        attributes and anything defined on the class hierarchy) take precedence
        over model fields of the same name.

        Args:
            __name: The name of the attribute to return.

        Returns:
            The attribute with the given name.
        """
        default_getattribute = super().__getattribute__

        # These two are needed to inspect the view itself below.
        if __name in {"__dict__", "__class__"}:
            return default_getattribute(__name)

        instance_dict = default_getattribute("__dict__")

        # Check the custom view properties first in case of overwrites. This
        # looks at the instance and class dictionaries directly instead of
        # building a full `dir()` listing on every attribute access.
        if not __name.startswith("__"):
            defined_on_view = __name in instance_dict or any(
                __name in vars(klass) for klass in type(self).__mro__
            )
            if defined_on_view:
                return default_getattribute(__name)

        # Then check if the attribute is a field in the model. The model is
        # read from the instance dictionary so that a view without `_model`
        # (e.g. created through `__new__` while copying) falls through to a
        # regular `AttributeError` instead of recursing into this method.
        model = instance_dict.get("_model")
        if model is not None and __name in model.__fields__:
            return getattr(model, __name)

        # Otherwise, fall back to the default behavior
        return default_getattribute(__name)

    def __repr__(self) -> str:
        """Returns a string representation of this view.

        The string representation is of the form
        `__qualname__(<key1>=<value1>, <key2>=<value2>, ...)` where the keys
        are the ones specified in `REPR_KEYS`. If `REPR_KEYS` is empty, only
        the qualified class name is returned.

        Returns:
            A string representation of this view.
        """
        representation = self.__class__.__qualname__
        details = ", ".join(
            f"{key}={getattr(self, key)}" for key in self.REPR_KEYS
        )
        if details:
            representation += f"({details})"
        return representation

    def __eq__(self, other: Any) -> bool:
        """Returns whether the other object is referring to the same model.

        Args:
            other: The other object to compare to.

        Returns:
            True if the other object is referring to the same model, else False.
        """
        if not isinstance(other, self.__class__):
            return False
        return self._model == other._model  # Use the model's `__eq__` method.
model: BaseResponseModel property readonly

Returns the underlying model.

Subclasses should override this property to return self._model with the correct model type.

E.g. return cast(ArtifactResponseModel, self._model)

Returns:

Type Description
BaseResponseModel

The underlying model.

MODEL_CLASS (BaseZenModel) pydantic-model

Base domain model.

Used as a base class for all domain models that have the following common characteristics:

  • are uniquely identified by a UUID
  • have a creation timestamp and a last modified timestamp
Source code in zenml/post_execution/base_view.py
class BaseResponseModel(BaseZenModel):
    """Base domain model.

    Serves as the base class for domain models that share these traits:

      * they are uniquely identified by a UUID
      * they carry a creation timestamp and a last modified timestamp
    """

    id: UUID = Field(title="The unique resource id.")

    created: datetime = Field(title="Time when this resource was created.")
    updated: datetime = Field(
        title="Time when this resource was last updated."
    )

    def __hash__(self) -> int:
        """Hashes the model by its concrete type and its UUID.

        Returns:
            Hash of the `(type, id)` pair.
        """
        return hash((type(self), self.id))

    def __eq__(self, other: Any) -> bool:
        """Compares this model to another object by UUID.

        Args:
            other: The other object to compare to.

        Returns:
            True if the other object is a `BaseResponseModel` with the same
            UUID, else False.
        """
        return isinstance(other, BaseResponseModel) and self.id == other.id

    def get_analytics_metadata(self) -> Dict[str, Any]:
        """Fetches the analytics metadata for base response models.

        Returns:
            The analytics metadata of the parent class, extended by an
            `entity_id` entry holding the model id.
        """
        analytics = super().get_analytics_metadata()
        analytics["entity_id"] = self.id
        return analytics
__eq__(self, other) special

Implementation of equality magic method.

Parameters:

Name Type Description Default
other Any

The other object to compare to.

required

Returns:

Type Description
bool

True if the other object is a BaseResponseModel with the same UUID, else False.

Source code in zenml/post_execution/base_view.py
def __eq__(self, other: Any) -> bool:
    """Compares this model to another object by UUID.

    Args:
        other: The other object to compare to.

    Returns:
        True if the other object is a `BaseResponseModel` with the same UUID,
        else False.
    """
    return isinstance(other, BaseResponseModel) and self.id == other.id
__hash__(self) special

Implementation of hash magic method.

Returns:

Type Description
int

Hash of the UUID.

Source code in zenml/post_execution/base_view.py
def __hash__(self) -> int:
    """Hashes the model by its concrete type and its UUID.

    Returns:
        Hash of the `(type, id)` pair.
    """
    return hash((type(self), self.id))
__json_encoder__(obj) special staticmethod

partial(func, *args, **keywords) - new function with partial application of the given arguments and keywords.

get_analytics_metadata(self)

Fetches the analytics metadata for base response models.

Returns:

Type Description
Dict[str, Any]

The analytics metadata.

Source code in zenml/post_execution/base_view.py
def get_analytics_metadata(self) -> Dict[str, Any]:
    """Fetches the analytics metadata for base response models.

    Returns:
        The analytics metadata of the parent class, extended by an
        `entity_id` entry holding the model id.
    """
    analytics = super().get_analytics_metadata()
    analytics["entity_id"] = self.id
    return analytics
__eq__(self, other) special

Returns whether the other object is referring to the same model.

Parameters:

Name Type Description Default
other Any

The other object to compare to.

required

Returns:

Type Description
bool

True if the other object is referring to the same model, else False.

Source code in zenml/post_execution/base_view.py
def __eq__(self, other: Any) -> bool:
    """Checks whether another object is a view on the same model.

    Args:
        other: The other object to compare to.

    Returns:
        True if the other object is an instance of this view's class and its
        model compares equal to this view's model, else False.
    """
    # The actual comparison is delegated to the model's own `__eq__`.
    return isinstance(other, self.__class__) and self._model == other._model
__getattribute__(self, _BaseView__name) special

Returns the attribute with the given name.

This method is overridden so we can access the model fields as if they were attributes of this class.

Parameters:

Name Type Description Default
__name

The name of the attribute to return.

required

Returns:

Type Description
Any

The attribute with the given name.

Source code in zenml/post_execution/base_view.py
def __getattribute__(self, __name: str) -> Any:
    """Returns the attribute with the given name.

    This method is overridden so we can access the model fields as if they
    were attributes of this class. Attributes defined on the view (instance
    attributes and anything defined on the class hierarchy) take precedence
    over model fields of the same name.

    Args:
        __name: The name of the attribute to return.

    Returns:
        The attribute with the given name.
    """
    default_getattribute = super().__getattribute__

    # These two are needed to inspect the view itself below.
    if __name in {"__dict__", "__class__"}:
        return default_getattribute(__name)

    instance_dict = default_getattribute("__dict__")

    # Check the custom view properties first in case of overwrites. This looks
    # at the instance and class dictionaries directly instead of building a
    # full `dir()` listing on every attribute access.
    if not __name.startswith("__"):
        defined_on_view = __name in instance_dict or any(
            __name in vars(klass) for klass in type(self).__mro__
        )
        if defined_on_view:
            return default_getattribute(__name)

    # Then check if the attribute is a field in the model. The model is read
    # from the instance dictionary so that a view without `_model` (e.g.
    # created through `__new__` while copying) falls through to a regular
    # `AttributeError` instead of recursing into this method.
    model = instance_dict.get("_model")
    if model is not None and __name in model.__fields__:
        return getattr(model, __name)

    # Otherwise, fall back to the default behavior
    return default_getattribute(__name)
__init__(self, model) special

Initializes a view for the given model.

Parameters:

Name Type Description Default
model BaseResponseModel

The model to create a view for.

required

Exceptions:

Type Description
TypeError

If the model is not of the correct type.

ValueError

If any of the REPR_KEYS are not valid.

Source code in zenml/post_execution/base_view.py
def __init__(self, model: BaseResponseModel):
    """Creates a view that wraps the given model.

    Args:
        model: The model to create a view for.

    Raises:
        TypeError: If the model is not an instance of `MODEL_CLASS`.
        ValueError: If a key in `REPR_KEYS` is neither a model field nor a
            custom property of the view.
    """
    # Reject models of the wrong type right away.
    if not isinstance(model, self.MODEL_CLASS):
        raise TypeError(
            f"Model of {self.__class__.__name__} must be of type "
            f"{self.MODEL_CLASS.__name__} but is {type(model).__name__}."
        )

    # Every key used by `__repr__` has to be resolvable on the model or on
    # the view itself.
    for key in self.REPR_KEYS:
        if key in model.__fields__ or key in self._custom_view_properties:
            continue
        raise ValueError(
            f"Key {key} in {self.__class__.__name__}.REPR_KEYS is "
            f"neither a field of {self.MODEL_CLASS.__name__} nor a "
            f"custom property of {self.__class__.__name__}."
        )

    self._model = model
__repr__(self) special

Returns a string representation of this view.

The string representation is of the form __qualname__(<key1>=<value1>, <key2>=<value2>, ...) where the keys are the ones specified in REPR_KEYS.

Returns:

Type Description
str

A string representation of this view.

Source code in zenml/post_execution/base_view.py
def __repr__(self) -> str:
    """Returns a string representation of this view.

    The string representation is of the form
    `__qualname__(<key1>=<value1>, <key2>=<value2>, ...)` where the keys
    are the ones specified in `REPR_KEYS`. If `REPR_KEYS` is empty, only the
    qualified class name is returned.

    Returns:
        A string representation of this view.
    """
    # Use a local name that does not shadow the builtin `repr`.
    representation = self.__class__.__qualname__
    details = ", ".join(
        f"{key}={getattr(self, key)}" for key in self.REPR_KEYS
    )
    if details:
        representation += f"({details})"
    return representation

lineage special

Initialization of lineage generation module.

edge

Class for Edges in a lineage graph.

Edge (BaseModel) pydantic-model

A class that represents an edge in a lineage graph.

Source code in zenml/post_execution/lineage/edge.py
class Edge(BaseModel):
    """A class that represents an edge in a lineage graph.

    An edge is directed and connects two nodes of the graph by their ids.
    """

    # Identifier of the edge; `LineageGraph` builds it as
    # `<step node id>_<artifact node id>`.
    id: str
    # Id of the node the edge starts at.
    source: str
    # Id of the node the edge points to.
    target: str

lineage_graph

Class for lineage graph generation.

LineageGraph (BaseModel) pydantic-model

A lineage graph representation of a PipelineRunView.

Source code in zenml/post_execution/lineage/lineage_graph.py
class LineageGraph(BaseModel):
    """A lineage graph representation of a PipelineRunView.

    The graph consists of step nodes and artifact nodes. Edges point from a
    step to each artifact it outputs and from each input artifact to the step
    that consumes it.
    """

    nodes: List[Union[StepNode, ArtifactNode]] = []
    edges: List[Edge] = []
    # Node id of the first step that was added to the graph.
    root_step_id: Optional[str]
    # Run metadata as (key, value, type) tuples.
    run_metadata: List[Tuple[str, str, str]] = []

    def generate_step_nodes_and_edges(self, step: StepView) -> None:
        """Generates the step nodes and the edges between them.

        Adds one node for the step, one node and one outgoing edge for each of
        its output artifacts, and one incoming edge for each of its input
        artifacts. The first step passed to this method becomes the root step.

        Args:
            step: The step to generate the nodes and edges for.
        """
        step_id = STEP_PREFIX + str(step.id)
        if self.root_step_id is None:
            self.root_step_id = step_id
        step_config = step.step_configuration.dict()
        if step_config:
            # Inputs, outputs and parameters are listed separately on the
            # node, and empty configuration values are dropped.
            step_config = {
                key: value
                for key, value in step_config.items()
                if key not in ["inputs", "outputs", "parameters"] and value
            }
        self.nodes.append(
            StepNode(
                id=step_id,
                data=StepNodeDetails(
                    execution_id=str(step.id),
                    name=step.name,  # redundant for consistency
                    status=step.status,
                    entrypoint_name=step.entrypoint_name,  # redundant for consistency
                    parameters=step.parameters,
                    configuration=step_config,
                    inputs={k: v.uri for k, v in step.inputs.items()},
                    outputs={k: v.uri for k, v in step.outputs.items()},
                    metadata=[
                        (m.key, str(m.value), str(m.type))
                        for m in step.metadata.values()
                    ],
                ),
            )
        )

        for artifact_name, artifact in step.outputs.items():
            artifact_id = ARTIFACT_PREFIX + str(artifact.id)
            # The producer is optional: keep a missing producer as `None`
            # instead of turning it into the literal string "None".
            producer_run_id = artifact.producer_step_run_id
            producer_step_id = (
                str(producer_run_id) if producer_run_id is not None else None
            )
            self.nodes.append(
                ArtifactNode(
                    id=artifact_id,
                    data=ArtifactNodeDetails(
                        execution_id=str(artifact.id),
                        name=artifact_name,
                        status=step.status,
                        is_cached=step.status == ExecutionStatus.CACHED,
                        artifact_type=artifact.type,
                        artifact_data_type=artifact.data_type,
                        parent_step_id=str(step.id),
                        producer_step_id=producer_step_id,
                        uri=artifact.uri,
                        metadata=[
                            (m.key, str(m.value), str(m.type))
                            for m in artifact.metadata.values()
                        ],
                    ),
                )
            )
            self.edges.append(
                Edge(
                    id=step_id + "_" + artifact_id,
                    source=step_id,
                    target=artifact_id,
                )
            )

        # Input artifacts only get an edge here; no node is added for them in
        # this loop.
        for artifact in step.inputs.values():
            artifact_id = ARTIFACT_PREFIX + str(artifact.id)
            self.edges.append(
                Edge(
                    id=step_id + "_" + artifact_id,
                    source=artifact_id,
                    target=step_id,
                )
            )

    def generate_run_nodes_and_edges(self, run: PipelineRunView) -> None:
        """Generates the run nodes and the edges between them.

        Stores the metadata of the run on the graph and then adds the nodes
        and edges of every step of the run.

        Args:
            run: The PipelineRunView to generate the lineage graph for.
        """
        self.run_metadata = [
            (m.key, str(m.value), str(m.type)) for m in run.metadata.values()
        ]
        for step in run.steps:
            self.generate_step_nodes_and_edges(step)
generate_run_nodes_and_edges(self, run)

Generates the run nodes and the edges between them.

Parameters:

Name Type Description Default
run PipelineRunView

The PipelineRunView to generate the lineage graph for.

required
Source code in zenml/post_execution/lineage/lineage_graph.py
def generate_run_nodes_and_edges(self, run: PipelineRunView) -> None:
    """Builds the graph content for a complete pipeline run.

    Stores the metadata of the run as `(key, value, type)` string tuples and
    then adds the nodes and edges of every step of the run.

    Args:
        run: The PipelineRunView to generate the lineage graph for.
    """
    collected_metadata = []
    for entry in run.metadata.values():
        collected_metadata.append(
            (entry.key, str(entry.value), str(entry.type))
        )
    self.run_metadata = collected_metadata

    for run_step in run.steps:
        self.generate_step_nodes_and_edges(run_step)
generate_step_nodes_and_edges(self, step)

Generates the step nodes and the edges between them.

Parameters:

Name Type Description Default
step StepView

The step to generate the nodes and edges for.

required
Source code in zenml/post_execution/lineage/lineage_graph.py
def generate_step_nodes_and_edges(self, step: StepView) -> None:
    """Generates the step nodes and the edges between them.

    Adds one node for the step, one node and one outgoing edge for each of its
    output artifacts, and one incoming edge for each of its input artifacts.
    The first step passed to this method becomes the root step.

    Args:
        step: The step to generate the nodes and edges for.
    """
    step_id = STEP_PREFIX + str(step.id)
    if self.root_step_id is None:
        self.root_step_id = step_id
    step_config = step.step_configuration.dict()
    if step_config:
        # Inputs, outputs and parameters are listed separately on the node,
        # and empty configuration values are dropped.
        step_config = {
            key: value
            for key, value in step_config.items()
            if key not in ["inputs", "outputs", "parameters"] and value
        }
    self.nodes.append(
        StepNode(
            id=step_id,
            data=StepNodeDetails(
                execution_id=str(step.id),
                name=step.name,  # redundant for consistency
                status=step.status,
                entrypoint_name=step.entrypoint_name,  # redundant for consistency
                parameters=step.parameters,
                configuration=step_config,
                inputs={k: v.uri for k, v in step.inputs.items()},
                outputs={k: v.uri for k, v in step.outputs.items()},
                metadata=[
                    (m.key, str(m.value), str(m.type))
                    for m in step.metadata.values()
                ],
            ),
        )
    )

    for artifact_name, artifact in step.outputs.items():
        artifact_id = ARTIFACT_PREFIX + str(artifact.id)
        # The producer is optional: keep a missing producer as `None` instead
        # of turning it into the literal string "None".
        producer_run_id = artifact.producer_step_run_id
        producer_step_id = (
            str(producer_run_id) if producer_run_id is not None else None
        )
        self.nodes.append(
            ArtifactNode(
                id=artifact_id,
                data=ArtifactNodeDetails(
                    execution_id=str(artifact.id),
                    name=artifact_name,
                    status=step.status,
                    is_cached=step.status == ExecutionStatus.CACHED,
                    artifact_type=artifact.type,
                    artifact_data_type=artifact.data_type,
                    parent_step_id=str(step.id),
                    producer_step_id=producer_step_id,
                    uri=artifact.uri,
                    metadata=[
                        (m.key, str(m.value), str(m.type))
                        for m in artifact.metadata.values()
                    ],
                ),
            )
        )
        self.edges.append(
            Edge(
                id=step_id + "_" + artifact_id,
                source=step_id,
                target=artifact_id,
            )
        )

    # Input artifacts only get an edge here; no node is added for them in
    # this loop.
    for artifact in step.inputs.values():
        artifact_id = ARTIFACT_PREFIX + str(artifact.id)
        self.edges.append(
            Edge(
                id=step_id + "_" + artifact_id,
                source=artifact_id,
                target=step_id,
            )
        )

node special

Initialization of lineage nodes.

artifact_node

Class for all lineage artifact nodes.

ArtifactNode (BaseNode) pydantic-model

A class that represents an artifact node in a lineage graph.

Source code in zenml/post_execution/lineage/node/artifact_node.py
class ArtifactNode(BaseNode):
    """A class that represents an artifact node in a lineage graph."""

    # Fixed node type marker for artifact nodes.
    type: str = "artifact"
    # Artifact-specific details of the node.
    data: ArtifactNodeDetails
ArtifactNodeDetails (BaseNodeDetails) pydantic-model

Captures all artifact details for the node.

Source code in zenml/post_execution/lineage/node/artifact_node.py
class ArtifactNodeDetails(BaseNodeDetails):
    """Captures all artifact details for the node.

    The comments below describe how `LineageGraph` fills the fields.
    """

    # True if the status of the step that lists the artifact as an output is
    # `ExecutionStatus.CACHED`.
    is_cached: bool
    # Taken from the artifact's `type` attribute.
    artifact_type: str
    # Taken from the artifact's `data_type` attribute.
    artifact_data_type: str
    # Id of the step that lists the artifact as an output.
    parent_step_id: str
    # Derived from the artifact's `producer_step_run_id`.
    producer_step_id: Optional[str]
    # Taken from the artifact's `uri` attribute.
    uri: str
    metadata: List[Tuple[str, str, str]]  # (key, value, type)
base_node

Base class for all lineage nodes.

BaseNode (BaseModel) pydantic-model

A class that represents a node in a lineage graph.

Source code in zenml/post_execution/lineage/node/base_node.py
class BaseNode(BaseModel):
    """A class that represents a node in a lineage graph.

    Subclasses fix the `type` value and narrow the type of `data`.
    """

    # Identifier of the node within the graph; edges refer to it.
    id: str
    # Node type marker, e.g. "step" or "artifact" in the subclasses.
    type: str
    # Details attached to the node.
    data: BaseNodeDetails
BaseNodeDetails (BaseModel) pydantic-model

Captures all details for the node.

Source code in zenml/post_execution/lineage/node/base_node.py
class BaseNodeDetails(BaseModel):
    """Captures all details for the node.

    Holds the fields shared by step and artifact node details.
    """

    # String form of the step or artifact id, as filled by `LineageGraph`.
    execution_id: str
    # Name of the step, or the output name of the artifact.
    name: str
    # Execution status; artifact nodes reuse the status of their step.
    status: ExecutionStatus
step_node

Class for all lineage step nodes.

StepNode (BaseNode) pydantic-model

A class that represents a step node in a lineage graph.

Source code in zenml/post_execution/lineage/node/step_node.py
class StepNode(BaseNode):
    """A class that represents a step node in a lineage graph."""

    # Fixed node type marker for step nodes.
    type: str = "step"
    # Step-specific details of the node.
    data: StepNodeDetails
StepNodeDetails (BaseNodeDetails) pydantic-model

Captures all step details for the node.

Source code in zenml/post_execution/lineage/node/step_node.py
class StepNodeDetails(BaseNodeDetails):
    """Captures all step details for the node.

    The comments below describe how `LineageGraph` fills the fields.
    """

    # Taken from the step's `entrypoint_name` attribute.
    entrypoint_name: str
    # Taken from the step's `parameters` attribute.
    parameters: Dict[str, Any]
    # Step configuration without the "inputs", "outputs" and "parameters"
    # keys and without empty values.
    configuration: Dict[str, Any]
    # Maps each input name to the URI of the input artifact.
    inputs: Dict[str, Any]
    # Maps each output name to the URI of the output artifact.
    outputs: Dict[str, Any]
    metadata: List[Tuple[str, str, str]]  # (key, value, type)

pipeline

Implementation of the post-execution pipeline.

PipelineView (BaseView)

Post-execution pipeline class.

Source code in zenml/post_execution/pipeline.py
class PipelineView(BaseView):
    """Post-execution pipeline class.

    Wraps a `PipelineResponseModel` and gives access to the runs of the
    pipeline in the active workspace.
    """

    MODEL_CLASS = PipelineResponseModel
    REPR_KEYS = ["id", "name"]

    @property
    def model(self) -> PipelineResponseModel:
        """Returns the underlying `PipelineResponseModel`.

        Returns:
            The underlying `PipelineResponseModel`.
        """
        return cast(PipelineResponseModel, self._model)

    @property
    def num_runs(self) -> int:
        """Returns the number of runs of this pipeline.

        The value is queried from the store on every access.

        Returns:
            The number of runs of this pipeline in the active workspace.
        """
        active_workspace_id = Client().active_workspace.id
        return (
            Client()
            .zen_store.list_runs(
                PipelineRunFilterModel(
                    workspace_id=active_workspace_id,
                    pipeline_id=self.model.id,
                )
            )
            .total
        )

    @property
    def runs(self) -> List["PipelineRunView"]:
        """Returns the last 50 stored runs of this pipeline.

        The runs are returned in reverse chronological order, so the latest
        run will be the first element in this list.

        Returns:
            A list of the last 50 stored runs of this pipeline.
        """
        # Do not cache runs as new runs might appear during this object's
        # lifecycle
        active_workspace_id = Client().active_workspace.id
        runs = Client().list_runs(
            workspace_id=active_workspace_id,
            pipeline_id=self.model.id,
            size=50,
            sort_by="desc:created",
        )

        return [PipelineRunView(run) for run in runs.items]

    def get_run_for_completed_step(self, step_name: str) -> "PipelineRunView":
        """Ascertains which pipeline run produced the cached artifact of a given step.

        Only the runs returned by `runs` are searched, starting with the
        oldest one. Runs that do not contain the step are skipped.

        Args:
            step_name: Name of step at hand

        Returns:
            The first run, in that order, in which the given step is
            completed.

        Raises:
            LookupError: If no run is found that completed the given step
        """
        # `runs` is ordered newest-first, so iterate it backwards.
        for run in reversed(self.runs):
            try:
                if run.get_step(step_name).is_completed:
                    # Return the run itself rather than testing its
                    # truthiness after the loop.
                    return run
            except KeyError:
                # This run does not contain the step; try the next one.
                continue

        raise LookupError(
            "No Pipeline Run could be found, that has"
            f" completed the provided step: [{step_name}]"
        )
model: PipelineResponseModel property readonly

Returns the underlying PipelineResponseModel.

Returns:

Type Description
PipelineResponseModel

The underlying PipelineResponseModel.

num_runs: int property readonly

Returns the number of runs of this pipeline.

Returns:

Type Description
int

The number of runs of this pipeline.

runs: List[PipelineRunView] property readonly

Returns the last 50 stored runs of this pipeline.

The runs are returned in reverse chronological order, so the latest run will be the first element in this list.

Returns:

Type Description
List[PipelineRunView]

A list of the last 50 stored runs of this pipeline, latest run first.

MODEL_CLASS (PipelineBaseModel, WorkspaceScopedResponseModel) pydantic-model

Pipeline response model with user, workspace, runs, and status hydrated.

Source code in zenml/post_execution/pipeline.py
class PipelineResponseModel(PipelineBaseModel, WorkspaceScopedResponseModel):
    """Pipeline response model user, workspace, runs, and status hydrated."""

    runs: Optional[List["PipelineRunResponseModel"]] = Field(
        title="A list of the last x Pipeline Runs."
    )
    status: Optional[List[ExecutionStatus]] = Field(
        title="The status of the last x Pipeline Runs."
    )
__json_encoder__(obj) special staticmethod

partial(func, *args, **keywords) - new function with partial application of the given arguments and keywords.

get_run_for_completed_step(self, step_name)

Ascertains which pipeline run produced the cached artifact of a given step.

Parameters:

Name Type Description Default
step_name str

Name of step at hand

required

Returns:

Type Description
PipelineRunView

The pipeline run in which the given step is completed. If no such run is found, a LookupError is raised instead of returning None.

Exceptions:

Type Description
LookupError

If no run is found that completed the given step

Source code in zenml/post_execution/pipeline.py
def get_run_for_completed_step(self, step_name: str) -> "PipelineRunView":
    """Finds the pipeline run in which the given step was completed.

    The runs of this pipeline are scanned in the reverse of the order in
    which `self.runs` lists them, and the first run whose step with the given
    name reports itself as completed is returned. Runs that do not contain a
    step with that name are skipped.

    Args:
        step_name: Name of the step to look for.

    Returns:
        The pipeline run that completed the given step.

    Raises:
        LookupError: If no run is found that completed the given step
    """

    def _has_completed_step(candidate: Any) -> Any:
        # A run without the requested step signals this with a `KeyError`;
        # such a run simply does not qualify.
        try:
            return candidate.get_step(step_name).is_completed
        except KeyError:
            return False

    # Lazily walk the runs so that the scan stops at the first match.
    matching_run = next(
        (
            candidate
            for candidate in reversed(self.runs)
            if _has_completed_step(candidate)
        ),
        None,
    )

    if matching_run:
        return matching_run

    raise LookupError(
        "No Pipeline Run could be found, that has"
        f" completed the provided step: [{step_name}]"
    )

pipeline_run

Implementation of the post-execution pipeline run class.

PipelineRunView (BaseView)

Post-execution pipeline run class.

This can be used to query steps and artifact information associated with a pipeline execution.

Source code in zenml/post_execution/pipeline_run.py
class PipelineRunView(BaseView):
    """Post-execution view of a single pipeline run.

    Exposes the configuration and status of the run and gives access to the
    steps (and, through them, the artifacts) that belong to it.
    """

    MODEL_CLASS = PipelineRunResponseModel
    REPR_KEYS = ["id", "name"]

    def __init__(self, model: PipelineRunResponseModel):
        """Creates a view around the given pipeline run model.

        `PipelineRunView` objects are normally retrieved from a
        `PipelineView` object rather than constructed by hand.

        Args:
            model: The model to initialize this object from.
        """
        super().__init__(model)
        # Filled on first access by `_ensure_steps_fetched()`.
        self._steps: Dict[str, StepView] = OrderedDict()

    @property
    def model(self) -> PipelineRunResponseModel:
        """The `PipelineRunResponseModel` backing this view.

        Returns:
            The underlying `PipelineRunResponseModel`.
        """
        return cast(PipelineRunResponseModel, self._model)

    @property
    def settings(self) -> Dict[str, Any]:
        """The settings stored in the pipeline configuration of this run.

        These are runtime settings passed down to stack components, which
        can be set at pipeline level.

        Returns:
            The pipeline settings.
        """
        configuration = self.model.pipeline_configuration
        return cast(Dict[str, Any], configuration["settings"])

    @property
    def extra(self) -> Dict[str, Any]:
        """The extras stored in the pipeline configuration of this run.

        This dict is meant to carry any user-defined configuration down to
        the pipeline or stack components.

        Returns:
            The pipeline extras.
        """
        configuration = self.model.pipeline_configuration
        return cast(Dict[str, Any], configuration["extra"])

    @property
    def enable_cache(self) -> Optional[bool]:
        """Whether caching is enabled for this pipeline run.

        Returns:
            The caching flag stored in the pipeline configuration, or `None`
            if the configuration does not contain one.
        """
        from zenml.pipelines.base_pipeline import PARAM_ENABLE_CACHE

        configuration = self.model.pipeline_configuration
        return configuration.get(PARAM_ENABLE_CACHE)

    @property
    def enable_artifact_metadata(self) -> Optional[bool]:
        """Whether artifact metadata is enabled for this pipeline run.

        Returns:
            The artifact metadata flag stored in the pipeline configuration,
            or `None` if the configuration does not contain one.
        """
        from zenml.pipelines.base_pipeline import (
            PARAM_ENABLE_ARTIFACT_METADATA,
        )

        configuration = self.model.pipeline_configuration
        return configuration.get(PARAM_ENABLE_ARTIFACT_METADATA)

    @property
    def status(self) -> ExecutionStatus:
        """The current status of the pipeline run.

        Returns:
            The current status of the pipeline run.
        """
        # The status may have changed since this view was created, so the
        # run is looked up again instead of reading the stored model.
        refreshed_run = Client().get_pipeline_run(self.model.id)
        return refreshed_run.status

    @property
    def steps(self) -> List[StepView]:
        """All steps that belong to this pipeline run.

        Returns:
            A list of all steps that were executed as part of this pipeline run.
        """
        self._ensure_steps_fetched()
        return [step_view for step_view in self._steps.values()]

    def get_step_names(self) -> List[str]:
        """Lists the names of all steps of this pipeline run.

        Returns:
            A list of all step names.
        """
        self._ensure_steps_fetched()
        return [step_name for step_name in self._steps]

    def get_step(
        self,
        step: Optional[str] = None,
        **kwargs: Any,
    ) -> StepView:
        """Looks up a step of this run by its name.

        The name refers to the name of the step in the pipeline definition, not
        the class name of the step-class.

        Use it like this:
        ```python
        # Get the step by name
        pipeline_run_view.get_step("first_step")
        ```

        Args:
            step: Name of the step to fetch.
            **kwargs: The deprecated `name` is caught as a kwarg to
                specify the step instead of using the `step` argument.

        Returns:
            A step for the given name.

        Raises:
            KeyError: If there is no step with the given name.
            RuntimeError: If no step has been specified at all.
        """
        self._ensure_steps_fetched()

        api_doc_link = get_apidocs_link(
            "core-post_execution",
            "zenml.post_execution.pipeline_run.PipelineRunView.get_step",
        )
        legacy_name = kwargs.get("name")

        if not step:
            # Without `step`, the only remaining option is the deprecated
            # `name` keyword; if that is unusable too, nothing was specified.
            if not isinstance(legacy_name, str):
                raise RuntimeError(
                    "No step specified. Please specify a step using "
                    "pipeline_run_view.get_step(step=`step_name`). "
                    f"Please refer to the API docs to learn more: "
                    f"{api_doc_link}"
                )

            # `name` still works, but callers are told about the deprecation.
            logger.warning(
                "Using 'name' to get a step from "
                "'PipelineRunView.get_step()' is deprecated and "
                "will be removed in the future. Instead please "
                "use 'step' to access a step from your past "
                "pipeline runs. Learn more in our API docs: %s",
                api_doc_link,
            )
            step = legacy_name

        if step in self._steps:
            return self._steps[step]

        # The requested step is not part of this pipeline run.
        raise KeyError(
            f"No step found for name `{step}`. This pipeline "
            f"run only has steps with the following "
            f"names: `{self.get_step_names()}`"
        )

    def _ensure_steps_fetched(self) -> None:
        """Loads the steps of this run via the client unless already loaded."""
        if not self._steps:
            client = Client()
            list_steps_of_run = partial(
                client.list_run_steps, pipeline_run_id=self.model.id
            )
            fetched_steps = depaginate(list_steps_of_run)

            self._steps = {
                fetched.name: StepView(fetched) for fetched in fetched_steps
            }
enable_artifact_metadata: Optional[bool] property readonly

Returns whether artifact metadata is enabled for this pipeline run.

Returns:

Type Description
Optional[bool]

True if artifact metadata is enabled for this pipeline run.

enable_cache: Optional[bool] property readonly

Returns whether caching is enabled for this pipeline run.

Returns:

Type Description
Optional[bool]

True if caching is enabled for this pipeline run.

extra: Dict[str, Any] property readonly

Returns the pipeline extras.

This dict is meant to be used to pass any configuration down to the pipeline or stack components that the user has use of.

Returns:

Type Description
Dict[str, Any]

The pipeline extras.

model: PipelineRunResponseModel property readonly

Returns the underlying PipelineRunResponseModel.

Returns:

Type Description
PipelineRunResponseModel

The underlying PipelineRunResponseModel.

settings: Dict[str, Any] property readonly

Returns the pipeline settings.

These are runtime settings passed down to stack components, which can be set at pipeline level.

Returns:

Type Description
Dict[str, Any]

The pipeline settings.

status: ExecutionStatus property readonly

Returns the current status of the pipeline run.

Returns:

Type Description
ExecutionStatus

The current status of the pipeline run.

steps: List[zenml.post_execution.step.StepView] property readonly

Returns all steps that were executed as part of this pipeline run.

Returns:

Type Description
List[zenml.post_execution.step.StepView]

A list of all steps that were executed as part of this pipeline run.

MODEL_CLASS (PipelineRunBaseModel, WorkspaceScopedResponseModel) pydantic-model

Pipeline run model with user, workspace, pipeline, and stack hydrated.

Source code in zenml/post_execution/pipeline_run.py
class PipelineRunResponseModel(
    PipelineRunBaseModel, WorkspaceScopedResponseModel
):
    """Pipeline run model with user, workspace, pipeline, and stack hydrated.

    Extends the base pipeline run model with the related objects of a run.
    All related objects except `metadata` are optional.
    """

    # Pipeline the run belongs to.
    pipeline: Optional["PipelineResponseModel"] = Field(
        title="The pipeline this run belongs to."
    )
    # Stack on which the run was executed.
    stack: Optional["StackResponseModel"] = Field(
        title="The stack that was used for this run."
    )

    # Run metadata keyed by string; defaults to an empty dict.
    # NOTE(review): presumably keyed by metadata name -- confirm.
    metadata: Dict[str, "RunMetadataResponseModel"] = Field(
        default={},
        title="Metadata associated with this pipeline run.",
    )

    # Pipeline build used for the run.
    build: Optional["PipelineBuildResponseModel"] = Field(
        title="The pipeline build that was used for this run."
    )

    # Deployment used for the run.
    deployment: Optional["PipelineDeploymentResponseModel"] = Field(
        title="The deployment that was used for this run."
    )
__json_encoder__(obj) special staticmethod

partial(func, *args, **keywords) - new function with partial application of the given arguments and keywords.

__init__(self, model) special

Initializes a post-execution pipeline run object.

In most cases PipelineRunView objects should not be created manually but retrieved from a PipelineView object instead.

Parameters:

Name Type Description Default
model PipelineRunResponseModel

The model to initialize this object from.

required
Source code in zenml/post_execution/pipeline_run.py
def __init__(self, model: PipelineRunResponseModel) -> None:
    """Initializes a post-execution pipeline run object.

    In most cases `PipelineRunView` objects should not be created manually
    but retrieved from a `PipelineView` object instead.

    Args:
        model: The model to initialize this object from.
    """
    # The base view stores the model; it must be set up before anything else.
    super().__init__(model)
    # Step views keyed by step name. Starts out empty and is filled on first
    # access by `_ensure_steps_fetched()`.
    self._steps: Dict[str, StepView] = OrderedDict()
get_step(self, step=None, **kwargs)

Returns a step for the given name.

The name refers to the name of the step in the pipeline definition, not the class name of the step-class.

Use it like this:

# Get the step by name
pipeline_run_view.get_step("first_step")

Parameters:

Name Type Description Default
step Optional[str]

Name of the step, as given in the pipeline definition

None
**kwargs Any

The deprecated name is caught as a kwarg to specify the step instead of using the step argument.

{}

Returns:

Type Description
StepView

A step for the given name.

Exceptions:

Type Description
KeyError

If there is no step with the given name.

RuntimeError

If no step has been specified at all.

Source code in zenml/post_execution/pipeline_run.py
def get_step(
    self,
    step: Optional[str] = None,
    **kwargs: Any,
) -> StepView:
    """Looks up a step of this run by its name.

    The name refers to the name of the step in the pipeline definition, not
    the class name of the step-class.

    Use it like this:
    ```python
    # Get the step by name
    pipeline_run_view.get_step("first_step")
    ```

    Args:
        step: Name of the step to fetch.
        **kwargs: The deprecated `name` is caught as a kwarg to
            specify the step instead of using the `step` argument.

    Returns:
        A step for the given name.

    Raises:
        KeyError: If there is no step with the given name.
        RuntimeError: If no step has been specified at all.
    """
    self._ensure_steps_fetched()

    api_doc_link = get_apidocs_link(
        "core-post_execution",
        "zenml.post_execution.pipeline_run.PipelineRunView.get_step",
    )
    legacy_name = kwargs.get("name")

    if not step:
        # Without `step`, the only remaining option is the deprecated `name`
        # keyword; if that is unusable too, nothing was specified.
        if not isinstance(legacy_name, str):
            raise RuntimeError(
                "No step specified. Please specify a step using "
                "pipeline_run_view.get_step(step=`step_name`). "
                f"Please refer to the API docs to learn more: "
                f"{api_doc_link}"
            )

        # `name` still works, but callers are told about the deprecation.
        logger.warning(
            "Using 'name' to get a step from "
            "'PipelineRunView.get_step()' is deprecated and "
            "will be removed in the future. Instead please "
            "use 'step' to access a step from your past "
            "pipeline runs. Learn more in our API docs: %s",
            api_doc_link,
        )
        step = legacy_name

    if step in self._steps:
        return self._steps[step]

    # The requested step is not part of this pipeline run.
    raise KeyError(
        f"No step found for name `{step}`. This pipeline "
        f"run only has steps with the following "
        f"names: `{self.get_step_names()}`"
    )
get_step_names(self)

Returns a list of all step names.

Returns:

Type Description
List[str]

A list of all step names.

Source code in zenml/post_execution/pipeline_run.py
def get_step_names(self) -> List[str]:
    """Lists the names of all steps of this pipeline run.

    The steps are fetched first if that has not happened yet.

    Returns:
        A list of all step names.
    """
    self._ensure_steps_fetched()
    # Iterating the mapping yields its keys, i.e. the step names.
    step_names = [step_name for step_name in self._steps]
    return step_names

get_run(name)

Fetches the post-execution view of a run with the given name.

Parameters:

Name Type Description Default
name str

The name of the run to fetch.

required

Returns:

Type Description
PipelineRunView

The post-execution view of the run with the given name.

Exceptions:

Type Description
KeyError

If no run with the given name exists.

RuntimeError

If multiple runs with the given name exist.

Source code in zenml/post_execution/pipeline_run.py
def get_run(name: str) -> "PipelineRunView":
    """Looks up a run by name and wraps it in a post-execution view.

    The lookup is restricted to the active workspace of the client.

    Args:
        name: The name of the run to fetch.

    Returns:
        The post-execution view of the run with the given name.

    Raises:
        KeyError: If no run with the given name exists.
        RuntimeError: If multiple runs with the given name exist.
    """
    client = Client()
    workspace_id = client.active_workspace.id
    matches = client.list_runs(name=name, workspace_id=workspace_id)

    # TODO: [server] this error handling could be improved
    if not matches:
        raise KeyError(f"No run with name '{name}' exists.")

    # The name has to identify exactly one run.
    if matches.total > 1:
        raise RuntimeError(
            f"Multiple runs have been found for name  '{name}'.", matches
        )

    only_match = matches.items[0]
    return PipelineRunView(only_match)

get_unlisted_runs()

Fetches the post-execution views of the 50 most recent unlisted runs.

Unlisted runs are runs that are not associated with any pipeline.

Returns:

Type Description
List[PipelineRunView]

A list of post-execution run views.

Source code in zenml/post_execution/pipeline_run.py
def get_unlisted_runs() -> List["PipelineRunView"]:
    """Fetches the post-execution views of the 50 most recent unlisted runs.

    Unlisted runs are runs that are not associated with any pipeline. The
    query is restricted to the active workspace and sorted by creation time
    in descending order.

    Returns:
        A list of post-execution run views.
    """
    client = Client()
    page = client.list_runs(
        workspace_id=client.active_workspace.id,
        unlisted=True,
        size=50,
        sort_by="desc:created",
    )
    # Wrap every returned run model in a view.
    return list(map(PipelineRunView, page.items))

step

Implementation of a post-execution step class.

StepView (BaseView)

Post-execution step class.

This can be used to query artifact information associated with a pipeline step.

Source code in zenml/post_execution/step.py
class StepView(BaseView):
    """Post-execution view of a single step of a pipeline run.

    Exposes the configuration and status of the step as well as the artifacts
    it consumed and produced.
    """

    MODEL_CLASS = StepRunResponseModel
    REPR_KEYS = ["id", "name", "entrypoint_name"]

    def __init__(self, model: StepRunResponseModel):
        """Creates a view around the given step run model.

        `StepView` objects are normally retrieved from a `PipelineRunView`
        object rather than constructed by hand.

        Args:
            model: The model to initialize this object from.
        """
        super().__init__(model)
        # Artifact views keyed by artifact name, filled on first access.
        self._inputs: Dict[str, ArtifactView] = {}
        self._outputs: Dict[str, ArtifactView] = {}

    @property
    def model(self) -> StepRunResponseModel:
        """The `StepRunResponseModel` backing this view.

        Returns:
            The underlying `StepRunResponseModel`.
        """
        return cast(StepRunResponseModel, self._model)

    @property
    def step_configuration(self) -> "StepConfiguration":
        """The configuration of the step.

        Returns:
            The step configuration.
        """
        step = self.model.step
        return step.config

    @property
    def entrypoint_name(self) -> str:
        """The entrypoint name of the step.

        This name is equal to the name argument passed to the @step decorator
        or the actual function name if no explicit name was given.

        Examples:
            # the step entrypoint_name will be "my_step"
            @step(name="my_step")
            def my_step_function(...)

            # the step entrypoint_name will be "my_step_function"
            @step
            def my_step_function(...)

        Returns:
            The step entrypoint_name.
        """
        configuration = self.step_configuration
        return configuration.name

    @property
    def parameters(self) -> Dict[str, str]:
        """The parameters the step was run with.

        Returns:
            The parameters used to run this step.
        """
        configuration = self.step_configuration
        return configuration.parameters

    @property
    def settings(self) -> Dict[str, "BaseSettings"]:
        """The settings of the step.

        These are runtime settings passed down to stack components, which
        can be set at step level.

        Returns:
            The step settings.
        """
        configuration = self.step_configuration
        return configuration.settings

    @property
    def extra(self) -> Dict[str, Any]:
        """The extra dictionary of the step.

        This dict is meant to carry any user-defined configuration down to
        the step.

        Returns:
            The extra dictionary.
        """
        configuration = self.step_configuration
        return configuration.extra

    @property
    def enable_cache(self) -> Optional[bool]:
        """Whether caching is enabled for this step.

        Returns:
            Whether caching is enabled for this step.
        """
        configuration = self.step_configuration
        return configuration.enable_cache

    @property
    def enable_artifact_metadata(self) -> Optional[bool]:
        """Whether artifact metadata is enabled for this step.

        Returns:
            Whether artifact metadata is enabled for this step.
        """
        configuration = self.step_configuration
        return configuration.enable_artifact_metadata

    @property
    def step_operator(self) -> Optional[str]:
        """The name of the step operator of the step.

        Returns:
            The name of the step operator of the step.
        """
        configuration = self.step_configuration
        return configuration.step_operator

    @property
    def experiment_tracker(self) -> Optional[str]:
        """The name of the experiment tracker of the step.

        Returns:
            The name of the experiment tracker of the step.
        """
        configuration = self.step_configuration
        return configuration.experiment_tracker

    @property
    def spec(self) -> "StepSpec":
        """The spec of the step.

        The step spec defines the source path and upstream steps of a step and
        is used primarily to compare whether two steps are the same.

        Returns:
            The step spec.
        """
        step = self.model.step
        return step.spec

    @property
    def status(self) -> ExecutionStatus:
        """The current status of the step.

        Returns:
            The current status of the step.
        """
        # The status may have changed since this view was created, so the
        # step run is looked up again instead of reading the stored model.
        store = Client().zen_store
        refreshed_step = store.get_run_step(self.model.id)
        return refreshed_step.status

    @property
    def is_cached(self) -> bool:
        """Whether the current status of the step is `CACHED`.

        Returns:
            True if the step is cached, False otherwise.
        """
        current_status = self.status
        return current_status == ExecutionStatus.CACHED

    @property
    def is_completed(self) -> bool:
        """Whether the current status of the step is `COMPLETED`.

        Returns:
            True if the step is completed, False otherwise.
        """
        current_status = self.status
        return current_status == ExecutionStatus.COMPLETED

    @property
    def inputs(self) -> Dict[str, ArtifactView]:
        """All input artifacts of this step.

        Returns:
            A dictionary of artifact names to artifact views.
        """
        self._ensure_inputs_fetched()
        return self._inputs

    @property
    def input(self) -> ArtifactView:
        """The single input artifact of this step.

        Returns:
            The input artifact.

        Raises:
            ValueError: If there were zero or multiple inputs to this step.
        """
        input_views = self.inputs
        if len(input_views) == 1:
            (only_input,) = input_views.values()
            return only_input

        raise ValueError(
            "Can't use the `StepView.input` property for steps with zero "
            "or multiple inputs, use `StepView.inputs` instead."
        )

    @property
    def outputs(self) -> Dict[str, ArtifactView]:
        """All output artifacts of this step.

        Returns:
            A dictionary of artifact names to artifact views.
        """
        self._ensure_outputs_fetched()
        return self._outputs

    @property
    def output(self) -> ArtifactView:
        """The single output artifact of this step.

        Returns:
            The output artifact.

        Raises:
            ValueError: If there were zero or multiple step outputs.
        """
        output_views = self.outputs
        if len(output_views) == 1:
            (only_output,) = output_views.values()
            return only_output

        raise ValueError(
            "Can't use the `StepView.output` property for steps with zero "
            "or multiple outputs, use `StepView.outputs` instead."
        )

    def _ensure_inputs_fetched(self) -> None:
        """Builds the input artifact views from the model if still empty."""
        if not self._inputs:
            input_models = self.model.input_artifacts
            self._inputs = {
                artifact_name: ArtifactView(input_model)
                for artifact_name, input_model in input_models.items()
            }

    def _ensure_outputs_fetched(self) -> None:
        """Builds the output artifact views from the model if still empty."""
        if not self._outputs:
            output_models = self.model.output_artifacts
            self._outputs = {
                artifact_name: ArtifactView(output_model)
                for artifact_name, output_model in output_models.items()
            }
enable_artifact_metadata: Optional[bool] property readonly

Returns whether artifact metadata is enabled for this step.

Returns:

Type Description
Optional[bool]

Whether artifact metadata is enabled for this step.

enable_cache: Optional[bool] property readonly

Returns whether caching is enabled for this step.

Returns:

Type Description
Optional[bool]

Whether caching is enabled for this step.

entrypoint_name: str property readonly

Returns the step entrypoint_name.

This name is equal to the name argument passed to the @step decorator or the actual function name if no explicit name was given.

Examples:

# the step entrypoint_name will be "my_step"
@step(name="my_step")
def my_step_function(...)

# the step entrypoint_name will be "my_step_function"
@step
def my_step_function(...)

Returns:

Type Description
str

The step entrypoint_name.

experiment_tracker: Optional[str] property readonly

Returns the name of the experiment tracker of the step.

Returns:

Type Description
Optional[str]

The name of the experiment tracker of the step.

extra: Dict[str, Any] property readonly

Returns the extra dictionary.

This dict is meant to be used to pass any configuration down to the step that the user has use of.

Returns:

Type Description
Dict[str, Any]

The extra dictionary.

input: ArtifactView property readonly

Returns the input artifact that was used to run this step.

Returns:

Type Description
ArtifactView

The input artifact.

Exceptions:

Type Description
ValueError

If there were zero or multiple inputs to this step.

inputs: Dict[str, zenml.post_execution.artifact.ArtifactView] property readonly

Returns all input artifacts that were used to run this step.

Returns:

Type Description
Dict[str, zenml.post_execution.artifact.ArtifactView]

A dictionary of artifact names to artifact views.

is_cached: bool property readonly

Returns whether the step is cached or not.

Returns:

Type Description
bool

True if the step is cached, False otherwise.

is_completed: bool property readonly

Returns whether the step is completed or not.

Returns:

Type Description
bool

True if the step is completed, False otherwise.

model: StepRunResponseModel property readonly

Returns the underlying StepRunResponseModel.

Returns:

Type Description
StepRunResponseModel

The underlying StepRunResponseModel.

output: ArtifactView property readonly

Returns the output artifact that was written by this step.

Returns:

Type Description
ArtifactView

The output artifact.

Exceptions:

Type Description
ValueError

If there were zero or multiple step outputs.

outputs: Dict[str, zenml.post_execution.artifact.ArtifactView] property readonly

Returns all output artifacts that were written by this step.

Returns:

Type Description
Dict[str, zenml.post_execution.artifact.ArtifactView]

A dictionary of artifact names to artifact views.

parameters: Dict[str, str] property readonly

The parameters used to run this step.

Returns:

Type Description
Dict[str, str]

The parameters used to run this step.

settings: Dict[str, BaseSettings] property readonly

Returns the step settings.

These are runtime settings passed down to stack components, which can be set at step level.

Returns:

Type Description
Dict[str, BaseSettings]

The step settings.

spec: StepSpec property readonly

Returns the step spec.

The step spec defines the source path and upstream steps of a step and is used primarily to compare whether two steps are the same.

Returns:

Type Description
StepSpec

The step spec.

status: ExecutionStatus property readonly

Returns the current status of the step.

Returns:

Type Description
ExecutionStatus

The current status of the step.

step_configuration: StepConfiguration property readonly

Returns the step configuration.

Returns:

Type Description
StepConfiguration

The step configuration.

step_operator: Optional[str] property readonly

Returns the name of the step operator of the step.

Returns:

Type Description
Optional[str]

The name of the step operator of the step.

MODEL_CLASS (StepRunBaseModel, WorkspaceScopedResponseModel) pydantic-model

Response model for step runs.

Source code in zenml/post_execution/step.py
class StepRunResponseModel(StepRunBaseModel, WorkspaceScopedResponseModel):
    """Response model for step runs.

    Extends the base step run model with the artifacts consumed and produced
    by the step run and with its metadata.
    """

    # Artifacts consumed by the step run, keyed by artifact name; defaults to
    # an empty dict.
    input_artifacts: Dict[str, "ArtifactResponseModel"] = {}
    # Artifacts produced by the step run, keyed by artifact name; defaults to
    # an empty dict.
    output_artifacts: Dict[str, "ArtifactResponseModel"] = {}
    # Step run metadata keyed by string; defaults to an empty dict.
    # NOTE(review): presumably keyed by metadata name -- confirm.
    metadata: Dict[str, "RunMetadataResponseModel"] = Field(
        default={},
        title="Metadata associated with this step run.",
    )
__json_encoder__(obj) special staticmethod

partial(func, *args, **keywords) - new function with partial application of the given arguments and keywords.

__init__(self, model) special

Initializes a post-execution step object.

In most cases StepView objects should not be created manually but retrieved from a PipelineRunView object instead.

Parameters:

Name Type Description Default
model StepRunResponseModel

The model to initialize this object from.

required
Source code in zenml/post_execution/step.py
def __init__(self, model: StepRunResponseModel) -> None:
    """Initializes a post-execution step object.

    In most cases `StepView` objects should not be created manually
    but retrieved from a `PipelineRunView` object instead.

    Args:
        model: The model to initialize this object from.
    """
    # The base view stores the model; it must be set up before anything else.
    super().__init__(model)
    # Artifact views keyed by artifact name. Both start out empty and are
    # filled on first access by `_ensure_inputs_fetched()` and
    # `_ensure_outputs_fetched()` respectively.
    self._inputs: Dict[str, ArtifactView] = {}
    self._outputs: Dict[str, ArtifactView] = {}