Skip to content

Lineage Graph

zenml.lineage_graph special

Initialization of lineage generation module.

edge

Class for Edges in a lineage graph.

Edge (BaseModel) pydantic-model

A class that represents an edge in a lineage graph.

Source code in zenml/lineage_graph/edge.py
class Edge(BaseModel):
    """A class that represents an edge in a lineage graph.

    An edge connects the node whose id is ``source`` to the node whose id
    is ``target``; the direction carries meaning (producer -> consumer).
    """

    # Identifier of the edge.
    id: str
    # Id of the node the edge starts from.
    source: str
    # Id of the node the edge points to.
    target: str

lineage_graph

Class for lineage graph generation.

LineageGraph (BaseModel) pydantic-model

A lineage graph representation of a PipelineRunResponseModel.

Source code in zenml/lineage_graph/lineage_graph.py
class LineageGraph(BaseModel):
    """A lineage graph representation of a PipelineRunResponseModel.

    The graph is built incrementally. ``generate_run_nodes_and_edges`` walks
    every step of a run, and ``generate_step_nodes_and_edges`` appends:

    - one step node;
    - one artifact node per step output;
    - directed edges step -> output artifact and input artifact -> step.
    """

    nodes: List[Union[StepNode, ArtifactNode]] = []
    edges: List[Edge] = []
    # Id of the first step node added to the graph.
    root_step_id: Optional[str] = None
    # Stringified (key, value, type) tuples of the run's metadata.
    run_metadata: List[Tuple[str, str, str]] = []

    def generate_step_nodes_and_edges(
        self, step: StepRunResponseModel
    ) -> None:
        """Generates the step nodes and the edges between them.

        Adds:

        - one ``StepNode`` for ``step``;
        - one ``ArtifactNode`` plus a step -> artifact edge for each output;
        - an artifact -> step edge for each input.

        The first step processed becomes ``root_step_id``.

        Args:
            step: The step to generate the nodes and edges for.
        """
        step_id = STEP_PREFIX + str(step.id)
        if self.root_step_id is None:
            self.root_step_id = step_id

        # Keep only non-empty config entries. Inputs, outputs and parameters
        # are exposed through dedicated fields on the node instead.
        step_config = {
            key: value
            for key, value in step.config.dict().items()
            if key not in ("inputs", "outputs", "parameters") and value
        }
        self.nodes.append(
            StepNode(
                id=step_id,
                data=StepNodeDetails(
                    execution_id=str(step.id),
                    name=step.name,  # redundant for consistency
                    status=step.status,
                    entrypoint_name=step.config.name,  # redundant for consistency
                    parameters=step.config.parameters,
                    configuration=step_config,
                    inputs={k: v.uri for k, v in step.inputs.items()},
                    outputs={k: v.uri for k, v in step.outputs.items()},
                    metadata=[
                        (m.key, str(m.value), str(m.type))
                        for m in step.metadata.values()
                    ],
                ),
            )
        )

        for artifact_name, artifact in step.outputs.items():
            artifact_id = ARTIFACT_PREFIX + str(artifact.id)
            producer_step_run_id = artifact.producer_step_run_id
            self.nodes.append(
                ArtifactNode(
                    id=artifact_id,
                    data=ArtifactNodeDetails(
                        execution_id=str(artifact.id),
                        name=artifact_name,
                        status=step.status,
                        is_cached=step.status == ExecutionStatus.CACHED,
                        artifact_type=artifact.type,
                        artifact_data_type=artifact.data_type.import_path,
                        parent_step_id=str(step.id),
                        # ``producer_step_id`` is Optional: keep a missing
                        # producer as ``None`` rather than the string "None".
                        producer_step_id=(
                            str(producer_step_run_id)
                            if producer_step_run_id is not None
                            else None
                        ),
                        uri=artifact.uri,
                        metadata=[
                            (m.key, str(m.value), str(m.type))
                            for m in artifact.metadata.values()
                        ],
                    ),
                )
            )
            self.edges.append(
                Edge(
                    id=step_id + "_" + artifact_id,
                    source=step_id,
                    target=artifact_id,
                )
            )

        # NOTE(review): artifact nodes are only created from step outputs, so
        # an input artifact not produced by a step of this run yields an edge
        # whose source has no node. Confirm consumers tolerate this.
        for artifact in step.inputs.values():
            artifact_id = ARTIFACT_PREFIX + str(artifact.id)
            self.edges.append(
                Edge(
                    id=step_id + "_" + artifact_id,
                    source=artifact_id,
                    target=step_id,
                )
            )

    def generate_run_nodes_and_edges(
        self, run: PipelineRunResponseModel
    ) -> None:
        """Generates the run nodes and the edges between them.

        Stores the run's metadata as stringified (key, value, type) tuples,
        then adds nodes and edges for every step of the run in order.

        Args:
            run: The PipelineRunResponseModel to generate the lineage graph for.
        """
        self.run_metadata = [
            (m.key, str(m.value), str(m.type)) for m in run.metadata.values()
        ]
        for step in run.steps.values():
            self.generate_step_nodes_and_edges(step)
generate_run_nodes_and_edges(self, run)

Generates the run nodes and the edges between them.

Parameters:

| Name | Type | Description | Default |
| ---- | ---- | ----------- | ------- |
| `run` | `PipelineRunResponseModel` | The PipelineRunResponseModel to generate the lineage graph for. | required |
Source code in zenml/lineage_graph/lineage_graph.py
def generate_run_nodes_and_edges(
    self, run: PipelineRunResponseModel
) -> None:
    """Populate the lineage graph from every step of a pipeline run.

    The run's metadata is recorded first as stringified (key, value, type)
    tuples; afterwards each step is handed to
    ``generate_step_nodes_and_edges`` in iteration order.

    Args:
        run: The PipelineRunResponseModel to generate the lineage graph for.
    """
    collected_metadata = []
    for entry in run.metadata.values():
        collected_metadata.append((entry.key, str(entry.value), str(entry.type)))
    self.run_metadata = collected_metadata

    for step_run in run.steps.values():
        self.generate_step_nodes_and_edges(step_run)
generate_step_nodes_and_edges(self, step)

Generates the step nodes and the edges between them.

Parameters:

| Name | Type | Description | Default |
| ---- | ---- | ----------- | ------- |
| `step` | `StepRunResponseModel` | The step to generate the nodes and edges for. | required |
Source code in zenml/lineage_graph/lineage_graph.py
def generate_step_nodes_and_edges(
    self, step: StepRunResponseModel
) -> None:
    """Generates the step nodes and the edges between them.

    Adds:

    - one ``StepNode`` for ``step``;
    - one ``ArtifactNode`` plus a step -> artifact edge for each output;
    - an artifact -> step edge for each input.

    The first step processed becomes ``root_step_id``.

    Args:
        step: The step to generate the nodes and edges for.
    """
    step_id = STEP_PREFIX + str(step.id)
    if self.root_step_id is None:
        self.root_step_id = step_id

    # Keep only non-empty config entries. Inputs, outputs and parameters are
    # exposed through dedicated fields on the node instead.
    step_config = {
        key: value
        for key, value in step.config.dict().items()
        if key not in ("inputs", "outputs", "parameters") and value
    }
    self.nodes.append(
        StepNode(
            id=step_id,
            data=StepNodeDetails(
                execution_id=str(step.id),
                name=step.name,  # redundant for consistency
                status=step.status,
                entrypoint_name=step.config.name,  # redundant for consistency
                parameters=step.config.parameters,
                configuration=step_config,
                inputs={k: v.uri for k, v in step.inputs.items()},
                outputs={k: v.uri for k, v in step.outputs.items()},
                metadata=[
                    (m.key, str(m.value), str(m.type))
                    for m in step.metadata.values()
                ],
            ),
        )
    )

    for artifact_name, artifact in step.outputs.items():
        artifact_id = ARTIFACT_PREFIX + str(artifact.id)
        producer_step_run_id = artifact.producer_step_run_id
        self.nodes.append(
            ArtifactNode(
                id=artifact_id,
                data=ArtifactNodeDetails(
                    execution_id=str(artifact.id),
                    name=artifact_name,
                    status=step.status,
                    is_cached=step.status == ExecutionStatus.CACHED,
                    artifact_type=artifact.type,
                    artifact_data_type=artifact.data_type.import_path,
                    parent_step_id=str(step.id),
                    # ``producer_step_id`` is Optional: keep a missing
                    # producer as ``None`` rather than the string "None".
                    producer_step_id=(
                        str(producer_step_run_id)
                        if producer_step_run_id is not None
                        else None
                    ),
                    uri=artifact.uri,
                    metadata=[
                        (m.key, str(m.value), str(m.type))
                        for m in artifact.metadata.values()
                    ],
                ),
            )
        )
        self.edges.append(
            Edge(
                id=step_id + "_" + artifact_id,
                source=step_id,
                target=artifact_id,
            )
        )

    # NOTE(review): artifact nodes are only created from step outputs, so an
    # input artifact not produced by a step of this run yields an edge whose
    # source has no node. Confirm consumers tolerate this.
    for artifact in step.inputs.values():
        artifact_id = ARTIFACT_PREFIX + str(artifact.id)
        self.edges.append(
            Edge(
                id=step_id + "_" + artifact_id,
                source=artifact_id,
                target=step_id,
            )
        )

node special

Initialization of lineage nodes.

artifact_node

Class for all lineage artifact nodes.

ArtifactNode (BaseNode) pydantic-model

A class that represents an artifact node in a lineage graph.

Source code in zenml/lineage_graph/node/artifact_node.py
class ArtifactNode(BaseNode):
    """A class that represents an artifact node in a lineage graph."""

    # Node kind label; defaults to "artifact" for artifact nodes.
    type: str = "artifact"
    # Artifact-specific details (narrows the ``data`` type of ``BaseNode``).
    data: ArtifactNodeDetails
ArtifactNodeDetails (BaseNodeDetails) pydantic-model

Captures all artifact details for the node.

Source code in zenml/lineage_graph/node/artifact_node.py
class ArtifactNodeDetails(BaseNodeDetails):
    """Captures all artifact details for the node."""

    # True when the step listing this artifact as an output has status
    # CACHED (this is how LineageGraph fills it).
    is_cached: bool
    # Artifact type string taken from the artifact model.
    artifact_type: str
    # Import path of the artifact's data type.
    artifact_data_type: str
    # Stringified id of the step run whose outputs contain this artifact.
    parent_step_id: str
    # Stringified id of the step run that produced the artifact; may differ
    # from ``parent_step_id`` (presumably for cached steps -- confirm).
    producer_step_id: Optional[str]
    # Artifact URI.
    uri: str
    metadata: List[Tuple[str, str, str]]  # (key, value, type)

base_node

Base class for all lineage nodes.

BaseNode (BaseModel) pydantic-model

A class that represents a node in a lineage graph.

Source code in zenml/lineage_graph/node/base_node.py
class BaseNode(BaseModel):
    """A class that represents a node in a lineage graph.

    Subclasses give ``type`` a default value and narrow ``data`` to a more
    specific details model.
    """

    # Node identifier; edges reference it via ``source`` / ``target``.
    id: str
    # Node kind label (e.g. "step" or "artifact" in the subclasses).
    type: str
    # Details payload common to all node kinds.
    data: BaseNodeDetails
BaseNodeDetails (BaseModel) pydantic-model

Captures all details for the node.

Source code in zenml/lineage_graph/node/base_node.py
class BaseNodeDetails(BaseModel):
    """Captures all details for the node."""

    # Stringified id of the underlying step run or artifact.
    execution_id: str
    # Display name of the node.
    name: str
    # Execution status; artifact nodes reuse the status of their step.
    status: ExecutionStatus

step_node

Class for all lineage step nodes.

StepNode (BaseNode) pydantic-model

A class that represents a step node in a lineage graph.

Source code in zenml/lineage_graph/node/step_node.py
class StepNode(BaseNode):
    """A class that represents a step node in a lineage graph."""

    # Node kind label; defaults to "step" for step nodes.
    type: str = "step"
    # Step-specific details (narrows the ``data`` type of ``BaseNode``).
    data: StepNodeDetails
StepNodeDetails (BaseNodeDetails) pydantic-model

Captures all step details for the node.

Source code in zenml/lineage_graph/node/step_node.py
class StepNodeDetails(BaseNodeDetails):
    """Captures all step details for the node."""

    # Name of the step's entrypoint as given in the step configuration.
    entrypoint_name: str
    # Step parameters from the step configuration.
    parameters: Dict[str, Any]
    # Remaining non-empty step configuration entries (LineageGraph drops
    # inputs, outputs and parameters, which have their own fields).
    configuration: Dict[str, Any]
    # Input name -> artifact URI.
    inputs: Dict[str, Any]
    # Output name -> artifact URI.
    outputs: Dict[str, Any]
    metadata: List[Tuple[str, str, str]]  # (key, value, type)