Skip to content

Stack

zenml.stack special

stack

Stack

ZenML stack class.

A ZenML stack is a collection of multiple stack components that are required to run ZenML pipelines. Some of these components (orchestrator, metadata store and artifact store) are required to run any kind of pipeline, while other components, like the container registry, are only required if other stack components depend on them.

Source code in zenml/stack/stack.py
class Stack:
    """ZenML stack class.

    A ZenML stack is a collection of multiple stack components that are
    required to run ZenML pipelines. Some of these components (orchestrator,
    metadata store and artifact store) are required to run any kind of
    pipeline, other components like the container registry are only required
    if other stack components depend on them.
    """

    def __init__(
        self,
        name: str,
        *,
        orchestrator: "BaseOrchestrator",
        metadata_store: "BaseMetadataStore",
        artifact_store: "BaseArtifactStore",
        container_registry: Optional["BaseContainerRegistry"] = None,
    ):
        """Initializes and validates a stack instance.

        Args:
            name: The name of the stack.
            orchestrator: The orchestrator of the stack.
            metadata_store: The metadata store of the stack.
            artifact_store: The artifact store of the stack.
            container_registry: Optional container registry of the stack.

        Raises:
            StackValidationError: If the stack configuration is not valid.
        """
        self._name = name
        self._orchestrator = orchestrator
        self._metadata_store = metadata_store
        self._artifact_store = artifact_store
        self._container_registry = container_registry

        # Validation iterates over `self.components`, so it has to run after
        # all component attributes have been assigned.
        self.validate()

    @classmethod
    def from_components(
        cls, name: str, components: Dict[StackComponentType, "StackComponent"]
    ) -> "Stack":
        """Creates a stack instance from a dict of stack components.

        Args:
            name: The name of the stack.
            components: The components of the stack.

        Returns:
            A stack instance consisting of the given components.

        Raises:
            TypeError: If a required component is missing or a component
                doesn't inherit from the expected base class.
        """
        from zenml.artifact_stores import BaseArtifactStore
        from zenml.container_registries import BaseContainerRegistry
        from zenml.metadata_stores import BaseMetadataStore
        from zenml.orchestrators import BaseOrchestrator

        def _raise_type_error(
            component: Optional["StackComponent"], expected_class: Type[Any]
        ) -> NoReturn:
            """Raises a TypeError that the component has an unexpected type."""
            raise TypeError(
                f"Unable to create stack: Wrong stack component type "
                f"`{component.__class__.__name__}` (expected: subclass "
                f"of `{expected_class.__name__}`)"
            )

        # A missing required component shows up as `None` here and therefore
        # fails the `isinstance` check just like a wrongly typed one.
        orchestrator = components.get(StackComponentType.ORCHESTRATOR)
        if not isinstance(orchestrator, BaseOrchestrator):
            _raise_type_error(orchestrator, BaseOrchestrator)

        metadata_store = components.get(StackComponentType.METADATA_STORE)
        if not isinstance(metadata_store, BaseMetadataStore):
            _raise_type_error(metadata_store, BaseMetadataStore)

        artifact_store = components.get(StackComponentType.ARTIFACT_STORE)
        if not isinstance(artifact_store, BaseArtifactStore):
            _raise_type_error(artifact_store, BaseArtifactStore)

        # The container registry is optional: only its type is checked, and
        # only if one was passed.
        container_registry = components.get(
            StackComponentType.CONTAINER_REGISTRY
        )
        if container_registry is not None and not isinstance(
            container_registry, BaseContainerRegistry
        ):
            _raise_type_error(container_registry, BaseContainerRegistry)

        # Instantiate via `cls` (not `Stack`) so that calling this alternate
        # constructor on a subclass returns an instance of that subclass.
        return cls(
            name=name,
            orchestrator=orchestrator,
            metadata_store=metadata_store,
            artifact_store=artifact_store,
            container_registry=container_registry,
        )

    @classmethod
    def default_local_stack(cls) -> "Stack":
        """Creates a stack instance which is configured to run locally."""
        from zenml.artifact_stores import LocalArtifactStore
        from zenml.metadata_stores import SQLiteMetadataStore
        from zenml.orchestrators import LocalOrchestrator

        orchestrator = LocalOrchestrator(name="local_orchestrator")

        # The artifact store lives in a directory named after its own UUID
        # inside the global config directory.
        artifact_store_uuid = uuid.uuid4()
        artifact_store_path = os.path.join(
            GlobalConfig.config_directory(),
            "local_stores",
            str(artifact_store_uuid),
        )
        artifact_store = LocalArtifactStore(
            name="local_artifact_store",
            uuid=artifact_store_uuid,
            path=artifact_store_path,
        )

        # The SQLite metadata database is placed inside the artifact store
        # directory.
        metadata_store_path = os.path.join(artifact_store_path, "metadata.db")
        metadata_store = SQLiteMetadataStore(
            name="local_metadata_store", uri=metadata_store_path
        )

        return cls(
            name="local_stack",
            orchestrator=orchestrator,
            metadata_store=metadata_store,
            artifact_store=artifact_store,
        )

    @property
    def components(self) -> Dict[StackComponentType, "StackComponent"]:
        """All components of the stack, keyed by their component type.

        Optional components that are not set (`None`) are left out.
        """
        return {
            component.type: component
            for component in [
                self.orchestrator,
                self.metadata_store,
                self.artifact_store,
                self.container_registry,
            ]
            if component is not None
        }

    @property
    def name(self) -> str:
        """The name of the stack."""
        return self._name

    @property
    def orchestrator(self) -> "BaseOrchestrator":
        """The orchestrator of the stack."""
        return self._orchestrator

    @property
    def metadata_store(self) -> "BaseMetadataStore":
        """The metadata store of the stack."""
        return self._metadata_store

    @property
    def artifact_store(self) -> "BaseArtifactStore":
        """The artifact store of the stack."""
        return self._artifact_store

    @property
    def container_registry(self) -> Optional["BaseContainerRegistry"]:
        """The container registry of the stack."""
        return self._container_registry

    # TODO [ENG-371]: Implement CLI method to generate configuration file from a
    #  stack's available runtime options
    @property
    def runtime_options(self) -> Dict[str, Any]:
        """Runtime options that are available to configure this stack.

        This property combines the available runtime options for all
        components of this stack. See `StackComponent.runtime_options()` for
        more information. If several components define the same option name,
        a warning is logged and the value of the component that comes later
        in `self.components` wins.
        """
        runtime_options: Dict[str, Any] = {}
        for component in self.components.values():
            # Read the property once per component instead of once per use.
            component_options = component.runtime_options
            duplicate_runtime_options = (
                runtime_options.keys() & component_options.keys()
            )
            if duplicate_runtime_options:
                logger.warning(
                    "Found duplicate runtime options %s.",
                    duplicate_runtime_options,
                )

            runtime_options.update(component_options)

        return runtime_options

    def dict(self) -> Dict[str, str]:
        """Converts the stack into a dictionary.

        Returns:
            A dict mapping the value of each component type to the JSON
            representation of that component, plus a `"name"` entry holding
            the stack name.
        """
        component_dict = {
            component_type.value: component.json(sort_keys=True)
            for component_type, component in self.components.items()
        }
        component_dict.update({"name": self.name})
        return component_dict

    def requirements(
        self,
        exclude_components: Optional[AbstractSet[StackComponentType]] = None,
    ) -> Set[str]:
        """Set of PyPI requirements for the stack.

        This method combines the requirements of all stack components (except
        the ones specified in `exclude_components`).

        Args:
            exclude_components: Set of component types for which the
                requirements should not be included in the output.

        Returns:
            A new set containing the requirements of all included components
            (empty if no component is included).
        """
        exclude_components = exclude_components or set()
        requirements = [
            component.requirements
            for component in self.components.values()
            if component.type not in exclude_components
        ]
        # Calling `union` on a fresh set (instead of the unbound `set.union`)
        # works for zero components and for components that return any
        # iterable of requirement strings, not only real `set` objects.
        return set().union(*requirements)

    def validate(self) -> None:
        """Checks whether the stack configuration is valid.

        To check if a stack configuration is valid, the following criteria must
        be met:
        - all components must support the execution mode (either local or
         remote execution) specified by the orchestrator of the stack
        - the `StackValidator` of each stack component has to validate the
         stack to make sure all the components are compatible with each other

        Raises:
            StackValidationError: If the stack configuration is not valid.
        """
        # TODO [ENG-372]: Differentiate between orchestrators running a pipeline
        #  locally and remotely (potentially using subclasses or an
        #  `orchestrator.mode` property?) and make sure all components support
        #  either local/remote execution

        for component in self.components.values():
            if component.validator:
                component.validator.validate(stack=self)

    def deploy_pipeline(
        self,
        pipeline: "BasePipeline",
        runtime_configuration: RuntimeConfiguration,
    ) -> Any:
        """Deploys a pipeline on this stack.

        All components first prepare the deployment and then the run, the
        orchestrator runs the pipeline, and afterwards all components clean
        up. The cleanup hooks are also called if the orchestrator raises.

        Args:
            pipeline: The pipeline to deploy.
            runtime_configuration: Contains all the runtime configuration
                options specified for the pipeline run.

        Returns:
            The return value of the call to `orchestrator.run_pipeline(...)`.
        """
        for component in self.components.values():
            component.prepare_pipeline_deployment(
                pipeline=pipeline,
                stack=self,
                runtime_configuration=runtime_configuration,
            )

        for component in self.components.values():
            component.prepare_pipeline_run()

        # Fall back to a generated run name if none was configured. `%b`
        # (abbreviated month name) is the documented, portable spelling of
        # the platform-specific `%h` directive and yields the same text.
        runtime_configuration[
            RUN_NAME_OPTION_KEY
        ] = runtime_configuration.run_name or (
            f"{pipeline.name}-"
            f'{datetime.now().strftime("%d_%b_%y-%H_%M_%S_%f")}'
        )

        logger.info(
            "Using stack `%s` to run pipeline `%s`...",
            self.name,
            pipeline.name,
        )
        start_time = time.time()

        try:
            return_value = self.orchestrator.run_pipeline(
                pipeline,
                stack=self,
                runtime_configuration=runtime_configuration,
            )

            run_duration = time.time() - start_time
            logger.info(
                "Pipeline run `%s` has finished in %s.",
                runtime_configuration.run_name,
                string_utils.get_human_readable_time(run_duration),
            )
        finally:
            # Run the cleanup hooks even when the pipeline run failed.
            for component in self.components.values():
                component.cleanup_pipeline_run()

        return return_value

    # TODO [ENG-373]: Include provisioning status in CLI `zenml stack describe`
    #  and `zenml stack-component describe` commands
    @property
    def is_provisioned(self) -> bool:
        """If the stack provisioned resources to run locally."""
        return all(
            component.is_provisioned for component in self.components.values()
        )

    @property
    def is_running(self) -> bool:
        """If the stack is running locally."""
        return all(
            component.is_running for component in self.components.values()
        )

    def provision(self) -> None:
        """Provisions resources to run the stack locally.

        Components that are already provisioned are skipped.

        Raises:
            NotImplementedError: If any unprovisioned component does not
                implement provisioning.
        """
        logger.info("Provisioning resources for stack '%s'.", self.name)
        for component in self.components.values():
            if not component.is_provisioned:
                component.provision()
                logger.info("Provisioned resources for %s.", component)

    def deprovision(self) -> None:
        """Deprovisions all local resources of the stack.

        Components that are not provisioned are skipped. A
        `NotImplementedError` raised by a component's `deprovision()` is not
        propagated: it is logged as a warning and the remaining components
        are still processed.
        """
        logger.info("Deprovisioning resources for stack '%s'.", self.name)
        for component in self.components.values():
            if component.is_provisioned:
                try:
                    component.deprovision()
                except NotImplementedError as e:
                    logger.warning(e)
                else:
                    logger.info("Deprovisioned resources for %s.", component)

    def resume(self) -> None:
        """Resumes the provisioned local resources of the stack.

        Raises:
            ProvisioningError: If any stack component is missing provisioned
                resources.
        """
        logger.info("Resuming provisioned resources for stack '%s'.", self.name)
        for component in self.components.values():
            if component.is_running:
                # the component is already running, no need to resume anything
                pass
            elif component.is_provisioned:
                component.resume()
                logger.info("Resumed resources for %s.", component)
            else:
                raise ProvisioningError(
                    f"Unable to resume resources for {component}: No "
                    f"resources have been provisioned for this component."
                )

    def suspend(self) -> None:
        """Suspends the provisioned local resources of the stack.

        Only running components are suspended. Components that raise
        `NotImplementedError` from `suspend()` are skipped with a warning.
        """
        logger.info(
            "Suspending provisioned resources for stack '%s'.", self.name
        )
        for component in self.components.values():
            if component.is_running:
                try:
                    component.suspend()
                    logger.info("Suspended resources for %s.", component)
                except NotImplementedError:
                    logger.warning(
                        "Suspending provisioned resources not implemented "
                        "for %s. Continuing without suspending resources...",
                        component,
                    )
artifact_store: BaseArtifactStore property readonly

The artifact store of the stack.

components: Dict[zenml.enums.StackComponentType, StackComponent] property readonly

All components of the stack.

container_registry: Optional[BaseContainerRegistry] property readonly

The container registry of the stack.

is_provisioned: bool property readonly

If the stack provisioned resources to run locally.

is_running: bool property readonly

If the stack is running locally.

metadata_store: BaseMetadataStore property readonly

The metadata store of the stack.

name: str property readonly

The name of the stack.

orchestrator: BaseOrchestrator property readonly

The orchestrator of the stack.

runtime_options: Dict[str, Any] property readonly

Runtime options that are available to configure this stack.

This method combines the available runtime options for all components of this stack. See StackComponent.runtime_options() for more information.

__init__(self, name, *, orchestrator, metadata_store, artifact_store, container_registry=None) special

Initializes and validates a stack instance.

Exceptions:

Type Description
StackValidationError

If the stack configuration is not valid.

Source code in zenml/stack/stack.py
def __init__(
    self,
    name: str,
    *,
    orchestrator: "BaseOrchestrator",
    metadata_store: "BaseMetadataStore",
    artifact_store: "BaseArtifactStore",
    container_registry: Optional["BaseContainerRegistry"] = None,
):
    """Stores the stack name and components, then validates the stack.

    Args:
        name: The name of the stack.
        orchestrator: The orchestrator of the stack.
        metadata_store: The metadata store of the stack.
        artifact_store: The artifact store of the stack.
        container_registry: Optional container registry of the stack.

    Raises:
        StackValidationError: If the stack configuration is not valid.
    """
    # The three required components ...
    self._orchestrator, self._metadata_store, self._artifact_store = (
        orchestrator,
        metadata_store,
        artifact_store,
    )
    # ... the optional one, and the stack name.
    self._container_registry = container_registry
    self._name = name

    # Validate only once every attribute has been assigned.
    self.validate()
default_local_stack() classmethod

Creates a stack instance which is configured to run locally.

Source code in zenml/stack/stack.py
@classmethod
def default_local_stack(cls) -> "Stack":
    """Builds the default stack made of local components only.

    The stack is named `local_stack` and combines a local orchestrator, a
    local artifact store located in a UUID-named directory below the global
    config directory, and a SQLite metadata store whose database file lives
    inside that artifact store directory.
    """
    from zenml.artifact_stores import LocalArtifactStore
    from zenml.metadata_stores import SQLiteMetadataStore
    from zenml.orchestrators import LocalOrchestrator

    local_orchestrator = LocalOrchestrator(name="local_orchestrator")

    # The artifact store directory is named after the store's own UUID.
    store_id = uuid.uuid4()
    store_dir = os.path.join(
        GlobalConfig.config_directory(), "local_stores", str(store_id)
    )
    local_artifact_store = LocalArtifactStore(
        name="local_artifact_store", uuid=store_id, path=store_dir
    )

    local_metadata_store = SQLiteMetadataStore(
        name="local_metadata_store",
        uri=os.path.join(store_dir, "metadata.db"),
    )

    return cls(
        name="local_stack",
        orchestrator=local_orchestrator,
        metadata_store=local_metadata_store,
        artifact_store=local_artifact_store,
    )
deploy_pipeline(self, pipeline, runtime_configuration)

Deploys a pipeline on this stack.

Parameters:

Name Type Description Default
pipeline BasePipeline

The pipeline to deploy.

required
runtime_configuration RuntimeConfiguration

Contains all the runtime configuration options specified for the pipeline run.

required

Returns:

Type Description
Any

The return value of the call to orchestrator.run_pipeline(...).

Source code in zenml/stack/stack.py
def deploy_pipeline(
    self,
    pipeline: "BasePipeline",
    runtime_configuration: RuntimeConfiguration,
) -> Any:
    """Deploys a pipeline on this stack.

    All components first prepare the deployment and then the run, the
    orchestrator runs the pipeline, and afterwards all components clean
    up. The cleanup hooks are also called if the orchestrator raises.

    Args:
        pipeline: The pipeline to deploy.
        runtime_configuration: Contains all the runtime configuration
            options specified for the pipeline run.

    Returns:
        The return value of the call to `orchestrator.run_pipeline(...)`.
    """
    for component in self.components.values():
        component.prepare_pipeline_deployment(
            pipeline=pipeline,
            stack=self,
            runtime_configuration=runtime_configuration,
        )

    for component in self.components.values():
        component.prepare_pipeline_run()

    # Fall back to a generated run name if none was configured. `%b`
    # (abbreviated month name) is the documented, portable spelling of the
    # platform-specific `%h` directive and yields the same text.
    runtime_configuration[
        RUN_NAME_OPTION_KEY
    ] = runtime_configuration.run_name or (
        f"{pipeline.name}-"
        f'{datetime.now().strftime("%d_%b_%y-%H_%M_%S_%f")}'
    )

    logger.info(
        "Using stack `%s` to run pipeline `%s`...",
        self.name,
        pipeline.name,
    )
    start_time = time.time()

    try:
        return_value = self.orchestrator.run_pipeline(
            pipeline, stack=self, runtime_configuration=runtime_configuration
        )

        run_duration = time.time() - start_time
        logger.info(
            "Pipeline run `%s` has finished in %s.",
            runtime_configuration.run_name,
            string_utils.get_human_readable_time(run_duration),
        )
    finally:
        # Run the cleanup hooks even when the pipeline run failed.
        for component in self.components.values():
            component.cleanup_pipeline_run()

    return return_value
deprovision(self)

Deprovisions all local resources of the stack.

Exceptions:

Type Description
NotImplementedError

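Listed by the docstring for provisioned components that do not implement deprovisioning. Note that the source shown below catches this error and logs it as a warning instead of propagating it.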

Source code in zenml/stack/stack.py
def deprovision(self) -> None:
    """Deprovisions all local resources of the stack.

    Components that are not provisioned are skipped. A
    `NotImplementedError` raised by a component's `deprovision()` is not
    propagated: it is logged as a warning and the remaining components are
    still processed.
    """
    logger.info("Deprovisioning resources for stack '%s'.", self.name)
    for component in self.components.values():
        if component.is_provisioned:
            try:
                component.deprovision()
            except NotImplementedError as e:
                logger.warning(e)
            else:
                # Only report success when deprovisioning did not raise.
                logger.info("Deprovisioned resources for %s.", component)
dict(self)

Converts the stack into a dictionary.

Source code in zenml/stack/stack.py
def dict(self) -> Dict[str, str]:
    """Serializes the stack into a flat dictionary.

    Returns:
        A dict mapping the value of each component type to the JSON
        representation of that component, plus a `"name"` entry holding the
        stack name.
    """
    serialized = {}
    for kind, member in self.components.items():
        serialized[kind.value] = member.json(sort_keys=True)

    # The name is written last, after all component entries.
    serialized["name"] = self.name
    return serialized
from_components(name, components) classmethod

Creates a stack instance from a dict of stack components.

Parameters:

Name Type Description Default
name str

The name of the stack.

required
components Dict[zenml.enums.StackComponentType, StackComponent]

The components of the stack.

required

Returns:

Type Description
Stack

A stack instance consisting of the given components.

Exceptions:

Type Description
TypeError

If a required component is missing or a component doesn't inherit from the expected base class.

Source code in zenml/stack/stack.py
@classmethod
def from_components(
    cls, name: str, components: Dict[StackComponentType, "StackComponent"]
) -> "Stack":
    """Creates a stack instance from a dict of stack components.

    Args:
        name: The name of the stack.
        components: The components of the stack.

    Returns:
        A stack instance consisting of the given components.

    Raises:
        TypeError: If a required component is missing or a component
            doesn't inherit from the expected base class.
    """
    from zenml.artifact_stores import BaseArtifactStore
    from zenml.container_registries import BaseContainerRegistry
    from zenml.metadata_stores import BaseMetadataStore
    from zenml.orchestrators import BaseOrchestrator

    def _raise_type_error(
        component: Optional["StackComponent"], expected_class: Type[Any]
    ) -> NoReturn:
        """Raises a TypeError that the component has an unexpected type."""
        raise TypeError(
            f"Unable to create stack: Wrong stack component type "
            f"`{component.__class__.__name__}` (expected: subclass "
            f"of `{expected_class.__name__}`)"
        )

    # A missing required component shows up as `None` here and therefore
    # fails the `isinstance` check just like a wrongly typed one.
    orchestrator = components.get(StackComponentType.ORCHESTRATOR)
    if not isinstance(orchestrator, BaseOrchestrator):
        _raise_type_error(orchestrator, BaseOrchestrator)

    metadata_store = components.get(StackComponentType.METADATA_STORE)
    if not isinstance(metadata_store, BaseMetadataStore):
        _raise_type_error(metadata_store, BaseMetadataStore)

    artifact_store = components.get(StackComponentType.ARTIFACT_STORE)
    if not isinstance(artifact_store, BaseArtifactStore):
        _raise_type_error(artifact_store, BaseArtifactStore)

    # The container registry is optional: only its type is checked, and only
    # if one was passed.
    container_registry = components.get(
        StackComponentType.CONTAINER_REGISTRY
    )
    if container_registry is not None and not isinstance(
        container_registry, BaseContainerRegistry
    ):
        _raise_type_error(container_registry, BaseContainerRegistry)

    # Instantiate via `cls` (not `Stack`) so that calling this alternate
    # constructor on a subclass returns an instance of that subclass.
    return cls(
        name=name,
        orchestrator=orchestrator,
        metadata_store=metadata_store,
        artifact_store=artifact_store,
        container_registry=container_registry,
    )
provision(self)

Provisions resources to run the stack locally.

Exceptions:

Type Description
NotImplementedError

If any unprovisioned component does not implement provisioning.

Source code in zenml/stack/stack.py
def provision(self) -> None:
    """Provisions local resources for every component that lacks them.

    Components that report `is_provisioned` are left untouched.

    Raises:
        NotImplementedError: If any unprovisioned component does not
            implement provisioning.
    """
    logger.info("Provisioning resources for stack '%s'.", self.name)
    for member in self.components.values():
        if member.is_provisioned:
            # nothing to do for this component
            continue

        member.provision()
        logger.info("Provisioned resources for %s.", member)
requirements(self, exclude_components=None)

Set of PyPI requirements for the stack.

This method combines the requirements of all stack components (except the ones specified in exclude_components).

Parameters:

Name Type Description Default
exclude_components Optional[AbstractSet[zenml.enums.StackComponentType]]

Set of component types for which the requirements should not be included in the output.

None
Source code in zenml/stack/stack.py
def requirements(
    self,
    exclude_components: Optional[AbstractSet[StackComponentType]] = None,
) -> Set[str]:
    """Set of PyPI requirements for the stack.

    This method combines the requirements of all stack components (except
    the ones specified in `exclude_components`).

    Args:
        exclude_components: Set of component types for which the
            requirements should not be included in the output.

    Returns:
        A new set containing the requirements of all included components
        (empty if no component is included).
    """
    exclude_components = exclude_components or set()
    requirements = [
        component.requirements
        for component in self.components.values()
        if component.type not in exclude_components
    ]
    # Calling `union` on a fresh set (instead of the unbound `set.union`)
    # works for zero components and for components that return any iterable
    # of requirement strings, not only real `set` objects.
    return set().union(*requirements)
resume(self)

Resumes the provisioned local resources of the stack.

Exceptions:

Type Description
ProvisioningError

If any stack component is missing provisioned resources.

Source code in zenml/stack/stack.py
def resume(self) -> None:
    """Resumes every provisioned component that is not running yet.

    Raises:
        ProvisioningError: If any stack component is missing provisioned
            resources.
    """
    logger.info("Resuming provisioned resources for stack '%s'.", self.name)
    for member in self.components.values():
        if member.is_running:
            # already up, nothing to resume
            continue

        if not member.is_provisioned:
            # resuming requires previously provisioned resources
            raise ProvisioningError(
                f"Unable to resume resources for {member}: No "
                f"resources have been provisioned for this component."
            )

        member.resume()
        logger.info("Resumed resources for %s.", member)
suspend(self)

Suspends the provisioned local resources of the stack.

Source code in zenml/stack/stack.py
def suspend(self) -> None:
    """Suspends the local resources of every running component.

    Components that raise `NotImplementedError` while suspending are skipped
    with a warning; the remaining components are still processed.
    """
    logger.info(
        "Suspending provisioned resources for stack '%s'.", self.name
    )
    for member in self.components.values():
        if not member.is_running:
            # only running components can be suspended
            continue

        try:
            member.suspend()
            logger.info("Suspended resources for %s.", member)
        except NotImplementedError:
            logger.warning(
                "Suspending provisioned resources not implemented "
                "for %s. Continuing without suspending resources...",
                member,
            )
validate(self)

Checks whether the stack configuration is valid.

To check if a stack configuration is valid, the following criteria must be met:

- All components must support the execution mode (either local or remote execution) specified by the orchestrator of the stack.
- The StackValidator of each stack component has to validate the stack to make sure all the components are compatible with each other.

Exceptions:

Type Description
StackValidationError

If the stack configuration is not valid.

Source code in zenml/stack/stack.py
def validate(self) -> None:
    """Validates the stack configuration.

    A stack configuration is considered valid if:
    - all components support the execution mode (either local or remote
     execution) specified by the orchestrator of the stack
    - the `StackValidator` of each stack component accepts the stack, which
     makes sure all the components are compatible with each other

    Only the second criterion is checked here so far, see the TODO below.

    Raises:
        StackValidationError: If the stack configuration is not valid.
    """
    # TODO [ENG-372]: Differentiate between orchestrators running a pipeline
    #  locally and remotely (potentially using subclasses or an
    #  `orchestrator.mode` property?) and make sure all components support
    #  either local/remote execution

    for member in self.components.values():
        if not member.validator:
            # components without a validator impose no constraints
            continue
        member.validator.validate(stack=self)

stack_component

StackComponent (BaseModel, ABC) pydantic-model

Abstract StackComponent class for all components of a ZenML stack.

Attributes:

Name Type Description
name str

The name of the component.

uuid UUID

Unique identifier of the component.

supports_local_execution bool

If the component supports running locally.

supports_remote_execution bool

If the component supports running remotely.

Source code in zenml/stack/stack_component.py
class StackComponent(BaseModel, ABC):
    """Common abstract base for every component that can be part of a stack.

    Attributes:
        name: The name of the component.
        uuid: Unique identifier of the component. A fresh `uuid4` is
            generated when no value is given.
        supports_local_execution: If the component supports running locally.
        supports_remote_execution: If the component supports running remotely.
    """

    name: str
    uuid: UUID = Field(default_factory=uuid4)
    supports_local_execution: bool
    supports_remote_execution: bool

    @property
    @abstractmethod
    def type(self) -> StackComponentType:
        """The type of this component; has to be defined by subclasses."""

    @property
    @abstractmethod
    def flavor(self) -> StackComponentFlavor:
        """The flavor of this component; has to be defined by subclasses."""

    @property
    def log_file(self) -> Optional[str]:
        """Path to the log file of the component, `None` by default."""
        # TODO [ENG-136]: Add support for multiple log files for a stack
        #  component. E.g. let each component return a generator that yields
        #  logs instead of specifying a single file path.
        return None

    @property
    def runtime_options(self) -> Dict[str, Any]:
        """Options that can be used to configure this component at runtime.

        Keys are the option names (usable to set the option in the
        `RuntimeConfiguration`), values are the option defaults (or `None`
        if an option has no default). The base class offers no options.
        """
        return dict()

    @property
    def requirements(self) -> Set[str]:
        """PyPI requirements of the module in which the component is defined."""
        module_requirements = get_requirements_for_module(self.__module__)
        return set(module_requirements)

    def prepare_pipeline_deployment(
        self,
        pipeline: "BasePipeline",
        stack: "Stack",
        runtime_configuration: "RuntimeConfiguration",
    ) -> None:
        """Hook that runs right before a pipeline gets deployed.

        The base implementation does nothing. Subclasses should override it
        if they require runtime configuration options or if they need to run
        code before the pipeline deployment.

        Args:
            pipeline: The pipeline that will be deployed.
            stack: The stack on which the pipeline will be deployed.
            runtime_configuration: Contains all the runtime configuration
                options specified for the pipeline run.
        """

    def prepare_pipeline_run(self) -> None:
        """Hook for preparing a pipeline run; does nothing by default."""

    def cleanup_pipeline_run(self) -> None:
        """Hook for cleaning up after a pipeline run; does nothing by default."""

    @property
    def validator(self) -> Optional["StackValidator"]:
        """Validator that checks stacks containing this component, if any.

        The validator is invoked whenever a stack that includes this
        component is initialized. Subclasses should override this property
        and return a `StackValidator` that rejects every stack they are not
        compatible with. The base class has no validator.
        """
        return None

    @property
    def is_provisioned(self) -> bool:
        """If the component provisioned resources to run locally.

        Always `True` in the base class.
        """
        return True

    @property
    def is_running(self) -> bool:
        """If the component is running locally.

        Always `True` in the base class.
        """
        return True

    def provision(self) -> None:
        """Provisions resources to run the component locally.

        Raises:
            NotImplementedError: Always, unless overridden by a subclass.
        """
        message = f"Provisioning local resources not implemented for {self}."
        raise NotImplementedError(message)

    def deprovision(self) -> None:
        """Deprovisions all local resources of the component.

        Raises:
            NotImplementedError: Always, unless overridden by a subclass.
        """
        message = f"Deprovisioning local resource not implemented for {self}."
        raise NotImplementedError(message)

    def resume(self) -> None:
        """Resumes the provisioned local resources of the component.

        Raises:
            NotImplementedError: Always, unless overridden by a subclass.
        """
        message = f"Resuming provisioned resources not implemented for {self}."
        raise NotImplementedError(message)

    def suspend(self) -> None:
        """Suspends the provisioned local resources of the component.

        Raises:
            NotImplementedError: Always, unless overridden by a subclass.
        """
        message = (
            f"Suspending provisioned resources not implemented for {self}."
        )
        raise NotImplementedError(message)

    def __repr__(self) -> str:
        """Renders class name, type, flavor and all serialized attributes."""
        rendered_attributes = [
            f"{attribute}={content}"
            for attribute, content in self.dict().items()
        ]
        joined_attributes = ", ".join(rendered_attributes)
        prefix = (
            f"{self.__class__.__qualname__}(type={self.type}, "
            f"flavor={self.flavor}, "
        )
        return f"{prefix}{joined_attributes})"

    def __str__(self) -> str:
        """Same text as `__repr__`."""
        representation = self.__repr__()
        return representation

    class Config:
        """Pydantic configuration class."""

        # public attributes are immutable
        allow_mutation = False
        # all attributes with leading underscore are private and therefore
        # are mutable and not included in serialization
        underscore_attrs_are_private = True

        # the execution support flags are excluded from serialization
        fields = {
            field_name: {"exclude": True}
            for field_name in (
                "supports_local_execution",
                "supports_remote_execution",
            )
        }
flavor: StackComponentFlavor property readonly

The component flavor.

is_provisioned: bool property readonly

If the component provisioned resources to run locally.

is_running: bool property readonly

If the component is running locally.

log_file: Optional[str] property readonly

Optional path to a log file for the stack component.

requirements: Set[str] property readonly

Set of PyPI requirements for the component.

runtime_options: Dict[str, Any] property readonly

Runtime options that are available to configure this component.

The items of the dictionary should map option names (which can be used to configure the option in the RuntimeConfiguration) to default values for the option (or None if there is no default value).

type: StackComponentType property readonly

The component type.

validator: Optional[StackValidator] property readonly

The optional validator of the stack component.

This validator will be called each time a stack with the stack component is initialized. Subclasses should override this property and return a StackValidator that makes sure they're not included in any stack that they're not compatible with.

Config

Pydantic configuration class.

Source code in zenml/stack/stack_component.py
class Config:
    """Pydantic settings for the stack component model."""

    # public attributes cannot be reassigned after creation
    allow_mutation = False
    # attributes starting with an underscore are treated as private: they
    # stay mutable and are left out of serialization
    underscore_attrs_are_private = True

    # the execution support flags are excluded from serialization
    fields = {
        field_name: {"exclude": True}
        for field_name in (
            "supports_local_execution",
            "supports_remote_execution",
        )
    }
__repr__(self) special

String representation of the stack component.

Source code in zenml/stack/stack_component.py
def __repr__(self) -> str:
    """Renders class name, type, flavor and all serialized attributes.

    The result has the form
    `ClassName(type=..., flavor=..., key=value, ...)`, where the key/value
    pairs are the items of `self.dict()`.
    """
    rendered_attributes = [
        f"{attribute}={content}" for attribute, content in self.dict().items()
    ]
    joined_attributes = ", ".join(rendered_attributes)
    prefix = (
        f"{self.__class__.__qualname__}(type={self.type}, "
        f"flavor={self.flavor}, "
    )
    return f"{prefix}{joined_attributes})"
__str__(self) special

String representation of the stack component.

Source code in zenml/stack/stack_component.py
def __str__(self) -> str:
    """Returns the same text as `__repr__`."""
    representation = self.__repr__()
    return representation
cleanup_pipeline_run(self)

Cleans up resources after the pipeline run is finished.

Source code in zenml/stack/stack_component.py
def cleanup_pipeline_run(self) -> None:
    """Cleans up resources after the pipeline run is finished.

    This base implementation does nothing.
    """
deprovision(self)

Deprovisions all local resources of the component.

Source code in zenml/stack/stack_component.py
def deprovision(self) -> None:
    """Deprovisions all local resources of the component.

    Raises:
        NotImplementedError: Always; this implementation only reports
            that deprovisioning is not supported.
    """
    message = f"Deprovisioning local resource not implemented for {self}."
    raise NotImplementedError(message)
prepare_pipeline_deployment(self, pipeline, stack, runtime_configuration)

Prepares deploying the pipeline.

This method gets called immediately before a pipeline is deployed. Subclasses should override it if they require runtime configuration options or if they need to run code before the pipeline deployment.

Parameters:

Name Type Description Default
pipeline BasePipeline

The pipeline that will be deployed.

required
stack Stack

The stack on which the pipeline will be deployed.

required
runtime_configuration RuntimeConfiguration

Contains all the runtime configuration options specified for the pipeline run.

required
Source code in zenml/stack/stack_component.py
def prepare_pipeline_deployment(
    self,
    pipeline: "BasePipeline",
    stack: "Stack",
    runtime_configuration: "RuntimeConfiguration",
) -> None:
    """Prepares deploying the pipeline.

    This method gets called immediately before a pipeline is deployed.
    This base implementation does nothing; subclasses should override it
    if they require runtime configuration options or if they need to run
    code before the pipeline deployment.

    Args:
        pipeline: The pipeline that will be deployed.
        stack: The stack on which the pipeline will be deployed.
        runtime_configuration: Contains all the runtime configuration
            options specified for the pipeline run.
    """
prepare_pipeline_run(self)

Prepares running the pipeline.

Source code in zenml/stack/stack_component.py
def prepare_pipeline_run(self) -> None:
    """Prepares running the pipeline.

    This base implementation does nothing.
    """
provision(self)

Provisions resources to run the component locally.

Source code in zenml/stack/stack_component.py
def provision(self) -> None:
    """Provisions resources to run the component locally.

    Raises:
        NotImplementedError: Always; this implementation only reports
            that provisioning is not supported.
    """
    message = f"Provisioning local resources not implemented for {self}."
    raise NotImplementedError(message)
resume(self)

Resumes the provisioned local resources of the component.

Source code in zenml/stack/stack_component.py
def resume(self) -> None:
    """Resumes the provisioned local resources of the component.

    Raises:
        NotImplementedError: Always; this implementation only reports
            that resuming is not supported.
    """
    message = f"Resuming provisioned resources not implemented for {self}."
    raise NotImplementedError(message)
suspend(self)

Suspends the provisioned local resources of the component.

Source code in zenml/stack/stack_component.py
def suspend(self) -> None:
    """Suspends the provisioned local resources of the component.

    Raises:
        NotImplementedError: Always; this implementation only reports
            that suspending is not supported.
    """
    message = f"Suspending provisioned resources not implemented for {self}."
    raise NotImplementedError(message)

stack_component_class_registry

StackComponentClassRegistry

Registry for stack component classes.

All stack component classes must be registered here so they can be instantiated from the component type and flavor specified inside the ZenML repository configuration.

Source code in zenml/stack/stack_component_class_registry.py
class StackComponentClassRegistry:
    """Registry for stack component classes.

    All stack component classes must be registered here so they can be
    instantiated from the component type and flavor specified inside the
    ZenML repository configuration.

    Attributes:
        component_classes: Class-level mapping from component type to a
            dict that maps flavor values (strings) to the registered
            component class.
    """

    component_classes: ClassVar[
        DefaultDict[StackComponentType, Dict[str, Type[StackComponent]]]
    ] = defaultdict(dict)

    @classmethod
    def register_class(
        cls,
        component_type: StackComponentType,
        component_flavor: StackComponentFlavor,
        component_class: Type[StackComponent],
    ) -> None:
        """Registers a stack component class.

        If a class is already registered for the same type and flavor, it
        is replaced and a warning is logged.

        Args:
            component_type: The type of the component class to register.
            component_flavor: The flavor of the component class to register.
            component_class: The component class to register.
        """
        # Flavors are stored under their string value so lookups can be done
        # with either the enum member or a plain string.
        flavor_name = component_flavor.value
        flavors = cls.component_classes[component_type]
        if flavor_name in flavors:
            logger.warning(
                "Overwriting previously registered stack component class `%s` "
                "for type '%s' and flavor '%s'.",
                # The stored value is itself a class, so `__name__` is the
                # name of the replaced class (`__class__.__name__` would be
                # the name of its metaclass).
                flavors[flavor_name].__name__,
                component_type.value,
                flavor_name,
            )

        flavors[flavor_name] = component_class
        logger.debug(
            "Registered stack component class for type '%s' and flavor '%s'.",
            component_type.value,
            flavor_name,
        )

    @classmethod
    def get_class(
        cls,
        component_type: StackComponentType,
        component_flavor: Union[StackComponentFlavor, str],
    ) -> Type[StackComponent]:
        """Returns the stack component class for the given type and flavor.

        Args:
            component_type: The type of the component class to return.
            component_flavor: The flavor of the component class to return,
                either as enum member or as its string value.

        Returns:
            The component class registered for the given type and flavor.

        Raises:
            KeyError: If no component class is registered for the given type
                and flavor.
        """
        # TODO [ENG-374]: Think about activating the integrations here to make
        #  sure all potential StackComponent classes are registered
        if isinstance(component_flavor, StackComponentFlavor):
            flavor_name = component_flavor.value
        else:
            flavor_name = component_flavor

        available_flavors = cls.component_classes[component_type]
        try:
            return available_flavors[flavor_name]
        except KeyError:
            # `from None` hides the bare dict lookup error in favor of the
            # more descriptive message below.
            raise KeyError(
                f"No stack component class found for type {component_type} "
                f"and flavor {flavor_name}. Registered flavors for this "
                f"type: {set(available_flavors)}. If your stack component "
                f"class is part of a ZenML integration, make sure to activate "
                f"it by calling "
                f"`IntegrationRegistry.activate_integrations()`."
            ) from None
get_class(component_type, component_flavor) classmethod

Returns the stack component class for the given type and flavor.

Parameters:

Name Type Description Default
component_type StackComponentType

The type of the component class to return.

required
component_flavor Union[zenml.enums.StackComponentFlavor, str]

The flavor of the component class to return.

required

Exceptions:

Type Description
KeyError

If no component class is registered for the given type and flavor.

Source code in zenml/stack/stack_component_class_registry.py
@classmethod
def get_class(
    cls,
    component_type: StackComponentType,
    component_flavor: Union[StackComponentFlavor, str],
) -> Type[StackComponent]:
    """Returns the stack component class for the given type and flavor.

    Args:
        component_type: The type of the component class to return.
        component_flavor: The flavor of the component class to return,
            either as enum member or as its string value.

    Returns:
        The component class registered for the given type and flavor.

    Raises:
        KeyError: If no component class is registered for the given type
            and flavor.
    """
    # TODO [ENG-374]: Think about activating the integrations here to make
    #  sure all potential StackComponent classes are registered
    if isinstance(component_flavor, StackComponentFlavor):
        flavor_name = component_flavor.value
    else:
        flavor_name = component_flavor

    available_flavors = cls.component_classes[component_type]
    try:
        return available_flavors[flavor_name]
    except KeyError:
        # `from None` hides the bare dict lookup error in favor of the more
        # descriptive message below.
        raise KeyError(
            f"No stack component class found for type {component_type} "
            f"and flavor {flavor_name}. Registered flavors for this "
            f"type: {set(available_flavors)}. If your stack component "
            f"class is part of a ZenML integration, make sure to activate "
            f"it by calling "
            f"`IntegrationRegistry.activate_integrations()`."
        ) from None
register_class(component_type, component_flavor, component_class) classmethod

Registers a stack component class.

Parameters:

Name Type Description Default
component_type StackComponentType

The type of the component class to register.

required
component_flavor StackComponentFlavor

The flavor of the component class to register.

required
component_class Type[zenml.stack.stack_component.StackComponent]

The component class to register.

required
Source code in zenml/stack/stack_component_class_registry.py
@classmethod
def register_class(
    cls,
    component_type: StackComponentType,
    component_flavor: StackComponentFlavor,
    component_class: Type[StackComponent],
) -> None:
    """Registers a stack component class.

    If a class is already registered for the same type and flavor, it is
    replaced and a warning is logged.

    Args:
        component_type: The type of the component class to register.
        component_flavor: The flavor of the component class to register.
        component_class: The component class to register.
    """
    # Flavors are stored under their string value.
    flavor_name = component_flavor.value
    flavors = cls.component_classes[component_type]
    if flavor_name in flavors:
        logger.warning(
            "Overwriting previously registered stack component class `%s` "
            "for type '%s' and flavor '%s'.",
            # The stored value is itself a class, so `__name__` is the name
            # of the replaced class (`__class__.__name__` would be the name
            # of its metaclass).
            flavors[flavor_name].__name__,
            component_type.value,
            flavor_name,
        )

    flavors[flavor_name] = component_class
    logger.debug(
        "Registered stack component class for type '%s' and flavor '%s'.",
        component_type.value,
        flavor_name,
    )

register_stack_component_class(component_type, component_flavor)

Parametrized decorator function to register stack component classes.

Parameters:

Name Type Description Default
component_type StackComponentType

The type of the component class to register.

required
component_flavor StackComponentFlavor

The flavor of the component class to register.

required

Returns:

Type Description
Callable[[Type[~C]], Type[~C]]

A decorator function that registers and returns the decorated stack component class.

Source code in zenml/stack/stack_component_class_registry.py
def register_stack_component_class(
    component_type: StackComponentType, component_flavor: StackComponentFlavor
) -> Callable[[Type[C]], Type[C]]:
    """Builds a class decorator that registers stack component classes.

    Args:
        component_type: The type of the component class to register.
        component_flavor: The flavor of the component class to register.

    Returns:
        A decorator that adds the decorated class to the
        `StackComponentClassRegistry` under the given type and flavor and
        hands the class back unchanged.
    """

    def decorator_function(cls: Type[C]) -> Type[C]:
        """Adds `cls` to the registry and returns it as is."""
        register = StackComponentClassRegistry.register_class
        register(
            component_type=component_type,
            component_flavor=component_flavor,
            component_class=cls,
        )
        return cls

    return decorator_function

stack_validator

StackValidator

A StackValidator is used to validate a stack configuration.

Each StackComponent can provide a StackValidator to make sure it is compatible with all components of the stack. The KubeflowOrchestrator for example will always require the stack to have a container registry in order to push the docker images that are required to run a pipeline in Kubeflow Pipelines.

Source code in zenml/stack/stack_validator.py
class StackValidator:
    """Checks whether a stack configuration is acceptable.

    A `StackComponent` can supply a `StackValidator` to ensure that it only
    ends up in stacks whose components it is compatible with. The
    `KubeflowOrchestrator` for example will always require the stack to
    have a container registry in order to push the docker images that are
    required to run a pipeline in Kubeflow Pipelines.
    """

    def __init__(
        self,
        required_components: Optional[AbstractSet[StackComponentType]] = None,
        custom_validation_function: Optional[Callable[["Stack"], bool]] = None,
    ):
        """Creates a validator from the given validation criteria.

        Args:
            required_components: Optional set of stack components that must
                exist in the stack. A falsy value is replaced by an empty
                set.
            custom_validation_function: Optional function that returns whether
                a stack is valid.
        """
        if not required_components:
            required_components = set()
        self._required_components = required_components
        self._custom_validation_function = custom_validation_function

    def validate(self, stack: "Stack") -> None:
        """Checks the given stack against the criteria of this validator.

        The required components are checked first; the custom validation
        function (if there is one) only runs when none of them is missing.

        Args:
            stack: The stack to validate.

        Raises:
            StackValidationError: If the stack does not meet all the
                validation criteria.
        """
        required = self._required_components
        absent = required - set(stack.components)
        if absent:
            raise StackValidationError(
                f"Missing stack components {absent} for " f"stack: {stack}"
            )

        custom_check = self._custom_validation_function
        if not custom_check:
            return
        if custom_check(stack):
            return

        raise StackValidationError(
            f"Custom validation function failed to validate "
            f"stack: {stack}"
        )
__init__(self, required_components=None, custom_validation_function=None) special

Initializes a StackValidator instance.

Parameters:

Name Type Description Default
required_components Optional[AbstractSet[zenml.enums.StackComponentType]]

Optional set of stack components that must exist in the stack.

None
custom_validation_function Optional[Callable[[Stack], bool]]

Optional function that returns whether a stack is valid.

None
Source code in zenml/stack/stack_validator.py
def __init__(
    self,
    required_components: Optional[AbstractSet[StackComponentType]] = None,
    custom_validation_function: Optional[Callable[["Stack"], bool]] = None,
):
    """Creates a validator from the given validation criteria.

    Args:
        required_components: Optional set of stack components that must
            exist in the stack. A falsy value is replaced by an empty set.
        custom_validation_function: Optional function that returns whether
            a stack is valid.
    """
    if not required_components:
        required_components = set()
    self._required_components = required_components
    self._custom_validation_function = custom_validation_function
validate(self, stack)

Validates the given stack.

Checks if the stack contains all the required components and passes the custom validation function of the validator.

Exceptions:

Type Description
StackValidationError

If the stack does not meet all the validation criteria.

Source code in zenml/stack/stack_validator.py
def validate(self, stack: "Stack") -> None:
    """Checks the given stack against the criteria of this validator.

    The required components are checked first; the custom validation
    function (if there is one) only runs when none of them is missing.

    Args:
        stack: The stack to validate.

    Raises:
        StackValidationError: If the stack does not meet all the
            validation criteria.
    """
    required = self._required_components
    absent = required - set(stack.components)
    if absent:
        raise StackValidationError(
            f"Missing stack components {absent} for " f"stack: {stack}"
        )

    custom_check = self._custom_validation_function
    if not custom_check:
        return
    if custom_check(stack):
        return

    raise StackValidationError(
        f"Custom validation function failed to validate " f"stack: {stack}"
    )