Post Execution
zenml.post_execution
special
After executing a pipeline, the user needs to be able to fetch it from history and perform certain tasks. The post_execution submodule provides a set of interfaces with which the user can interact with artifacts, the pipeline, steps, and the post-run pipeline object.
artifact
ArtifactView
Post-execution artifact class which can be used to read artifact data that was created during a pipeline execution.
Source code in zenml/post_execution/artifact.py
class ArtifactView:
"""Post-execution artifact class which can be used to read
artifact data that was created during a pipeline execution.
"""
def __init__(
self,
id_: int,
type_: str,
uri: str,
materializer: str,
data_type: str,
metadata_store: "BaseMetadataStore",
parent_step_id: int,
):
"""Initializes a post-execution artifact object.
In most cases `ArtifactView` objects should not be created manually but
retrieved from a `StepView` via the `inputs` or `outputs` properties.
Args:
id_: The artifact id.
type_: The type of this artifact.
uri: Specifies where the artifact data is stored.
materializer: Information needed to restore the materializer
that was used to write this artifact.
data_type: The type of data that was passed to the materializer
when writing that artifact. Will be used as a default type
to read the artifact.
metadata_store: The metadata store which should be used to fetch
additional information related to this pipeline.
parent_step_id: The ID of the parent step.
"""
self._id = id_
self._type = type_
self._uri = uri
self._materializer = materializer
self._data_type = data_type
self._metadata_store = metadata_store
self._parent_step_id = parent_step_id
@property
def id(self) -> int:
"""Returns the artifact id."""
return self._id
@property
def type(self) -> str:
"""Returns the artifact type."""
return self._type
@property
def data_type(self) -> str:
"""Returns the data type of the artifact."""
return self._data_type
@property
def uri(self) -> str:
"""Returns the URI where the artifact data is stored."""
return self._uri
@property
def parent_step_id(self) -> int:
"""Returns the ID of the parent step. This need not be equivalent to
the ID of the producer step."""
return self._parent_step_id
@property
def producer_step(self) -> "StepView":
"""Returns the original StepView that produced the artifact."""
# TODO [ENG-174]: Replace with artifact.id instead of passing self if
# required.
return self._metadata_store.get_producer_step_from_artifact(self)
@property
def is_cached(self) -> bool:
"""Returns True if artifact was cached in a previous run, else False."""
# self._metadata_store.
return self.producer_step.id != self.parent_step_id
def read(
self,
output_data_type: Optional[Type[Any]] = None,
materializer_class: Optional[Type["BaseMaterializer"]] = None,
) -> Any:
"""Materializes the data stored in this artifact.
Args:
output_data_type: The datatype to which the materializer should
read, will be passed to the materializers `handle_input` method.
materializer_class: The class of the materializer that should be
used to read the artifact data. If no materializer class is
given, we use the materializer that was used to write the
artifact during execution of the pipeline.
Returns:
The materialized data.
"""
if not materializer_class:
try:
materializer_class = source_utils.load_source_path_class(
self._materializer
)
except (ModuleNotFoundError, AttributeError) as e:
logger.error(
f"ZenML can not locate and import the materializer module "
f"{self._materializer} which was used to write this "
f"artifact. If you want to read from it, please provide "
f"a 'materializer_class'."
)
raise ModuleNotFoundError(e) from e
if not output_data_type:
try:
output_data_type = source_utils.load_source_path_class(
self._data_type
)
except (ModuleNotFoundError, AttributeError) as e:
logger.error(
f"ZenML can not locate and import the data type of this "
f"artifact {self._data_type}. If you want to read "
f"from it, please provide a 'output_data_type'."
)
raise ModuleNotFoundError(e) from e
logger.debug(
"Using '%s' to read '%s' (uri: %s).",
materializer_class.__qualname__,
self._type,
self._uri,
)
# TODO [ENG-162]: passing in `self` to initialize the materializer only
# works because materializers only require a `.uri` property at the
# moment.
materializer = materializer_class(self) # type: ignore[arg-type]
return materializer.handle_input(output_data_type)
def __repr__(self) -> str:
"""Returns a string representation of this artifact."""
return (
f"{self.__class__.__qualname__}(id={self._id}, "
f"type='{self._type}', uri='{self._uri}', "
f"materializer='{self._materializer}')"
)
def __eq__(self, other: Any) -> bool:
"""Returns whether the other object is referring to the
same artifact."""
if isinstance(other, ArtifactView):
return self._id == other._id and self._uri == other._uri
return NotImplemented
data_type: str
property
readonly
Returns the data type of the artifact.
id: int
property
readonly
Returns the artifact id.
is_cached: bool
property
readonly
Returns True if artifact was cached in a previous run, else False.
parent_step_id: int
property
readonly
Returns the ID of the parent step. This need not be equivalent to the ID of the producer step.
producer_step: StepView
property
readonly
Returns the original StepView that produced the artifact.
type: str
property
readonly
Returns the artifact type.
uri: str
property
readonly
Returns the URI where the artifact data is stored.
__eq__(self, other)
special
Returns whether the other object is referring to the same artifact.
Source code in zenml/post_execution/artifact.py
def __eq__(self, other: Any) -> bool:
"""Returns whether the other object is referring to the
same artifact."""
if isinstance(other, ArtifactView):
return self._id == other._id and self._uri == other._uri
return NotImplemented
__init__(self, id_, type_, uri, materializer, data_type, metadata_store, parent_step_id)
special
Initializes a post-execution artifact object.
In most cases ArtifactView
objects should not be created manually but
retrieved from a StepView
via the inputs
or outputs
properties.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
id_ |
int |
The artifact id. |
required |
type_ |
str |
The type of this artifact. |
required |
uri |
str |
Specifies where the artifact data is stored. |
required |
materializer |
str |
Information needed to restore the materializer that was used to write this artifact. |
required |
data_type |
str |
The type of data that was passed to the materializer when writing that artifact. Will be used as a default type to read the artifact. |
required |
metadata_store |
BaseMetadataStore |
The metadata store which should be used to fetch additional information related to this pipeline. |
required |
parent_step_id |
int |
The ID of the parent step. |
required |
Source code in zenml/post_execution/artifact.py
def __init__(
self,
id_: int,
type_: str,
uri: str,
materializer: str,
data_type: str,
metadata_store: "BaseMetadataStore",
parent_step_id: int,
):
"""Initializes a post-execution artifact object.
In most cases `ArtifactView` objects should not be created manually but
retrieved from a `StepView` via the `inputs` or `outputs` properties.
Args:
id_: The artifact id.
type_: The type of this artifact.
uri: Specifies where the artifact data is stored.
materializer: Information needed to restore the materializer
that was used to write this artifact.
data_type: The type of data that was passed to the materializer
when writing that artifact. Will be used as a default type
to read the artifact.
metadata_store: The metadata store which should be used to fetch
additional information related to this pipeline.
parent_step_id: The ID of the parent step.
"""
self._id = id_
self._type = type_
self._uri = uri
self._materializer = materializer
self._data_type = data_type
self._metadata_store = metadata_store
self._parent_step_id = parent_step_id
__repr__(self)
special
Returns a string representation of this artifact.
Source code in zenml/post_execution/artifact.py
def __repr__(self) -> str:
"""Returns a string representation of this artifact."""
return (
f"{self.__class__.__qualname__}(id={self._id}, "
f"type='{self._type}', uri='{self._uri}', "
f"materializer='{self._materializer}')"
)
read(self, output_data_type=None, materializer_class=None)
Materializes the data stored in this artifact.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
output_data_type |
Optional[Type[Any]] |
The datatype to which the materializer should
read, will be passed to the materializers |
None |
materializer_class |
Optional[Type[BaseMaterializer]] |
The class of the materializer that should be used to read the artifact data. If no materializer class is given, we use the materializer that was used to write the artifact during execution of the pipeline. |
None |
Returns:
Type | Description |
---|---|
Any |
The materialized data. |
Source code in zenml/post_execution/artifact.py
def read(
self,
output_data_type: Optional[Type[Any]] = None,
materializer_class: Optional[Type["BaseMaterializer"]] = None,
) -> Any:
"""Materializes the data stored in this artifact.
Args:
output_data_type: The datatype to which the materializer should
read, will be passed to the materializers `handle_input` method.
materializer_class: The class of the materializer that should be
used to read the artifact data. If no materializer class is
given, we use the materializer that was used to write the
artifact during execution of the pipeline.
Returns:
The materialized data.
"""
if not materializer_class:
try:
materializer_class = source_utils.load_source_path_class(
self._materializer
)
except (ModuleNotFoundError, AttributeError) as e:
logger.error(
f"ZenML can not locate and import the materializer module "
f"{self._materializer} which was used to write this "
f"artifact. If you want to read from it, please provide "
f"a 'materializer_class'."
)
raise ModuleNotFoundError(e) from e
if not output_data_type:
try:
output_data_type = source_utils.load_source_path_class(
self._data_type
)
except (ModuleNotFoundError, AttributeError) as e:
logger.error(
f"ZenML can not locate and import the data type of this "
f"artifact {self._data_type}. If you want to read "
f"from it, please provide a 'output_data_type'."
)
raise ModuleNotFoundError(e) from e
logger.debug(
"Using '%s' to read '%s' (uri: %s).",
materializer_class.__qualname__,
self._type,
self._uri,
)
# TODO [ENG-162]: passing in `self` to initialize the materializer only
# works because materializers only require a `.uri` property at the
# moment.
materializer = materializer_class(self) # type: ignore[arg-type]
return materializer.handle_input(output_data_type)
pipeline
PipelineView
Post-execution pipeline class which can be used to query pipeline-related information from the metadata store.
Source code in zenml/post_execution/pipeline.py
class PipelineView:
"""Post-execution pipeline class which can be used to query
pipeline-related information from the metadata store.
"""
def __init__(
self, id_: int, name: str, metadata_store: "BaseMetadataStore"
):
"""Initializes a post-execution pipeline object.
In most cases `PipelineView` objects should not be created manually
but retrieved using the `get_pipelines()` method of a
`zenml.core.repo.Repository` instead.
Args:
id_: The context id of this pipeline.
name: The name of this pipeline.
metadata_store: The metadata store which should be used to fetch
additional information related to this pipeline.
"""
self._id = id_
self._name = name
self._metadata_store = metadata_store
@property
def name(self) -> str:
"""Returns the name of the pipeline."""
return self._name
@property
def runs(self) -> List["PipelineRunView"]:
"""Returns all stored runs of this pipeline.
The runs are returned in chronological order, so the latest
run will be the last element in this list.
"""
# Do not cache runs as new runs might appear during this objects
# lifecycle
runs = self._metadata_store.get_pipeline_runs(self)
return list(runs.values())
def get_run_names(self) -> List[str]:
"""Returns a list of all run names."""
# Do not cache runs as new runs might appear during this objects
# lifecycle
runs = self._metadata_store.get_pipeline_runs(self)
return list(runs.keys())
def get_run(self, name: str) -> "PipelineRunView":
"""Returns a run for the given name.
Args:
name: The name of the run to return.
Raises:
KeyError: If there is no run with the given name.
"""
run = self._metadata_store.get_pipeline_run(self, name)
if not run:
raise KeyError(
f"No run found for name `{name}`. This pipeline "
f"only has runs with the following "
f"names: `{self.get_run_names()}`"
)
return run
def get_run_for_completed_step(self, step_name: str) -> "PipelineRunView":
"""This method helps you find out which pipeline run produced
the cached artifact of a given step.
Args:
step_name: Name of step at hand
Return:
None if no run is found that completed the given step,
else the original pipeline_run
"""
orig_pipeline_run = None
for run in reversed(self.runs):
try:
step = run.get_step(step_name)
if step.is_completed:
orig_pipeline_run = run
break
except KeyError:
pass
if not orig_pipeline_run:
raise LookupError(
"No Pipeline Run could be found, that has"
f" completed the provided step: [{step_name}]"
)
return orig_pipeline_run
def __repr__(self) -> str:
"""Returns a string representation of this pipeline."""
return (
f"{self.__class__.__qualname__}(id={self._id}, "
f"name='{self._name}')"
)
def __eq__(self, other: Any) -> bool:
"""Returns whether the other object is referring to the
same pipeline."""
if isinstance(other, PipelineView):
return (
self._id == other._id
and self._metadata_store.uuid == other._metadata_store.uuid
)
return NotImplemented
name: str
property
readonly
Returns the name of the pipeline.
runs: List[PipelineRunView]
property
readonly
Returns all stored runs of this pipeline.
The runs are returned in chronological order, so the latest run will be the last element in this list.
__eq__(self, other)
special
Returns whether the other object is referring to the same pipeline.
Source code in zenml/post_execution/pipeline.py
def __eq__(self, other: Any) -> bool:
"""Returns whether the other object is referring to the
same pipeline."""
if isinstance(other, PipelineView):
return (
self._id == other._id
and self._metadata_store.uuid == other._metadata_store.uuid
)
return NotImplemented
__init__(self, id_, name, metadata_store)
special
Initializes a post-execution pipeline object.
In most cases PipelineView
objects should not be created manually
but retrieved using the get_pipelines()
method of a
zenml.core.repo.Repository
instead.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
id_ |
int |
The context id of this pipeline. |
required |
name |
str |
The name of this pipeline. |
required |
metadata_store |
BaseMetadataStore |
The metadata store which should be used to fetch additional information related to this pipeline. |
required |
Source code in zenml/post_execution/pipeline.py
def __init__(
self, id_: int, name: str, metadata_store: "BaseMetadataStore"
):
"""Initializes a post-execution pipeline object.
In most cases `PipelineView` objects should not be created manually
but retrieved using the `get_pipelines()` method of a
`zenml.core.repo.Repository` instead.
Args:
id_: The context id of this pipeline.
name: The name of this pipeline.
metadata_store: The metadata store which should be used to fetch
additional information related to this pipeline.
"""
self._id = id_
self._name = name
self._metadata_store = metadata_store
__repr__(self)
special
Returns a string representation of this pipeline.
Source code in zenml/post_execution/pipeline.py
def __repr__(self) -> str:
"""Returns a string representation of this pipeline."""
return (
f"{self.__class__.__qualname__}(id={self._id}, "
f"name='{self._name}')"
)
get_run(self, name)
Returns a run for the given name.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
name |
str |
The name of the run to return. |
required |
Exceptions:
Type | Description |
---|---|
KeyError |
If there is no run with the given name. |
Source code in zenml/post_execution/pipeline.py
def get_run(self, name: str) -> "PipelineRunView":
"""Returns a run for the given name.
Args:
name: The name of the run to return.
Raises:
KeyError: If there is no run with the given name.
"""
run = self._metadata_store.get_pipeline_run(self, name)
if not run:
raise KeyError(
f"No run found for name `{name}`. This pipeline "
f"only has runs with the following "
f"names: `{self.get_run_names()}`"
)
return run
get_run_for_completed_step(self, step_name)
This method helps you find out which pipeline run produced the cached artifact of a given step.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
step_name |
str |
Name of step at hand |
required |
Returns:
Type | Description |
---|---|
PipelineRunView |
None if no run is found that completed the given step, else the original pipeline_run |
Source code in zenml/post_execution/pipeline.py
def get_run_for_completed_step(self, step_name: str) -> "PipelineRunView":
"""This method helps you find out which pipeline run produced
the cached artifact of a given step.
Args:
step_name: Name of step at hand
Return:
None if no run is found that completed the given step,
else the original pipeline_run
"""
orig_pipeline_run = None
for run in reversed(self.runs):
try:
step = run.get_step(step_name)
if step.is_completed:
orig_pipeline_run = run
break
except KeyError:
pass
if not orig_pipeline_run:
raise LookupError(
"No Pipeline Run could be found, that has"
f" completed the provided step: [{step_name}]"
)
return orig_pipeline_run
get_run_names(self)
Returns a list of all run names.
Source code in zenml/post_execution/pipeline.py
def get_run_names(self) -> List[str]:
"""Returns a list of all run names."""
# Do not cache runs as new runs might appear during this objects
# lifecycle
runs = self._metadata_store.get_pipeline_runs(self)
return list(runs.keys())
pipeline_run
PipelineRunView
Post-execution pipeline run class which can be used to query steps and artifact information associated with a pipeline execution.
Source code in zenml/post_execution/pipeline_run.py
class PipelineRunView:
"""Post-execution pipeline run class which can be used to query
steps and artifact information associated with a pipeline execution.
"""
def __init__(
self,
id_: int,
name: str,
executions: List[proto.Execution],
metadata_store: "BaseMetadataStore",
):
"""Initializes a post-execution pipeline run object.
In most cases `PipelineRunView` objects should not be created manually
but retrieved from a `PipelineView` object instead.
Args:
id_: The context id of this pipeline run.
name: The name of this pipeline run.
executions: All executions associated with this pipeline run.
metadata_store: The metadata store which should be used to fetch
additional information related to this pipeline run.
"""
self._id = id_
self._name = name
self._metadata_store = metadata_store
self._executions = executions
self._steps: Dict[str, StepView] = OrderedDict()
@property
def name(self) -> str:
"""Returns the name of the pipeline run."""
return self._name
@property
def status(self) -> ExecutionStatus:
"""Returns the current status of the pipeline run."""
step_statuses = (step.status for step in self.steps)
if any(status == ExecutionStatus.FAILED for status in step_statuses):
return ExecutionStatus.FAILED
elif all(
status == ExecutionStatus.COMPLETED
or status == ExecutionStatus.CACHED
for status in step_statuses
):
return ExecutionStatus.COMPLETED
else:
return ExecutionStatus.RUNNING
@property
def steps(self) -> List[StepView]:
"""Returns all steps that were executed as part of this pipeline run."""
self._ensure_steps_fetched()
return list(self._steps.values())
def get_step_names(self) -> List[str]:
"""Returns a list of all step names."""
self._ensure_steps_fetched()
return list(self._steps.keys())
def get_step(self, name: str) -> StepView:
"""Returns a step for the given name.
Args:
name: The name of the step to return.
Raises:
KeyError: If there is no step with the given name.
"""
self._ensure_steps_fetched()
try:
return self._steps[name]
except KeyError:
raise KeyError(
f"No step found for name `{name}`. This pipeline "
f"run only has steps with the following "
f"names: `{self.get_step_names()}`"
)
def _ensure_steps_fetched(self) -> None:
"""Fetches all steps for this pipeline run from the metadata store."""
if self._steps:
# we already fetched the steps, no need to do anything
return
self._steps = self._metadata_store.get_pipeline_run_steps(self)
def __repr__(self) -> str:
"""Returns a string representation of this pipeline run."""
return (
f"{self.__class__.__qualname__}(id={self._id}, "
f"name='{self._name}')"
)
def __eq__(self, other: Any) -> bool:
"""Returns whether the other object is referring to the same
pipeline run."""
if isinstance(other, PipelineRunView):
return (
self._id == other._id
and self._metadata_store.uuid == other._metadata_store.uuid
)
return NotImplemented
name: str
property
readonly
Returns the name of the pipeline run.
status: ExecutionStatus
property
readonly
Returns the current status of the pipeline run.
steps: List[zenml.post_execution.step.StepView]
property
readonly
Returns all steps that were executed as part of this pipeline run.
__eq__(self, other)
special
Returns whether the other object is referring to the same pipeline run.
Source code in zenml/post_execution/pipeline_run.py
def __eq__(self, other: Any) -> bool:
"""Returns whether the other object is referring to the same
pipeline run."""
if isinstance(other, PipelineRunView):
return (
self._id == other._id
and self._metadata_store.uuid == other._metadata_store.uuid
)
return NotImplemented
__init__(self, id_, name, executions, metadata_store)
special
Initializes a post-execution pipeline run object.
In most cases PipelineRunView
objects should not be created manually
but retrieved from a PipelineView
object instead.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
id_ |
int |
The context id of this pipeline run. |
required |
name |
str |
The name of this pipeline run. |
required |
executions |
List[ml_metadata.proto.metadata_store_pb2.Execution] |
All executions associated with this pipeline run. |
required |
metadata_store |
BaseMetadataStore |
The metadata store which should be used to fetch additional information related to this pipeline run. |
required |
Source code in zenml/post_execution/pipeline_run.py
def __init__(
self,
id_: int,
name: str,
executions: List[proto.Execution],
metadata_store: "BaseMetadataStore",
):
"""Initializes a post-execution pipeline run object.
In most cases `PipelineRunView` objects should not be created manually
but retrieved from a `PipelineView` object instead.
Args:
id_: The context id of this pipeline run.
name: The name of this pipeline run.
executions: All executions associated with this pipeline run.
metadata_store: The metadata store which should be used to fetch
additional information related to this pipeline run.
"""
self._id = id_
self._name = name
self._metadata_store = metadata_store
self._executions = executions
self._steps: Dict[str, StepView] = OrderedDict()
__repr__(self)
special
Returns a string representation of this pipeline run.
Source code in zenml/post_execution/pipeline_run.py
def __repr__(self) -> str:
"""Returns a string representation of this pipeline run."""
return (
f"{self.__class__.__qualname__}(id={self._id}, "
f"name='{self._name}')"
)
get_step(self, name)
Returns a step for the given name.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
name |
str |
The name of the step to return. |
required |
Exceptions:
Type | Description |
---|---|
KeyError |
If there is no step with the given name. |
Source code in zenml/post_execution/pipeline_run.py
def get_step(self, name: str) -> StepView:
"""Returns a step for the given name.
Args:
name: The name of the step to return.
Raises:
KeyError: If there is no step with the given name.
"""
self._ensure_steps_fetched()
try:
return self._steps[name]
except KeyError:
raise KeyError(
f"No step found for name `{name}`. This pipeline "
f"run only has steps with the following "
f"names: `{self.get_step_names()}`"
)
get_step_names(self)
Returns a list of all step names.
Source code in zenml/post_execution/pipeline_run.py
def get_step_names(self) -> List[str]:
"""Returns a list of all step names."""
self._ensure_steps_fetched()
return list(self._steps.keys())
step
StepView
Post-execution step class which can be used to query artifact information associated with a pipeline step.
Source code in zenml/post_execution/step.py
class StepView:
"""Post-execution step class which can be used to query
artifact information associated with a pipeline step.
"""
def __init__(
self,
id_: int,
parents_step_ids: List[int],
entrypoint_name: str,
name: str,
parameters: Dict[str, Any],
metadata_store: "BaseMetadataStore",
):
"""Initializes a post-execution step object.
In most cases `StepView` objects should not be created manually
but retrieved from a `PipelineRunView` object instead.
Args:
id_: The execution id of this step.
parents_step_ids: The execution ids of the parents of this step.
entrypoint_name: The name of this step.
name: The name of this step within the pipeline
parameters: Parameters that were used to run this step.
metadata_store: The metadata store which should be used to fetch
additional information related to this step.
"""
self._id = id_
self._parents_step_ids = parents_step_ids
self._entrypoint_name = entrypoint_name
self._name = name
self._parameters = parameters
self._metadata_store = metadata_store
self._inputs: Dict[str, ArtifactView] = {}
self._outputs: Dict[str, ArtifactView] = {}
@property
def id(self) -> int:
"""Returns the step id."""
return self._id
@property
def parents_step_ids(self) -> List[int]:
"""Returns a list of ID's of all parents of this step."""
return self._parents_step_ids
@property
def parent_steps(self) -> List["StepView"]:
"""Returns a list of all parent steps of this step."""
steps = [
self._metadata_store.get_step_by_id(s)
for s in self.parents_step_ids
]
return steps
@property
def entrypoint_name(self) -> str:
"""Returns the step entrypoint_name.
This name is equal to the name argument passed to the @step decorator
or the actual function name if no explicit name was given.
Examples:
# the step entrypoint_name will be "my_step"
@step(name="my_step")
def my_step_function(...)
# the step entrypoint_name will be "my_step_function"
@step
def my_step_function(...)
"""
return self._entrypoint_name
@property
def name(self) -> str:
"""Returns the name as it is defined in the pipeline.
This name is equal to the name given to the step within the pipeline
context
Examples:
@step()
def my_step_function(...)
@pipeline
def my_pipeline_function(step_a)
p = my_pipeline_function(
step_a = my_step_function()
)
The name will be `step_a`
"""
return self._name
@property
def parameters(self) -> Dict[str, Any]:
"""The parameters used to run this step."""
return self._parameters
@property
def status(self) -> ExecutionStatus:
"""Returns the current status of the step."""
return self._metadata_store.get_step_status(self)
@property
def is_cached(self) -> bool:
"""Returns whether the step is cached or not."""
return self.status == ExecutionStatus.CACHED
@property
def is_completed(self) -> bool:
"""Returns whether the step is cached or not."""
return self.status == ExecutionStatus.COMPLETED
@property
def inputs(self) -> Dict[str, ArtifactView]:
"""Returns all input artifacts that were used to run this step."""
self._ensure_inputs_outputs_fetched()
return self._inputs
@property
def input(self) -> ArtifactView:
"""Returns the input artifact that was used to run this step.
Raises:
ValueError: If there were zero or multiple inputs to this step.
"""
if len(self.inputs) != 1:
raise ValueError(
"Can't use the `StepView.input` property for steps with zero "
"or multiple inputs, use `StepView.inputs` instead."
)
return next(iter(self.inputs.values()))
@property
def outputs(self) -> Dict[str, ArtifactView]:
"""Returns all output artifacts that were written by this step."""
self._ensure_inputs_outputs_fetched()
return self._outputs
@property
def output(self) -> ArtifactView:
"""Returns the output artifact that was written by this step.
Raises:
ValueError: If there were zero or multiple step outputs.
"""
if len(self.outputs) != 1:
raise ValueError(
"Can't use the `StepView.output` property for steps with zero "
"or multiple outputs, use `StepView.outputs` instead."
)
return next(iter(self.outputs.values()))
def _ensure_inputs_outputs_fetched(self) -> None:
"""Fetches all step inputs and outputs from the metadata store."""
if self._inputs or self._outputs:
# we already fetched inputs/outputs, no need to do anything
return
self._inputs, self._outputs = self._metadata_store.get_step_artifacts(
self
)
def __repr__(self) -> str:
"""Returns a string representation of this step."""
return (
f"{self.__class__.__qualname__}(id={self._id}, "
f"name='{self.name}', entrypoint_name='{self.entrypoint_name}'"
f"parameters={self._parameters})"
)
def __eq__(self, other: Any) -> bool:
"""Returns whether the other object is referring to the same step."""
if isinstance(other, StepView):
return (
self._id == other._id
and self._metadata_store.uuid == other._metadata_store.uuid
)
return NotImplemented
entrypoint_name: str
property
readonly
Returns the step entrypoint_name.
This name is equal to the name argument passed to the @step decorator or the actual function name if no explicit name was given.
Examples:
the step entrypoint_name will be "my_step"
@step(name="my_step") def my_step_function(...)
the step entrypoint_name will be "my_step_function"
@step def my_step_function(...)
id: int
property
readonly
Returns the step id.
input: ArtifactView
property
readonly
Returns the input artifact that was used to run this step.
Exceptions:
Type | Description |
---|---|
ValueError |
If there were zero or multiple inputs to this step. |
inputs: Dict[str, zenml.post_execution.artifact.ArtifactView]
property
readonly
Returns all input artifacts that were used to run this step.
is_cached: bool
property
readonly
Returns whether the step is cached or not.
is_completed: bool
property
readonly
Returns whether the step is cached or not.
name: str
property
readonly
Returns the name as it is defined in the pipeline.
This name is equal to the name given to the step within the pipeline context
Examples:
@step() def my_step_function(...)
@pipeline def my_pipeline_function(step_a)
p = my_pipeline_function( step_a = my_step_function() )
The name will be step_a
output: ArtifactView
property
readonly
Returns the output artifact that was written by this step.
Exceptions:
Type | Description |
---|---|
ValueError |
If there were zero or multiple step outputs. |
outputs: Dict[str, zenml.post_execution.artifact.ArtifactView]
property
readonly
Returns all output artifacts that were written by this step.
parameters: Dict[str, Any]
property
readonly
The parameters used to run this step.
parent_steps: List[StepView]
property
readonly
Returns a list of all parent steps of this step.
parents_step_ids: List[int]
property
readonly
Returns a list of ID's of all parents of this step.
status: ExecutionStatus
property
readonly
Returns the current status of the step.
__eq__(self, other)
special
Returns whether the other object is referring to the same step.
Source code in zenml/post_execution/step.py
def __eq__(self, other: Any) -> bool:
"""Returns whether the other object is referring to the same step."""
if isinstance(other, StepView):
return (
self._id == other._id
and self._metadata_store.uuid == other._metadata_store.uuid
)
return NotImplemented
__init__(self, id_, parents_step_ids, entrypoint_name, name, parameters, metadata_store)
special
Initializes a post-execution step object.
In most cases StepView
objects should not be created manually
but retrieved from a PipelineRunView
object instead.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
id_ |
int |
The execution id of this step. |
required |
parents_step_ids |
List[int] |
The execution ids of the parents of this step. |
required |
entrypoint_name |
str |
The name of this step. |
required |
name |
str |
The name of this step within the pipeline |
required |
parameters |
Dict[str, Any] |
Parameters that were used to run this step. |
required |
metadata_store |
BaseMetadataStore |
The metadata store which should be used to fetch additional information related to this step. |
required |
Source code in zenml/post_execution/step.py
def __init__(
self,
id_: int,
parents_step_ids: List[int],
entrypoint_name: str,
name: str,
parameters: Dict[str, Any],
metadata_store: "BaseMetadataStore",
):
"""Initializes a post-execution step object.
In most cases `StepView` objects should not be created manually
but retrieved from a `PipelineRunView` object instead.
Args:
id_: The execution id of this step.
parents_step_ids: The execution ids of the parents of this step.
entrypoint_name: The name of this step.
name: The name of this step within the pipeline
parameters: Parameters that were used to run this step.
metadata_store: The metadata store which should be used to fetch
additional information related to this step.
"""
self._id = id_
self._parents_step_ids = parents_step_ids
self._entrypoint_name = entrypoint_name
self._name = name
self._parameters = parameters
self._metadata_store = metadata_store
self._inputs: Dict[str, ArtifactView] = {}
self._outputs: Dict[str, ArtifactView] = {}
__repr__(self)
special
Returns a string representation of this step.
Source code in zenml/post_execution/step.py
def __repr__(self) -> str:
"""Returns a string representation of this step."""
return (
f"{self.__class__.__qualname__}(id={self._id}, "
f"name='{self.name}', entrypoint_name='{self.entrypoint_name}'"
f"parameters={self._parameters})"
)