Skip to content

Stack

zenml.stack special

Initialization of the ZenML Stack.

The stack is essentially all the configuration for the infrastructure of your MLOps platform.

A stack is made up of multiple components. Some examples are:

  • An Artifact Store
  • A Metadata Store
  • An Orchestrator
  • A Step Operator (Optional)
  • A Container Registry (Optional)

authentication_mixin

Stack component mixin for authentication.

AuthenticationMixin (BaseModel) pydantic-model

Stack component mixin for authentication.

Attributes:

Name Type Description
authentication_secret Optional[str]

Name of the secret that stores the authentication credentials.

Source code in zenml/stack/authentication_mixin.py
class AuthenticationMixin(BaseModel):
    """Stack component mixin for authentication.

    Attributes:
        authentication_secret: Name of the secret that stores the
            authentication credentials.
    """

    authentication_secret: Optional[str] = None

    def get_authentication_secret(
        self, expected_schema_type: Type[T]
    ) -> Optional[T]:
        """Gets the secret referred to by the authentication secret attribute.

        Args:
            expected_schema_type: The expected secret schema class.

        Returns:
            The secret object if the `authentication_secret` attribute is set,
            `None` otherwise.

        Raises:
            RuntimeError: If no secrets manager exists in the active stack.
            TypeError: If the secret is not of the expected schema type.
        """
        if not self.authentication_secret:
            return None

        active_stack = Repository(skip_repository_check=True).active_stack  # type: ignore[call-arg]
        secrets_manager = active_stack.secrets_manager
        if not secrets_manager:
            raise RuntimeError(
                f"Unable to retrieve secret '{self.authentication_secret}' "
                "because the active stack does not have a secrets manager."
            )

        secret = secrets_manager.get_secret(self.authentication_secret)

        if not isinstance(secret, expected_schema_type):
            raise TypeError(
                f"Authentication secret has type {secret.TYPE} but a secret of "
                f"type {expected_schema_type.TYPE} was expected. To solve this "
                f"issue, register a secret with name "
                f"{self.authentication_secret} of type "
                f"{expected_schema_type.TYPE} using the following command: \n "
                f"`zenml secrets-manager secret register {self.authentication_secret} "
                f"--schema={expected_schema_type.TYPE} ...`"
            )

        return secret
get_authentication_secret(self, expected_schema_type)

Gets the secret referred to by the authentication secret attribute.

Parameters:

Name Type Description Default
expected_schema_type Type[~T]

The expected secret schema class.

required

Returns:

Type Description
Optional[~T]

The secret object if the authentication_secret attribute is set, None otherwise.

Exceptions:

Type Description
RuntimeError

If no secrets manager exists in the active stack.

TypeError

If the secret is not of the expected schema type.

Source code in zenml/stack/authentication_mixin.py
def get_authentication_secret(
    self, expected_schema_type: Type[T]
) -> Optional[T]:
    """Gets the secret referred to by the authentication secret attribute.

    Args:
        expected_schema_type: The expected secret schema class.

    Returns:
        The secret object if the `authentication_secret` attribute is set,
        `None` otherwise.

    Raises:
        RuntimeError: If no secrets manager exists in the active stack.
        TypeError: If the secret is not of the expected schema type.
    """
    if not self.authentication_secret:
        return None

    active_stack = Repository(skip_repository_check=True).active_stack  # type: ignore[call-arg]
    secrets_manager = active_stack.secrets_manager
    if not secrets_manager:
        raise RuntimeError(
            f"Unable to retrieve secret '{self.authentication_secret}' "
            "because the active stack does not have a secrets manager."
        )

    secret = secrets_manager.get_secret(self.authentication_secret)

    if not isinstance(secret, expected_schema_type):
        raise TypeError(
            f"Authentication secret has type {secret.TYPE} but a secret of "
            f"type {expected_schema_type.TYPE} was expected. To solve this "
            f"issue, register a secret with name "
            f"{self.authentication_secret} of type "
            f"{expected_schema_type.TYPE} using the following command: \n "
            f"`zenml secrets-manager secret register {self.authentication_secret} "
            f"--schema={expected_schema_type.TYPE} ...`"
        )

    return secret

flavor_registry

Implementation of the ZenML flavor registry.

FlavorRegistry

Registry for stack component flavors.

The flavors defined by ZenML must be registered here.

Source code in zenml/stack/flavor_registry.py
class FlavorRegistry:
    """Registry for stack component flavors.

    The flavors defined by ZenML must be registered here.
    """

    def __init__(self) -> None:
        """Initialization of the flavors."""
        self._flavors: DefaultDict[
            StackComponentType, Dict[str, FlavorWrapper]
        ] = defaultdict(dict)

        self.register_default_flavors()
        self.register_integration_flavors()

    def register_default_flavors(self) -> None:
        """Registers the default built-in flavors."""
        from zenml.artifact_stores import LocalArtifactStore
        from zenml.container_registries import (
            AzureContainerRegistry,
            DefaultContainerRegistry,
            DockerHubContainerRegistry,
            GCPContainerRegistry,
            GitHubContainerRegistry,
        )
        from zenml.metadata_stores import (
            MySQLMetadataStore,
            SQLiteMetadataStore,
        )
        from zenml.orchestrators import LocalOrchestrator
        from zenml.secrets_managers import LocalSecretsManager

        default_flavors = [
            LocalOrchestrator,
            SQLiteMetadataStore,
            MySQLMetadataStore,
            LocalArtifactStore,
            DefaultContainerRegistry,
            AzureContainerRegistry,
            DockerHubContainerRegistry,
            GCPContainerRegistry,
            GitHubContainerRegistry,
            LocalSecretsManager,
        ]
        for flavor in default_flavors:
            self._register_flavor(
                FlavorWrapper(
                    name=flavor.FLAVOR,  # type: ignore[attr-defined]
                    type=flavor.TYPE,  # type: ignore[attr-defined]
                    source=flavor.__module__ + "." + flavor.__name__,
                    integration="built-in",
                )
            )

    def register_integration_flavors(self) -> None:
        """Registers the flavors implemented by integrations."""
        from zenml.integrations.registry import integration_registry

        for integration in integration_registry.integrations.values():
            integrated_flavors = integration.flavors()
            if integrated_flavors:
                for flavor in integrated_flavors:
                    self._register_flavor(flavor)

    def _register_flavor(
        self,
        flavor: FlavorWrapper,
    ) -> None:
        """Registers a stack component flavor.

        Args:
            flavor: The flavor to register.

        Raises:
            KeyError: If the flavor is already registered.
        """
        flavors = self._flavors[flavor.type]

        if flavor.name in flavors:
            raise KeyError(
                f"There is already a {flavor.type} with the flavor "
                f"`{flavor.name}`. Please select another name for the flavor."
            )

        flavors[flavor.name] = flavor
        logger.debug(
            f"Registered flavor for '{flavor.name}' and type '{flavor.type}'.",
        )

    def get_flavors_by_type(
        self, component_type: StackComponentType
    ) -> Dict[str, FlavorWrapper]:
        """Return the list of flavors with given type.

        Args:
            component_type: The type of the stack component.

        Returns:
            The list of flavors with the given type.
        """
        return self._flavors[component_type]
__init__(self) special

Initialization of the flavors.

Source code in zenml/stack/flavor_registry.py
def __init__(self) -> None:
    """Initialization of the flavors."""
    self._flavors: DefaultDict[
        StackComponentType, Dict[str, FlavorWrapper]
    ] = defaultdict(dict)

    self.register_default_flavors()
    self.register_integration_flavors()
get_flavors_by_type(self, component_type)

Return the list of flavors with given type.

Parameters:

Name Type Description Default
component_type StackComponentType

The type of the stack component.

required

Returns:

Type Description
Dict[str, zenml.zen_stores.models.flavor_wrapper.FlavorWrapper]

The list of flavors with the given type.

Source code in zenml/stack/flavor_registry.py
def get_flavors_by_type(
    self, component_type: StackComponentType
) -> Dict[str, FlavorWrapper]:
    """Return the list of flavors with given type.

    Args:
        component_type: The type of the stack component.

    Returns:
        The list of flavors with the given type.
    """
    return self._flavors[component_type]
register_default_flavors(self)

Registers the default built-in flavors.

Source code in zenml/stack/flavor_registry.py
def register_default_flavors(self) -> None:
    """Registers the default built-in flavors."""
    from zenml.artifact_stores import LocalArtifactStore
    from zenml.container_registries import (
        AzureContainerRegistry,
        DefaultContainerRegistry,
        DockerHubContainerRegistry,
        GCPContainerRegistry,
        GitHubContainerRegistry,
    )
    from zenml.metadata_stores import (
        MySQLMetadataStore,
        SQLiteMetadataStore,
    )
    from zenml.orchestrators import LocalOrchestrator
    from zenml.secrets_managers import LocalSecretsManager

    default_flavors = [
        LocalOrchestrator,
        SQLiteMetadataStore,
        MySQLMetadataStore,
        LocalArtifactStore,
        DefaultContainerRegistry,
        AzureContainerRegistry,
        DockerHubContainerRegistry,
        GCPContainerRegistry,
        GitHubContainerRegistry,
        LocalSecretsManager,
    ]
    for flavor in default_flavors:
        self._register_flavor(
            FlavorWrapper(
                name=flavor.FLAVOR,  # type: ignore[attr-defined]
                type=flavor.TYPE,  # type: ignore[attr-defined]
                source=flavor.__module__ + "." + flavor.__name__,
                integration="built-in",
            )
        )
register_integration_flavors(self)

Registers the flavors implemented by integrations.

Source code in zenml/stack/flavor_registry.py
def register_integration_flavors(self) -> None:
    """Registers the flavors implemented by integrations."""
    from zenml.integrations.registry import integration_registry

    for integration in integration_registry.integrations.values():
        integrated_flavors = integration.flavors()
        if integrated_flavors:
            for flavor in integrated_flavors:
                self._register_flavor(flavor)

stack

Implementation of the ZenML Stack class.

Stack

ZenML stack class.

A ZenML stack is a collection of multiple stack components that are required to run ZenML pipelines. Some of these components (orchestrator, metadata store and artifact store) are required to run any kind of pipeline, other components like the container registry are only required if other stack components depend on them.

Source code in zenml/stack/stack.py
class Stack:
    """ZenML stack class.

    A ZenML stack is a collection of multiple stack components that are
    required to run ZenML pipelines. Some of these components (orchestrator,
    metadata store and artifact store) are required to run any kind of
    pipeline, other components like the container registry are only required
    if other stack components depend on them.
    """

    def __init__(
        self,
        name: str,
        *,
        orchestrator: "BaseOrchestrator",
        metadata_store: "BaseMetadataStore",
        artifact_store: "BaseArtifactStore",
        container_registry: Optional["BaseContainerRegistry"] = None,
        secrets_manager: Optional["BaseSecretsManager"] = None,
        step_operator: Optional["BaseStepOperator"] = None,
        feature_store: Optional["BaseFeatureStore"] = None,
        model_deployer: Optional["BaseModelDeployer"] = None,
        experiment_tracker: Optional["BaseExperimentTracker"] = None,
        alerter: Optional["BaseAlerter"] = None,
        annotator: Optional["BaseAnnotator"] = None,
        data_validator: Optional["BaseDataValidator"] = None,
    ):
        """Initializes and validates a stack instance.

        # noqa: DAR402

        Args:
            name: Name of the stack.
            orchestrator: Orchestrator component of the stack.
            metadata_store: Metadata store component of the stack.
            artifact_store: Artifact store component of the stack.
            container_registry: Container registry component of the stack.
            secrets_manager: Secrets manager component of the stack.
            step_operator: Step operator component of the stack.
            feature_store: Feature store component of the stack.
            model_deployer: Model deployer component of the stack.
            experiment_tracker: Experiment tracker component of the stack.
            alerter: Alerter component of the stack.
            annotator: Annotator component of the stack.
            data_validator: Data validator component of the stack.

        Raises:
            StackValidationError: If the stack configuration is not valid.
        """
        self._name = name
        self._orchestrator = orchestrator
        self._metadata_store = metadata_store
        self._artifact_store = artifact_store
        self._container_registry = container_registry
        self._step_operator = step_operator
        self._secrets_manager = secrets_manager
        self._feature_store = feature_store
        self._model_deployer = model_deployer
        self._experiment_tracker = experiment_tracker
        self._alerter = alerter
        self._annotator = annotator
        self._data_validator = data_validator

    @classmethod
    def from_components(
        cls, name: str, components: Dict[StackComponentType, "StackComponent"]
    ) -> "Stack":
        """Creates a stack instance from a dict of stack components.

        # noqa: DAR402

        Args:
            name: The name of the stack.
            components: The components of the stack.

        Returns:
            A stack instance consisting of the given components.

        Raises:
            TypeError: If a required component is missing or a component
                doesn't inherit from the expected base class.
        """
        from zenml.alerter import BaseAlerter
        from zenml.annotators import BaseAnnotator
        from zenml.artifact_stores import BaseArtifactStore
        from zenml.container_registries import BaseContainerRegistry
        from zenml.data_validators import BaseDataValidator
        from zenml.experiment_trackers import BaseExperimentTracker
        from zenml.feature_stores import BaseFeatureStore
        from zenml.metadata_stores import BaseMetadataStore
        from zenml.model_deployers import BaseModelDeployer
        from zenml.orchestrators import BaseOrchestrator
        from zenml.secrets_managers import BaseSecretsManager
        from zenml.step_operators import BaseStepOperator

        def _raise_type_error(
            component: Optional["StackComponent"], expected_class: Type[Any]
        ) -> NoReturn:
            """Raises a TypeError that the component has an unexpected type.

            Args:
                component: The component that has an unexpected type.
                expected_class: The expected type of the component.

            Raises:
                TypeError: If the component has an unexpected type.
            """
            raise TypeError(
                f"Unable to create stack: Wrong stack component type "
                f"`{component.__class__.__name__}` (expected: subclass "
                f"of `{expected_class.__name__}`)"
            )

        orchestrator = components.get(StackComponentType.ORCHESTRATOR)
        if not isinstance(orchestrator, BaseOrchestrator):
            _raise_type_error(orchestrator, BaseOrchestrator)

        metadata_store = components.get(StackComponentType.METADATA_STORE)
        if not isinstance(metadata_store, BaseMetadataStore):
            _raise_type_error(metadata_store, BaseMetadataStore)

        artifact_store = components.get(StackComponentType.ARTIFACT_STORE)
        if not isinstance(artifact_store, BaseArtifactStore):
            _raise_type_error(artifact_store, BaseArtifactStore)

        container_registry = components.get(
            StackComponentType.CONTAINER_REGISTRY
        )
        if container_registry is not None and not isinstance(
            container_registry, BaseContainerRegistry
        ):
            _raise_type_error(container_registry, BaseContainerRegistry)

        secrets_manager = components.get(StackComponentType.SECRETS_MANAGER)
        if secrets_manager is not None and not isinstance(
            secrets_manager, BaseSecretsManager
        ):
            _raise_type_error(secrets_manager, BaseSecretsManager)

        step_operator = components.get(StackComponentType.STEP_OPERATOR)
        if step_operator is not None and not isinstance(
            step_operator, BaseStepOperator
        ):
            _raise_type_error(step_operator, BaseStepOperator)

        feature_store = components.get(StackComponentType.FEATURE_STORE)
        if feature_store is not None and not isinstance(
            feature_store, BaseFeatureStore
        ):
            _raise_type_error(feature_store, BaseFeatureStore)

        model_deployer = components.get(StackComponentType.MODEL_DEPLOYER)
        if model_deployer is not None and not isinstance(
            model_deployer, BaseModelDeployer
        ):
            _raise_type_error(model_deployer, BaseModelDeployer)

        experiment_tracker = components.get(
            StackComponentType.EXPERIMENT_TRACKER
        )
        if experiment_tracker is not None and not isinstance(
            experiment_tracker, BaseExperimentTracker
        ):
            _raise_type_error(experiment_tracker, BaseExperimentTracker)

        alerter = components.get(StackComponentType.ALERTER)
        if alerter is not None and not isinstance(alerter, BaseAlerter):
            _raise_type_error(alerter, BaseAlerter)

        annotator = components.get(StackComponentType.ANNOTATOR)
        if annotator is not None and not isinstance(annotator, BaseAnnotator):
            _raise_type_error(annotator, BaseAnnotator)

        data_validator = components.get(StackComponentType.DATA_VALIDATOR)
        if data_validator is not None and not isinstance(
            data_validator, BaseDataValidator
        ):
            _raise_type_error(data_validator, BaseDataValidator)

        return Stack(
            name=name,
            orchestrator=orchestrator,
            metadata_store=metadata_store,
            artifact_store=artifact_store,
            container_registry=container_registry,
            secrets_manager=secrets_manager,
            step_operator=step_operator,
            feature_store=feature_store,
            model_deployer=model_deployer,
            experiment_tracker=experiment_tracker,
            alerter=alerter,
            annotator=annotator,
            data_validator=data_validator,
        )

    @classmethod
    def default_local_stack(cls) -> "Stack":
        """Creates a stack instance which is configured to run locally.

        Returns:
            A stack instance configured to run locally.
        """
        from zenml.artifact_stores import LocalArtifactStore
        from zenml.metadata_stores import SQLiteMetadataStore
        from zenml.orchestrators import LocalOrchestrator
        from zenml.stack.stack_component import uuid_factory

        orchestrator = LocalOrchestrator(name="default")

        artifact_store_uuid = uuid_factory()
        artifact_store_path = os.path.join(
            GlobalConfiguration().config_directory,
            "local_stores",
            str(artifact_store_uuid),
        )
        io_utils.create_dir_recursive_if_not_exists(artifact_store_path)
        artifact_store = LocalArtifactStore(
            name="default",
            uuid=artifact_store_uuid,
            path=artifact_store_path,
        )

        metadata_store_path = os.path.join(artifact_store_path, "metadata.db")
        metadata_store = SQLiteMetadataStore(
            name="default", uri=metadata_store_path
        )

        return cls(
            name="default",
            orchestrator=orchestrator,
            metadata_store=metadata_store,
            artifact_store=artifact_store,
        )

    @property
    def components(self) -> Dict[StackComponentType, "StackComponent"]:
        """All components of the stack.

        Returns:
            A dictionary of all components of the stack.
        """
        return {
            component.TYPE: component
            for component in [
                self.orchestrator,
                self.metadata_store,
                self.artifact_store,
                self.container_registry,
                self.secrets_manager,
                self.step_operator,
                self.feature_store,
                self.model_deployer,
                self.experiment_tracker,
                self.alerter,
                self.annotator,
                self.data_validator,
            ]
            if component is not None
        }

    @property
    def name(self) -> str:
        """The name of the stack.

        Returns:
            str: The name of the stack.
        """
        return self._name

    @property
    def orchestrator(self) -> "BaseOrchestrator":
        """The orchestrator of the stack.

        Returns:
            The orchestrator of the stack.
        """
        return self._orchestrator

    @property
    def metadata_store(self) -> "BaseMetadataStore":
        """The metadata store of the stack.

        Returns:
            The metadata store of the stack.
        """
        return self._metadata_store

    @property
    def artifact_store(self) -> "BaseArtifactStore":
        """The artifact store of the stack.

        Returns:
            The artifact store of the stack.
        """
        return self._artifact_store

    @property
    def container_registry(self) -> Optional["BaseContainerRegistry"]:
        """The container registry of the stack.

        Returns:
            The container registry of the stack or None if the stack does not
            have a container registry.
        """
        return self._container_registry

    @property
    def secrets_manager(self) -> Optional["BaseSecretsManager"]:
        """The secrets manager of the stack.

        Returns:
            The secrets manager of the stack.
        """
        return self._secrets_manager

    @property
    def step_operator(self) -> Optional["BaseStepOperator"]:
        """The step operator of the stack.

        Returns:
            The step operator of the stack.
        """
        return self._step_operator

    @property
    def feature_store(self) -> Optional["BaseFeatureStore"]:
        """The feature store of the stack.

        Returns:
            The feature store of the stack.
        """
        return self._feature_store

    @property
    def model_deployer(self) -> Optional["BaseModelDeployer"]:
        """The model deployer of the stack.

        Returns:
            The model deployer of the stack.
        """
        return self._model_deployer

    @property
    def experiment_tracker(self) -> Optional["BaseExperimentTracker"]:
        """The experiment tracker of the stack.

        Returns:
            The experiment tracker of the stack.
        """
        return self._experiment_tracker

    @property
    def alerter(self) -> Optional["BaseAlerter"]:
        """The alerter of the stack.

        Returns:
            The alerter of the stack.
        """
        return self._alerter

    @property
    def annotator(self) -> Optional["BaseAnnotator"]:
        """The annotator of the stack.

        Returns:
            The annotator of the stack.
        """
        return self._annotator

    @property
    def data_validator(self) -> Optional["BaseDataValidator"]:
        """The data validator of the stack.

        Returns:
            The data validator of the stack.
        """
        return self._data_validator

    @property
    def runtime_options(self) -> Dict[str, Any]:
        """Runtime options that are available to configure this stack.

        This method combines the available runtime options for all components
        of this stack. See `StackComponent.runtime_options()` for
        more information.

        Returns:
            A dictionary of runtime options.
        """
        runtime_options: Dict[str, Any] = {}
        for component in self.components.values():
            duplicate_runtime_options = (
                runtime_options.keys() & component.runtime_options.keys()
            )
            if duplicate_runtime_options:
                logger.warning(
                    "Found duplicate runtime options %s.",
                    duplicate_runtime_options,
                )

            runtime_options.update(component.runtime_options)

        return runtime_options

    def dict(self) -> Dict[str, str]:
        """Converts the stack into a dictionary.

        Returns:
            A dictionary containing the stack components.
        """
        component_dict = {
            component_type.value: component.json(sort_keys=True)
            for component_type, component in self.components.items()
        }
        component_dict.update({"name": self.name})
        return component_dict

    def requirements(
        self,
        exclude_components: Optional[AbstractSet[StackComponentType]] = None,
    ) -> Set[str]:
        """Set of PyPI requirements for the stack.

        This method combines the requirements of all stack components (except
        the ones specified in `exclude_components`).

        Args:
            exclude_components: Set of component types for which the
                requirements should not be included in the output.

        Returns:
            Set of PyPI requirements.
        """
        exclude_components = exclude_components or set()
        requirements = [
            component.requirements
            for component in self.components.values()
            if component.TYPE not in exclude_components
        ]
        return set.union(*requirements) if requirements else set()

    @property
    def required_secrets(self) -> Set["secret_utils.SecretReference"]:
        """All required secrets for this stack.

        Returns:
            The required secrets of this stack.
        """
        secrets = [
            component.required_secrets for component in self.components.values()
        ]
        return set.union(*secrets) if secrets else set()

    def _validate_secrets(self, raise_exception: bool) -> None:
        """Validates that all secrets of the stack exists.

        Args:
            raise_exception: If `True`, raises an exception if the stack has
                no secrets manager or a secret is missing. Otherwise a
                warning is logged.

        # noqa: DAR402
        Raises:
            StackValidationError: If the stack has no secrets manager or a
                secret is missing.
        """
        env_value = os.getenv(
            ENV_ZENML_SECRET_VALIDATION_LEVEL,
            default=SecretValidationLevel.SECRET_AND_KEY_EXISTS.value,
        )
        secret_validation_level = SecretValidationLevel(env_value)

        required_secrets = self.required_secrets
        if (
            secret_validation_level != SecretValidationLevel.NONE
            and required_secrets
        ):

            def _handle_error(message: str) -> None:
                """Handles the error by raising an exception or logging.

                Args:
                    message: The error message.

                Raises:
                    StackValidationError: If called and `raise_exception` of
                        the outer method is `True`.
                """
                if raise_exception:
                    raise StackValidationError(message)
                else:
                    message += (
                        "\nYou need to solve this issue before running "
                        "a pipeline on this stack."
                    )
                    logger.warning(message)

            if not self.secrets_manager:
                _handle_error(
                    f"Some component in stack `{self.name}` reference secret "
                    "values, but there is no secrets manager in this stack."
                )
                return

            missing = []
            existing_secrets = set(self.secrets_manager.get_all_secret_keys())
            for secret_ref in required_secrets:
                if (
                    secret_validation_level
                    == SecretValidationLevel.SECRET_AND_KEY_EXISTS
                ):
                    try:
                        _ = self.secrets_manager.get_secret(
                            secret_ref.name
                        ).content[secret_ref.key]
                    except KeyError:
                        missing.append(secret_ref)
                elif (
                    secret_validation_level
                    == SecretValidationLevel.SECRET_EXISTS
                ):
                    if secret_ref.name not in existing_secrets:
                        missing.append(secret_ref)

            if missing:
                _handle_error(
                    f"Missing secrets for stack: {missing}.\nTo register the "
                    "missing secrets for this stack, run `zenml stack "
                    f"register-secrets {self.name}`\nIf you want to "
                    "adjust the degree to which ZenML validates the existence "
                    "of secrets in your stack, you can do so by setting the "
                    f"environment variable {ENV_ZENML_SECRET_VALIDATION_LEVEL} "
                    "to one of the following values: "
                    f"{SecretValidationLevel.values()}."
                )

    def validate(
        self,
        decouple_stores: bool = False,
        fail_if_secrets_missing: bool = False,
    ) -> None:
        """Checks whether the stack configuration is valid.

        To check if a stack configuration is valid, the following criteria must
        be met:
        - the `StackValidator` of each stack component has to validate the
            stack to make sure all the components are compatible with each other
        - the stack must either have a properly associated artifact/metadata
            store pair or reset the association.
        - the required secrets of all components need to exist

        Args:
            decouple_stores: Flag to reset the previous associations between
                an artifact store and a metadata store
            fail_if_secrets_missing: If this is `True`, an error will be raised
                if a secret for a component is missing. Otherwise, only a
                warning will be logged.

        Raises:
            StackValidationError: If the artifact store and the metadata store
                are not properly associated.
        """
        for component in self.components.values():
            if component.validator:
                component.validator.validate(stack=self)

        self._validate_secrets(raise_exception=fail_if_secrets_missing)

        if not ZENML_IGNORE_STORE_COUPLINGS:
            from zenml.cli.utils import warning
            from zenml.repository import Repository

            repo = Repository()
            artifact_store_associations = (
                repo.zen_store.get_store_associations_for_artifact_store(
                    self.artifact_store.uuid
                )
            )
            if artifact_store_associations:
                for association in artifact_store_associations:
                    if (
                        association.metadata_store_uuid
                        != self.metadata_store.uuid
                    ):
                        if decouple_stores:
                            warning(
                                f"Removing the association between given "
                                f"artifact store {self.artifact_store.name} "
                                f"(uuid: {self.artifact_store.uuid}) and the "
                                f"metadata store (uuid: "
                                f"{association.metadata_store_uuid})."
                            )
                            repo.zen_store.delete_store_association_for_artifact_and_metadata_store(
                                artifact_store_uuid=self.artifact_store.uuid,
                                metadata_store_uuid=association.metadata_store_uuid,
                            )
                        else:
                            raise StackValidationError(
                                f"The artifact store instance in your stack "
                                f"'{self.artifact_store.name}' (uuid: "
                                f"{self.artifact_store.uuid}) has been "
                                f"previously associated with a different "
                                f"metadata store (uuid: "
                                f"{association.metadata_store_uuid} in a "
                                f"different stack. If either one of these "
                                f"stores are previously populated, this might "
                                f"lead to various problems. In order to solve "
                                f"this issue, you can either create and use "
                                f"another artifact store instance or use the "
                                f"'--decouple_stores' flag when you register/update a stack "
                                f"to reset the associations of these "
                                f"components."
                            )

            m_associations = (
                repo.zen_store.get_store_associations_for_metadata_store(
                    self.metadata_store.uuid
                )
            )
            if m_associations:
                for association in m_associations:
                    if (
                        association.artifact_store_uuid
                        != self.artifact_store.uuid
                    ):
                        if decouple_stores:
                            warning(
                                f"Removing the association between given "
                                f"metadata store {self.metadata_store.name} "
                                f"(uuid: {self.metadata_store.uuid}) and the "
                                f"artifact store (uuid: "
                                f"{association.artifact_store_uuid})."
                            )
                            repo.zen_store.delete_store_association_for_artifact_and_metadata_store(
                                artifact_store_uuid=association.artifact_store_uuid,
                                metadata_store_uuid=self.metadata_store.uuid,
                            )
                        else:
                            raise StackValidationError(
                                f"The metadata store instance in your stack "
                                f"'{self.metadata_store.name}' (uuid: "
                                f"{self.metadata_store.uuid}) has been "
                                f"previously associated with a different "
                                f"artifact store (uuid: "
                                f"{association.artifact_store_uuid} in a "
                                f"different stack. If either one of these "
                                f"stores are previously populated, this might "
                                f"lead to various problems. In order to solve "
                                f"this issue, you can either create and use "
                                f"another artifact store instance or use the "
                                f"'--decouple_stores' flag when you register/update a stack "
                                f"to reset the associations of these "
                                f"components."
                            )

            # Check if the associations already exists, if not create it
            existing_associations = repo.zen_store.get_store_associations_for_artifact_and_metadata_store(
                artifact_store_uuid=self.artifact_store.uuid,
                metadata_store_uuid=self.metadata_store.uuid,
            )
            if len(existing_associations) == 0:
                repo.zen_store.create_store_association(
                    artifact_store_uuid=self.artifact_store.uuid,
                    metadata_store_uuid=self.metadata_store.uuid,
                )

    def _register_pipeline_run(
        self,
        pipeline: "BasePipeline",
        runtime_configuration: "RuntimeConfiguration",
    ) -> None:
        """Registers a pipeline run in the ZenStore.

        Args:
            pipeline: The pipeline that is being run.
            runtime_configuration: The runtime configuration of the pipeline.
        """
        from zenml.repository import Repository
        from zenml.zen_stores.models import StackWrapper
        from zenml.zen_stores.models.pipeline_models import (
            PipelineRunWrapper,
            PipelineWrapper,
        )

        repo = Repository()
        active_project = repo.active_project
        pipeline_run_wrapper = PipelineRunWrapper(
            name=runtime_configuration.run_name,
            pipeline=PipelineWrapper.from_pipeline(pipeline),
            stack=StackWrapper.from_stack(self),
            runtime_configuration=runtime_configuration,
            user_id=repo.active_user.id,
            project_name=active_project.name if active_project else None,
        )

        Repository().zen_store.register_pipeline_run(pipeline_run_wrapper)

    def deploy_pipeline(
        self,
        pipeline: "BasePipeline",
        runtime_configuration: RuntimeConfiguration,
    ) -> Any:
        """Deploys a pipeline on this stack.

        Args:
            pipeline: The pipeline to deploy.
            runtime_configuration: Contains all the runtime configuration
                options specified for the pipeline run.

        Returns:
            The return value of the call to `orchestrator.run_pipeline(...)`.

        Raises:
            StackValidationError: If the stack configuration is not valid.
        """
        self.validate(fail_if_secrets_missing=True)

        for component in self.components.values():
            if not component.is_running:
                raise StackValidationError(
                    f"The '{component.name}' {component.TYPE} stack component "
                    f"is not currently running. Please run the following "
                    f"command to provision and start the component:\n\n"
                    f"    `zenml stack up`\n"
                )

        for component in self.components.values():
            component.prepare_pipeline_deployment(
                pipeline=pipeline,
                stack=self,
                runtime_configuration=runtime_configuration,
            )

        for component in self.components.values():
            component.prepare_pipeline_run()

        runtime_configuration[
            RUN_NAME_OPTION_KEY
        ] = runtime_configuration.run_name or (
            f"{pipeline.name}-"
            f'{datetime.now().strftime("%d_%h_%y-%H_%M_%S_%f")}'
        )

        logger.info(
            "Using stack `%s` to run pipeline `%s`...",
            self.name,
            pipeline.name,
        )
        start_time = time.time()

        original_cache_boolean = pipeline.enable_cache
        if "enable_cache" in runtime_configuration:
            logger.info(
                "Runtime configuration overwriting the pipeline cache settings"
                " to enable_cache=`%s` for this pipeline run. The default "
                "caching strategy is retained for future pipeline runs.",
                runtime_configuration["enable_cache"],
            )
            pipeline.enable_cache = runtime_configuration.get("enable_cache")

        self._register_pipeline_run(
            pipeline=pipeline, runtime_configuration=runtime_configuration
        )

        return_value = self.orchestrator.run(
            pipeline, stack=self, runtime_configuration=runtime_configuration
        )

        # Put pipeline level cache policy back to make sure the next runs
        #  default to that policy again in case the runtime configuration
        #  is not set explicitly
        pipeline.enable_cache = original_cache_boolean

        run_duration = time.time() - start_time
        logger.info(
            "Pipeline run `%s` has finished in %s.",
            runtime_configuration.run_name,
            string_utils.get_human_readable_time(run_duration),
        )

        for component in self.components.values():
            component.cleanup_pipeline_run()

        return return_value

    def prepare_step_run(self) -> None:
        """Prepares running a step."""
        for component in self.components.values():
            component.prepare_step_run()

    def cleanup_step_run(self) -> None:
        """Cleans up resources after the step run is finished."""
        for component in self.components.values():
            component.cleanup_step_run()

    @property
    def is_provisioned(self) -> bool:
        """If the stack provisioned resources to run locally.

        Returns:
            True if the stack provisioned resources to run locally.
        """
        return all(
            component.is_provisioned for component in self.components.values()
        )

    @property
    def is_running(self) -> bool:
        """If the stack is running locally.

        Returns:
            True if the stack is running locally, False otherwise.
        """
        return all(
            component.is_running for component in self.components.values()
        )

    def provision(self) -> None:
        """Provisions resources to run the stack locally."""
        logger.info("Provisioning resources for stack '%s'.", self.name)
        for component in self.components.values():
            if not component.is_provisioned:
                component.provision()
                logger.info("Provisioned resources for %s.", component)

    def deprovision(self) -> None:
        """Deprovisions all local resources of the stack."""
        logger.info("Deprovisioning resources for stack '%s'.", self.name)
        for component in self.components.values():
            if component.is_provisioned:
                try:
                    component.deprovision()
                    logger.info("Deprovisioned resources for %s.", component)
                except NotImplementedError as e:
                    logger.warning(e)

    def resume(self) -> None:
        """Resumes the provisioned local resources of the stack.

        Raises:
            ProvisioningError: If any stack component is missing provisioned
                resources.
        """
        logger.info("Resuming provisioned resources for stack %s.", self.name)
        for component in self.components.values():
            if component.is_running:
                # the component is already running, no need to resume anything
                pass
            elif component.is_provisioned:
                component.resume()
                logger.info("Resumed resources for %s.", component)
            else:
                raise ProvisioningError(
                    f"Unable to resume resources for {component}: No "
                    f"resources have been provisioned for this component."
                )

    def suspend(self) -> None:
        """Suspends the provisioned local resources of the stack."""
        logger.info(
            "Suspending provisioned resources for stack '%s'.", self.name
        )
        for component in self.components.values():
            if not component.is_suspended:
                try:
                    component.suspend()
                    logger.info("Suspended resources for %s.", component)
                except NotImplementedError:
                    logger.warning(
                        "Suspending provisioned resources not implemented "
                        "for %s. Continuing without suspending resources...",
                        component,
                    )
alerter: Optional[BaseAlerter] property readonly

The alerter of the stack.

Returns:

Type Description
Optional[BaseAlerter]

The alerter of the stack.

annotator: Optional[BaseAnnotator] property readonly

The annotator of the stack.

Returns:

Type Description
Optional[BaseAnnotator]

The annotator of the stack.

artifact_store: BaseArtifactStore property readonly

The artifact store of the stack.

Returns:

Type Description
BaseArtifactStore

The artifact store of the stack.

components: Dict[zenml.enums.StackComponentType, StackComponent] property readonly

All components of the stack.

Returns:

Type Description
Dict[zenml.enums.StackComponentType, StackComponent]

A dictionary of all components of the stack.

container_registry: Optional[BaseContainerRegistry] property readonly

The container registry of the stack.

Returns:

Type Description
Optional[BaseContainerRegistry]

The container registry of the stack or None if the stack does not have a container registry.

data_validator: Optional[BaseDataValidator] property readonly

The data validator of the stack.

Returns:

Type Description
Optional[BaseDataValidator]

The data validator of the stack.

experiment_tracker: Optional[BaseExperimentTracker] property readonly

The experiment tracker of the stack.

Returns:

Type Description
Optional[BaseExperimentTracker]

The experiment tracker of the stack.

feature_store: Optional[BaseFeatureStore] property readonly

The feature store of the stack.

Returns:

Type Description
Optional[BaseFeatureStore]

The feature store of the stack.

is_provisioned: bool property readonly

If the stack provisioned resources to run locally.

Returns:

Type Description
bool

True if the stack provisioned resources to run locally.

is_running: bool property readonly

If the stack is running locally.

Returns:

Type Description
bool

True if the stack is running locally, False otherwise.

metadata_store: BaseMetadataStore property readonly

The metadata store of the stack.

Returns:

Type Description
BaseMetadataStore

The metadata store of the stack.

model_deployer: Optional[BaseModelDeployer] property readonly

The model deployer of the stack.

Returns:

Type Description
Optional[BaseModelDeployer]

The model deployer of the stack.

name: str property readonly

The name of the stack.

Returns:

Type Description
str

The name of the stack.

orchestrator: BaseOrchestrator property readonly

The orchestrator of the stack.

Returns:

Type Description
BaseOrchestrator

The orchestrator of the stack.

required_secrets: Set[secret_utils.SecretReference] property readonly

All required secrets for this stack.

Returns:

Type Description
Set[secret_utils.SecretReference]

The required secrets of this stack.

runtime_options: Dict[str, Any] property readonly

Runtime options that are available to configure this stack.

This method combines the available runtime options for all components of this stack. See StackComponent.runtime_options() for more information.

Returns:

Type Description
Dict[str, Any]

A dictionary of runtime options.

secrets_manager: Optional[BaseSecretsManager] property readonly

The secrets manager of the stack.

Returns:

Type Description
Optional[BaseSecretsManager]

The secrets manager of the stack.

step_operator: Optional[BaseStepOperator] property readonly

The step operator of the stack.

Returns:

Type Description
Optional[BaseStepOperator]

The step operator of the stack.

__init__(self, name, *, orchestrator, metadata_store, artifact_store, container_registry=None, secrets_manager=None, step_operator=None, feature_store=None, model_deployer=None, experiment_tracker=None, alerter=None, annotator=None, data_validator=None) special

Initializes and validates a stack instance.

noqa: DAR402

Parameters:

Name Type Description Default
name str

Name of the stack.

required
orchestrator BaseOrchestrator

Orchestrator component of the stack.

required
metadata_store BaseMetadataStore

Metadata store component of the stack.

required
artifact_store BaseArtifactStore

Artifact store component of the stack.

required
container_registry Optional[BaseContainerRegistry]

Container registry component of the stack.

None
secrets_manager Optional[BaseSecretsManager]

Secrets manager component of the stack.

None
step_operator Optional[BaseStepOperator]

Step operator component of the stack.

None
feature_store Optional[BaseFeatureStore]

Feature store component of the stack.

None
model_deployer Optional[BaseModelDeployer]

Model deployer component of the stack.

None
experiment_tracker Optional[BaseExperimentTracker]

Experiment tracker component of the stack.

None
alerter Optional[BaseAlerter]

Alerter component of the stack.

None
annotator Optional[BaseAnnotator]

Annotator component of the stack.

None
data_validator Optional[BaseDataValidator]

Data validator component of the stack.

None

Exceptions:

Type Description
StackValidationError

If the stack configuration is not valid.

Source code in zenml/stack/stack.py
def __init__(
    self,
    name: str,
    *,
    orchestrator: "BaseOrchestrator",
    metadata_store: "BaseMetadataStore",
    artifact_store: "BaseArtifactStore",
    container_registry: Optional["BaseContainerRegistry"] = None,
    secrets_manager: Optional["BaseSecretsManager"] = None,
    step_operator: Optional["BaseStepOperator"] = None,
    feature_store: Optional["BaseFeatureStore"] = None,
    model_deployer: Optional["BaseModelDeployer"] = None,
    experiment_tracker: Optional["BaseExperimentTracker"] = None,
    alerter: Optional["BaseAlerter"] = None,
    annotator: Optional["BaseAnnotator"] = None,
    data_validator: Optional["BaseDataValidator"] = None,
):
    """Initializes and validates a stack instance.

    # noqa: DAR402

    Args:
        name: Name of the stack.
        orchestrator: Orchestrator component of the stack.
        metadata_store: Metadata store component of the stack.
        artifact_store: Artifact store component of the stack.
        container_registry: Container registry component of the stack.
        secrets_manager: Secrets manager component of the stack.
        step_operator: Step operator component of the stack.
        feature_store: Feature store component of the stack.
        model_deployer: Model deployer component of the stack.
        experiment_tracker: Experiment tracker component of the stack.
        alerter: Alerter component of the stack.
        annotator: Annotator component of the stack.
        data_validator: Data validator component of the stack.

    Raises:
        StackValidationError: If the stack configuration is not valid.
    """
    self._name = name
    self._orchestrator = orchestrator
    self._metadata_store = metadata_store
    self._artifact_store = artifact_store
    self._container_registry = container_registry
    self._step_operator = step_operator
    self._secrets_manager = secrets_manager
    self._feature_store = feature_store
    self._model_deployer = model_deployer
    self._experiment_tracker = experiment_tracker
    self._alerter = alerter
    self._annotator = annotator
    self._data_validator = data_validator
cleanup_step_run(self)

Cleans up resources after the step run is finished.

Source code in zenml/stack/stack.py
def cleanup_step_run(self) -> None:
    """Cleans up resources after the step run is finished."""
    for component in self.components.values():
        component.cleanup_step_run()
default_local_stack() classmethod

Creates a stack instance which is configured to run locally.

Returns:

Type Description
Stack

A stack instance configured to run locally.

Source code in zenml/stack/stack.py
@classmethod
def default_local_stack(cls) -> "Stack":
    """Creates a stack instance which is configured to run locally.

    Returns:
        A stack instance configured to run locally.
    """
    from zenml.artifact_stores import LocalArtifactStore
    from zenml.metadata_stores import SQLiteMetadataStore
    from zenml.orchestrators import LocalOrchestrator
    from zenml.stack.stack_component import uuid_factory

    orchestrator = LocalOrchestrator(name="default")

    artifact_store_uuid = uuid_factory()
    artifact_store_path = os.path.join(
        GlobalConfiguration().config_directory,
        "local_stores",
        str(artifact_store_uuid),
    )
    io_utils.create_dir_recursive_if_not_exists(artifact_store_path)
    artifact_store = LocalArtifactStore(
        name="default",
        uuid=artifact_store_uuid,
        path=artifact_store_path,
    )

    metadata_store_path = os.path.join(artifact_store_path, "metadata.db")
    metadata_store = SQLiteMetadataStore(
        name="default", uri=metadata_store_path
    )

    return cls(
        name="default",
        orchestrator=orchestrator,
        metadata_store=metadata_store,
        artifact_store=artifact_store,
    )
deploy_pipeline(self, pipeline, runtime_configuration)

Deploys a pipeline on this stack.

Parameters:

Name Type Description Default
pipeline BasePipeline

The pipeline to deploy.

required
runtime_configuration RuntimeConfiguration

Contains all the runtime configuration options specified for the pipeline run.

required

Returns:

Type Description
Any

The return value of the call to orchestrator.run_pipeline(...).

Exceptions:

Type Description
StackValidationError

If the stack configuration is not valid.

Source code in zenml/stack/stack.py
def deploy_pipeline(
    self,
    pipeline: "BasePipeline",
    runtime_configuration: RuntimeConfiguration,
) -> Any:
    """Deploys a pipeline on this stack.

    Args:
        pipeline: The pipeline to deploy.
        runtime_configuration: Contains all the runtime configuration
            options specified for the pipeline run.

    Returns:
        The return value of the call to `orchestrator.run_pipeline(...)`.

    Raises:
        StackValidationError: If the stack configuration is not valid.
    """
    self.validate(fail_if_secrets_missing=True)

    for component in self.components.values():
        if not component.is_running:
            raise StackValidationError(
                f"The '{component.name}' {component.TYPE} stack component "
                f"is not currently running. Please run the following "
                f"command to provision and start the component:\n\n"
                f"    `zenml stack up`\n"
            )

    for component in self.components.values():
        component.prepare_pipeline_deployment(
            pipeline=pipeline,
            stack=self,
            runtime_configuration=runtime_configuration,
        )

    for component in self.components.values():
        component.prepare_pipeline_run()

    runtime_configuration[
        RUN_NAME_OPTION_KEY
    ] = runtime_configuration.run_name or (
        f"{pipeline.name}-"
        f'{datetime.now().strftime("%d_%h_%y-%H_%M_%S_%f")}'
    )

    logger.info(
        "Using stack `%s` to run pipeline `%s`...",
        self.name,
        pipeline.name,
    )
    start_time = time.time()

    original_cache_boolean = pipeline.enable_cache
    if "enable_cache" in runtime_configuration:
        logger.info(
            "Runtime configuration overwriting the pipeline cache settings"
            " to enable_cache=`%s` for this pipeline run. The default "
            "caching strategy is retained for future pipeline runs.",
            runtime_configuration["enable_cache"],
        )
        pipeline.enable_cache = runtime_configuration.get("enable_cache")

    self._register_pipeline_run(
        pipeline=pipeline, runtime_configuration=runtime_configuration
    )

    return_value = self.orchestrator.run(
        pipeline, stack=self, runtime_configuration=runtime_configuration
    )

    # Put pipeline level cache policy back to make sure the next runs
    #  default to that policy again in case the runtime configuration
    #  is not set explicitly
    pipeline.enable_cache = original_cache_boolean

    run_duration = time.time() - start_time
    logger.info(
        "Pipeline run `%s` has finished in %s.",
        runtime_configuration.run_name,
        string_utils.get_human_readable_time(run_duration),
    )

    for component in self.components.values():
        component.cleanup_pipeline_run()

    return return_value
deprovision(self)

Deprovisions all local resources of the stack.

Source code in zenml/stack/stack.py
def deprovision(self) -> None:
    """Deprovisions all local resources of the stack."""
    logger.info("Deprovisioning resources for stack '%s'.", self.name)
    for component in self.components.values():
        if component.is_provisioned:
            try:
                component.deprovision()
                logger.info("Deprovisioned resources for %s.", component)
            except NotImplementedError as e:
                logger.warning(e)
dict(self)

Converts the stack into a dictionary.

Returns:

Type Description
Dict[str, str]

A dictionary containing the stack components.

Source code in zenml/stack/stack.py
def dict(self) -> Dict[str, str]:
    """Converts the stack into a dictionary.

    Returns:
        A dictionary containing the stack components.
    """
    component_dict = {
        component_type.value: component.json(sort_keys=True)
        for component_type, component in self.components.items()
    }
    component_dict.update({"name": self.name})
    return component_dict
from_components(name, components) classmethod

Creates a stack instance from a dict of stack components.

noqa: DAR402

Parameters:

Name Type Description Default
name str

The name of the stack.

required
components Dict[zenml.enums.StackComponentType, StackComponent]

The components of the stack.

required

Returns:

Type Description
Stack

A stack instance consisting of the given components.

Exceptions:

Type Description
TypeError

If a required component is missing or a component doesn't inherit from the expected base class.

Source code in zenml/stack/stack.py
@classmethod
def from_components(
    cls, name: str, components: Dict[StackComponentType, "StackComponent"]
) -> "Stack":
    """Creates a stack instance from a dict of stack components.

    # noqa: DAR402

    Args:
        name: The name of the stack.
        components: The components of the stack.

    Returns:
        A stack instance consisting of the given components.

    Raises:
        TypeError: If a required component is missing or a component
            doesn't inherit from the expected base class.
    """
    from zenml.alerter import BaseAlerter
    from zenml.annotators import BaseAnnotator
    from zenml.artifact_stores import BaseArtifactStore
    from zenml.container_registries import BaseContainerRegistry
    from zenml.data_validators import BaseDataValidator
    from zenml.experiment_trackers import BaseExperimentTracker
    from zenml.feature_stores import BaseFeatureStore
    from zenml.metadata_stores import BaseMetadataStore
    from zenml.model_deployers import BaseModelDeployer
    from zenml.orchestrators import BaseOrchestrator
    from zenml.secrets_managers import BaseSecretsManager
    from zenml.step_operators import BaseStepOperator

    def _raise_type_error(
        component: Optional["StackComponent"], expected_class: Type[Any]
    ) -> NoReturn:
        """Raises a TypeError that the component has an unexpected type.

        Args:
            component: The component that has an unexpected type.
            expected_class: The expected type of the component.

        Raises:
            TypeError: If the component has an unexpected type.
        """
        raise TypeError(
            f"Unable to create stack: Wrong stack component type "
            f"`{component.__class__.__name__}` (expected: subclass "
            f"of `{expected_class.__name__}`)"
        )

    orchestrator = components.get(StackComponentType.ORCHESTRATOR)
    if not isinstance(orchestrator, BaseOrchestrator):
        _raise_type_error(orchestrator, BaseOrchestrator)

    metadata_store = components.get(StackComponentType.METADATA_STORE)
    if not isinstance(metadata_store, BaseMetadataStore):
        _raise_type_error(metadata_store, BaseMetadataStore)

    artifact_store = components.get(StackComponentType.ARTIFACT_STORE)
    if not isinstance(artifact_store, BaseArtifactStore):
        _raise_type_error(artifact_store, BaseArtifactStore)

    container_registry = components.get(
        StackComponentType.CONTAINER_REGISTRY
    )
    if container_registry is not None and not isinstance(
        container_registry, BaseContainerRegistry
    ):
        _raise_type_error(container_registry, BaseContainerRegistry)

    secrets_manager = components.get(StackComponentType.SECRETS_MANAGER)
    if secrets_manager is not None and not isinstance(
        secrets_manager, BaseSecretsManager
    ):
        _raise_type_error(secrets_manager, BaseSecretsManager)

    step_operator = components.get(StackComponentType.STEP_OPERATOR)
    if step_operator is not None and not isinstance(
        step_operator, BaseStepOperator
    ):
        _raise_type_error(step_operator, BaseStepOperator)

    feature_store = components.get(StackComponentType.FEATURE_STORE)
    if feature_store is not None and not isinstance(
        feature_store, BaseFeatureStore
    ):
        _raise_type_error(feature_store, BaseFeatureStore)

    model_deployer = components.get(StackComponentType.MODEL_DEPLOYER)
    if model_deployer is not None and not isinstance(
        model_deployer, BaseModelDeployer
    ):
        _raise_type_error(model_deployer, BaseModelDeployer)

    experiment_tracker = components.get(
        StackComponentType.EXPERIMENT_TRACKER
    )
    if experiment_tracker is not None and not isinstance(
        experiment_tracker, BaseExperimentTracker
    ):
        _raise_type_error(experiment_tracker, BaseExperimentTracker)

    alerter = components.get(StackComponentType.ALERTER)
    if alerter is not None and not isinstance(alerter, BaseAlerter):
        _raise_type_error(alerter, BaseAlerter)

    annotator = components.get(StackComponentType.ANNOTATOR)
    if annotator is not None and not isinstance(annotator, BaseAnnotator):
        _raise_type_error(annotator, BaseAnnotator)

    data_validator = components.get(StackComponentType.DATA_VALIDATOR)
    if data_validator is not None and not isinstance(
        data_validator, BaseDataValidator
    ):
        _raise_type_error(data_validator, BaseDataValidator)

    return Stack(
        name=name,
        orchestrator=orchestrator,
        metadata_store=metadata_store,
        artifact_store=artifact_store,
        container_registry=container_registry,
        secrets_manager=secrets_manager,
        step_operator=step_operator,
        feature_store=feature_store,
        model_deployer=model_deployer,
        experiment_tracker=experiment_tracker,
        alerter=alerter,
        annotator=annotator,
        data_validator=data_validator,
    )
prepare_step_run(self)

Prepares running a step.

Source code in zenml/stack/stack.py
def prepare_step_run(self) -> None:
    """Prepares running a step."""
    for component in self.components.values():
        component.prepare_step_run()
provision(self)

Provisions resources to run the stack locally.

Source code in zenml/stack/stack.py
def provision(self) -> None:
    """Provisions resources to run the stack locally."""
    logger.info("Provisioning resources for stack '%s'.", self.name)
    for component in self.components.values():
        if not component.is_provisioned:
            component.provision()
            logger.info("Provisioned resources for %s.", component)
requirements(self, exclude_components=None)

Set of PyPI requirements for the stack.

This method combines the requirements of all stack components (except the ones specified in exclude_components).

Parameters:

Name Type Description Default
exclude_components Optional[AbstractSet[zenml.enums.StackComponentType]]

Set of component types for which the requirements should not be included in the output.

None

Returns:

Type Description
Set[str]

Set of PyPI requirements.

Source code in zenml/stack/stack.py
def requirements(
    self,
    exclude_components: Optional[AbstractSet[StackComponentType]] = None,
) -> Set[str]:
    """Set of PyPI requirements for the stack.

    This method combines the requirements of all stack components (except
    the ones specified in `exclude_components`).

    Args:
        exclude_components: Set of component types for which the
            requirements should not be included in the output.

    Returns:
        Set of PyPI requirements.
    """
    exclude_components = exclude_components or set()
    requirements = [
        component.requirements
        for component in self.components.values()
        if component.TYPE not in exclude_components
    ]
    return set.union(*requirements) if requirements else set()
resume(self)

Resumes the provisioned local resources of the stack.

Exceptions:

Type Description
ProvisioningError

If any stack component is missing provisioned resources.

Source code in zenml/stack/stack.py
def resume(self) -> None:
    """Resumes the provisioned local resources of the stack.

    Raises:
        ProvisioningError: If any stack component is missing provisioned
            resources.
    """
    logger.info("Resuming provisioned resources for stack %s.", self.name)
    for component in self.components.values():
        if component.is_running:
            # the component is already running, no need to resume anything
            pass
        elif component.is_provisioned:
            component.resume()
            logger.info("Resumed resources for %s.", component)
        else:
            raise ProvisioningError(
                f"Unable to resume resources for {component}: No "
                f"resources have been provisioned for this component."
            )
suspend(self)

Suspends the provisioned local resources of the stack.

Source code in zenml/stack/stack.py
def suspend(self) -> None:
    """Suspends the provisioned local resources of the stack."""
    logger.info(
        "Suspending provisioned resources for stack '%s'.", self.name
    )
    for component in self.components.values():
        if not component.is_suspended:
            try:
                component.suspend()
                logger.info("Suspended resources for %s.", component)
            except NotImplementedError:
                logger.warning(
                    "Suspending provisioned resources not implemented "
                    "for %s. Continuing without suspending resources...",
                    component,
                )
validate(self, decouple_stores=False, fail_if_secrets_missing=False)

Checks whether the stack configuration is valid.

To check if a stack configuration is valid, the following criteria must be met: - the StackValidator of each stack component has to validate the stack to make sure all the components are compatible with each other - the stack must either have a properly associated artifact/metadata store pair or reset the association. - the required secrets of all components need to exist

Parameters:

Name Type Description Default
decouple_stores bool

Flag to reset the previous associations between an artifact store and a metadata store

False
fail_if_secrets_missing bool

If this is True, an error will be raised if a secret for a component is missing. Otherwise, only a warning will be logged.

False

Exceptions:

Type Description
StackValidationError

If the artifact store and the metadata store are not properly associated.

Source code in zenml/stack/stack.py
def validate(
    self,
    decouple_stores: bool = False,
    fail_if_secrets_missing: bool = False,
) -> None:
    """Checks whether the stack configuration is valid.

    To check if a stack configuration is valid, the following criteria must
    be met:
    - the `StackValidator` of each stack component has to validate the
        stack to make sure all the components are compatible with each other
    - the stack must either have a properly associated artifact/metadata
        store pair or reset the association.
    - the required secrets of all components need to exist

    Args:
        decouple_stores: Flag to reset the previous associations between
            an artifact store and a metadata store
        fail_if_secrets_missing: If this is `True`, an error will be raised
            if a secret for a component is missing. Otherwise, only a
            warning will be logged.

    Raises:
        StackValidationError: If the artifact store and the metadata store
            are not properly associated.
    """
    for component in self.components.values():
        if component.validator:
            component.validator.validate(stack=self)

    self._validate_secrets(raise_exception=fail_if_secrets_missing)

    if not ZENML_IGNORE_STORE_COUPLINGS:
        from zenml.cli.utils import warning
        from zenml.repository import Repository

        repo = Repository()
        artifact_store_associations = (
            repo.zen_store.get_store_associations_for_artifact_store(
                self.artifact_store.uuid
            )
        )
        if artifact_store_associations:
            for association in artifact_store_associations:
                if (
                    association.metadata_store_uuid
                    != self.metadata_store.uuid
                ):
                    if decouple_stores:
                        warning(
                            f"Removing the association between given "
                            f"artifact store {self.artifact_store.name} "
                            f"(uuid: {self.artifact_store.uuid}) and the "
                            f"metadata store (uuid: "
                            f"{association.metadata_store_uuid})."
                        )
                        repo.zen_store.delete_store_association_for_artifact_and_metadata_store(
                            artifact_store_uuid=self.artifact_store.uuid,
                            metadata_store_uuid=association.metadata_store_uuid,
                        )
                    else:
                        raise StackValidationError(
                            f"The artifact store instance in your stack "
                            f"'{self.artifact_store.name}' (uuid: "
                            f"{self.artifact_store.uuid}) has been "
                            f"previously associated with a different "
                            f"metadata store (uuid: "
                            f"{association.metadata_store_uuid} in a "
                            f"different stack. If either one of these "
                            f"stores are previously populated, this might "
                            f"lead to various problems. In order to solve "
                            f"this issue, you can either create and use "
                            f"another artifact store instance or use the "
                            f"'--decouple_stores' flag when you register/update a stack "
                            f"to reset the associations of these "
                            f"components."
                        )

        m_associations = (
            repo.zen_store.get_store_associations_for_metadata_store(
                self.metadata_store.uuid
            )
        )
        if m_associations:
            for association in m_associations:
                if (
                    association.artifact_store_uuid
                    != self.artifact_store.uuid
                ):
                    if decouple_stores:
                        warning(
                            f"Removing the association between given "
                            f"metadata store {self.metadata_store.name} "
                            f"(uuid: {self.metadata_store.uuid}) and the "
                            f"artifact store (uuid: "
                            f"{association.artifact_store_uuid})."
                        )
                        repo.zen_store.delete_store_association_for_artifact_and_metadata_store(
                            artifact_store_uuid=association.artifact_store_uuid,
                            metadata_store_uuid=self.metadata_store.uuid,
                        )
                    else:
                        raise StackValidationError(
                            f"The metadata store instance in your stack "
                            f"'{self.metadata_store.name}' (uuid: "
                            f"{self.metadata_store.uuid}) has been "
                            f"previously associated with a different "
                            f"artifact store (uuid: "
                            f"{association.artifact_store_uuid} in a "
                            f"different stack. If either one of these "
                            f"stores are previously populated, this might "
                            f"lead to various problems. In order to solve "
                            f"this issue, you can either create and use "
                            f"another artifact store instance or use the "
                            f"'--decouple_stores' flag when you register/update a stack "
                            f"to reset the associations of these "
                            f"components."
                        )

        # Check if the associations already exists, if not create it
        existing_associations = repo.zen_store.get_store_associations_for_artifact_and_metadata_store(
            artifact_store_uuid=self.artifact_store.uuid,
            metadata_store_uuid=self.metadata_store.uuid,
        )
        if len(existing_associations) == 0:
            repo.zen_store.create_store_association(
                artifact_store_uuid=self.artifact_store.uuid,
                metadata_store_uuid=self.metadata_store.uuid,
            )

stack_component

Implementation of the ZenML Stack Component class.

StackComponent (BaseModel, ABC) pydantic-model

Abstract StackComponent class for all components of a ZenML stack.

Attributes:

Name Type Description
name str

The name of the component.

uuid UUID

Unique identifier of the component.

Source code in zenml/stack/stack_component.py
class StackComponent(BaseModel, ABC):
    """Abstract StackComponent class for all components of a ZenML stack.

    Attributes:
        name: The name of the component.
        uuid: Unique identifier of the component.
    """

    name: str
    uuid: UUID = Field(default_factory=uuid_factory)

    # Class Configuration
    TYPE: ClassVar[StackComponentType]
    FLAVOR: ClassVar[str]

    def __init__(self, **kwargs: Any) -> None:
        """Ensures that secret references don't clash with pydantic validation.

        StackComponents allow the specification of all their string attributes
        using secret references of the form `{{secret_name.key}}`. This however
        is only possible when the stack component does not perform any explicit
        validation of this attribute using pydantic validators. If this were
        the case, the validation would run on the secret reference and would
        fail or in the worst case, modify the secret reference and lead to
        unexpected behavior. This method ensures that no attributes that require
        custom pydantic validation are set as secret references.

        Args:
            **kwargs: Arguments to initialize this stack component.

        Raises:
            ValueError: If an attribute that requires custom pydantic validation
                is passed as a secret reference, or if the `name` attribute
                was passed as a secret reference.
        """
        for key, value in kwargs.items():
            try:
                field = self.__class__.__fields__[key]
            except KeyError:
                # Value for a private attribute or non-existing field, this
                # will fail during the upcoming pydantic validation
                continue

            if value is None:
                continue

            if not secret_utils.is_secret_reference(value):
                if secret_utils.is_secret_field(field):
                    logger.warning(
                        "You specified a plain-text value for the sensitive "
                        f"attribute `{key}` for a `{self.__class__.__name__}` "
                        "stack component. This is currently only a warning, "
                        "but future versions of ZenML will require you to pass "
                        "in senstive information as secrets. Check out the "
                        "documentation on how to configure your stack "
                        "components with secrets here: "
                        "https://docs.zenml.io/developer-guide/advanced-usage/secret-references"
                    )
                continue

            if key == "name":
                raise ValueError(
                    "Passing the `name` attribute of a stack component as a "
                    "secret reference is not allowed."
                )

            requires_validation = field.pre_validators or field.post_validators
            if requires_validation:
                raise ValueError(
                    f"Passing the stack component attribute `{key}` as a "
                    "secret reference is not allowed as additional validation "
                    "is required for this attribute."
                )

        super().__init__(**kwargs)

    def __custom_getattribute__(self, key: str) -> Any:
        """Returns the (potentially resolved) attribute value for the given key.

        An attribute value may be either specified directly, or as a secret
        reference. In case of a secret reference, this method resolves the
        reference and returns the secret value instead.

        Args:
            key: The key for which to get the attribute value.

        Raises:
            RuntimeError: If the stack component is not part of the active
                stack, or the active stack is missing a secrets manager.
            KeyError: If the secret or secret key don't exist.

        Returns:
            The (potentially resolved) attribute value.
        """
        value = super().__getattribute__(key)

        if not secret_utils.is_secret_reference(value):
            return value

        from zenml.repository import Repository

        stack = Repository().active_stack

        # A stack component can be part of many stacks, and currently a
        # secrets manager is associated with a stack. This means we're
        # not able to identify the 'correct' secrets manager that the user
        # wanted to resolve the secrets in a general way. We therefore
        # limit secret resolving to components of the active stack.
        component = stack.components.get(self.TYPE, None)
        if not component or component.uuid != self.uuid:
            raise RuntimeError(
                f"Failed to resolve secret reference for attribute {key} "
                f"of stack component `{self}`: The stack component is not "
                "part of the active stack and therefore can't have it's "
                "secret references resolved. If you want to access attributes "
                "of this stack component which reference secrets, set a stack "
                "which includes both this component and a secrets manager as "
                "your active stack: `zenml stack set <STACK_NAME>`."
            )

        secrets_manager = Repository().active_stack.secrets_manager
        if not secrets_manager:
            raise RuntimeError(
                f"Failed to resolve secret reference for attribute {key} "
                f"of stack component `{self}`: The active stack does not "
                "have a secrets manager."
            )

        secret_ref = secret_utils.parse_secret_reference(value)
        try:
            secret = secrets_manager.get_secret(secret_ref.name)
        except KeyError:
            raise KeyError(
                f"Failed to resolve secret reference for attribute {key} "
                f"of stack component `{self}`: The secret "
                f"{secret_ref.name} does not exist."
            )

        try:
            secret_value = secret.content[secret_ref.key]
        except KeyError:
            raise KeyError(
                f"Failed to resolve secret reference for attribute {key} "
                f"of stack component `{self}`: The secret "
                f"{secret_ref.name} does not contain a value for key "
                f"{secret_ref.key}. Available keys: {set(secret.content)}."
            )

        return str(secret_value)

    if not TYPE_CHECKING:
        # When defining __getattribute__, mypy allows accessing non-existent
        # attributes without failing
        # (see https://github.com/python/mypy/issues/13319).
        __getattribute__ = __custom_getattribute__

    @property
    def required_secrets(self) -> Set[secret_utils.SecretReference]:
        """All required secrets for this stack component.

        Returns:
            The required secrets of this stack component.
        """
        return {
            secret_utils.parse_secret_reference(v)
            for v in self.dict().values()
            if secret_utils.is_secret_reference(v)
        }

    @property
    def log_file(self) -> Optional[str]:
        """Optional path to a log file for the stack component.

        Returns:
            Optional path to a log file for the stack component.
        """
        # TODO [ENG-136]: Add support for multiple log files for a stack
        #  component. E.g. let each component return a generator that yields
        #  logs instead of specifying a single file path.
        return None

    @property
    def runtime_options(self) -> Dict[str, Any]:
        """Runtime options that are available to configure this component.

        The items of the dictionary should map option names (which can be used
        to configure the option in the `RuntimeConfiguration`) to default
        values for the option (or `None` if there is no default value).

        Returns:
            A dictionary of runtime options.
        """
        return {}

    @property
    def requirements(self) -> Set[str]:
        """Set of PyPI requirements for the component.

        Returns:
            A set of PyPI requirements for the component.
        """
        from zenml.integrations.utils import get_requirements_for_module

        return set(get_requirements_for_module(self.__module__))

    @property
    def local_path(self) -> Optional[str]:
        """Path to a local directory used by the component to store persistent information.

        This property should only be implemented by components that need to
        store persistent information in a directory on the local machine and
        also need that information to be available during pipeline runs.

        IMPORTANT: the path returned by this property must always be a path
        that is relative to the ZenML global config directory. The local
        Kubeflow orchestrator relies on this convention to correctly mount the
        local folders in the Kubeflow containers. This is an example of a valid
        path:

        ```python
        from zenml.utils.io_utils import get_global_config_directory
        from zenml.constants import LOCAL_STORES_DIRECTORY_NAME

        ...

        @property
        def local_path(self) -> Optional[str]:

            return os.path.join(
                get_global_config_directory(),
                LOCAL_STORES_DIRECTORY_NAME,
                str(uuid),
            )
        ```

        Returns:
            A path to a local directory used by the component to store
            persistent information.
        """
        return None

    def prepare_pipeline_deployment(
        self,
        pipeline: "BasePipeline",
        stack: "Stack",
        runtime_configuration: "RuntimeConfiguration",
    ) -> None:
        """Prepares deploying the pipeline.

        This method gets called immediately before a pipeline is deployed.
        Subclasses should override it if they require runtime configuration
        options or if they need to run code before the pipeline deployment.

        Args:
            pipeline: The pipeline that will be deployed.
            stack: The stack on which the pipeline will be deployed.
            runtime_configuration: Contains all the runtime configuration
                options specified for the pipeline run.
        """

    def prepare_pipeline_run(self) -> None:
        """Prepares running the pipeline."""

    def cleanup_pipeline_run(self) -> None:
        """Cleans up resources after the pipeline run is finished."""

    def prepare_step_run(self) -> None:
        """Prepares running a step."""

    def cleanup_step_run(self) -> None:
        """Cleans up resources after the step run is finished."""

    @property
    def post_registration_message(self) -> Optional[str]:
        """Optional message that will be printed after the stack component is registered.

        Returns:
            An optional message.
        """
        return None

    @property
    def validator(self) -> Optional["StackValidator"]:
        """The optional validator of the stack component.

        This validator will be called each time a stack with the stack
        component is initialized. Subclasses should override this property
        and return a `StackValidator` that makes sure they're not included in
        any stack that they're not compatible with.

        Returns:
            An optional `StackValidator` instance.
        """
        return None

    @property
    def is_provisioned(self) -> bool:
        """If the component provisioned resources to run.

        Returns:
            True if the component provisioned resources to run.
        """
        return True

    @property
    def is_running(self) -> bool:
        """If the component is running.

        Returns:
            True if the component is running.
        """
        return True

    @property
    def is_suspended(self) -> bool:
        """If the component is suspended.

        Returns:
            True if the component is suspended.
        """
        return not self.is_running

    def provision(self) -> None:
        """Provisions resources to run the component.

        Raises:
            NotImplementedError: If the component does not implement this
                method.
        """
        raise NotImplementedError(
            f"Provisioning resources not implemented for {self}."
        )

    def deprovision(self) -> None:
        """Deprovisions all resources of the component.

        Raises:
            NotImplementedError: If the component does not implement this
                method.
        """
        raise NotImplementedError(
            f"Deprovisioning resource not implemented for {self}."
        )

    def resume(self) -> None:
        """Resumes the provisioned resources of the component.

        Raises:
            NotImplementedError: If the component does not implement this
                method.
        """
        raise NotImplementedError(
            f"Resuming provisioned resources not implemented for {self}."
        )

    def suspend(self) -> None:
        """Suspends the provisioned resources of the component.

        Raises:
            NotImplementedError: If the component does not implement this
                method.
        """
        raise NotImplementedError(
            f"Suspending provisioned resources not implemented for {self}."
        )

    def __repr__(self) -> str:
        """String representation of the stack component.

        Returns:
            A string representation of the stack component.
        """
        attribute_representation = ", ".join(
            f"{key}={value}" for key, value in self.dict().items()
        )
        return (
            f"{self.__class__.__qualname__}(type={self.TYPE}, "
            f"flavor={self.FLAVOR}, {attribute_representation})"
        )

    def __str__(self) -> str:
        """String representation of the stack component.

        Returns:
            A string representation of the stack component.
        """
        return self.__repr__()

    @root_validator(skip_on_failure=True)
    def _ensure_stack_component_complete(cls, values: Dict[str, Any]) -> Any:
        """Ensures that the stack component is complete.

        Args:
            values: The values of the stack component.

        Returns:
            The values of the stack component.

        Raises:
            StackComponentInterfaceError: If the stack component is not
                implemented correctly.
        """
        try:
            stack_component_type = getattr(cls, "TYPE")
            assert stack_component_type in StackComponentType
        except (AttributeError, AssertionError):
            raise StackComponentInterfaceError(
                textwrap.dedent(
                    """
                    When you are working with any classes which subclass from
                    `zenml.stack.StackComponent` please make sure that your
                    class has a ClassVar named `TYPE` and its value is set to a
                    `StackComponentType` from `from zenml.enums import
                    StackComponentType`.

                    In most of the cases, this is already done for you within
                    the implementation of the base concept.

                    Example:

                    class BaseArtifactStore(StackComponent):
                        # Instance Variables
                        path: str

                        # Class Variables
                        TYPE: ClassVar[StackComponentType] = StackComponentType.ARTIFACT_STORE
                    """
                )
            )

        try:
            getattr(cls, "FLAVOR")
        except AttributeError:
            raise StackComponentInterfaceError(
                textwrap.dedent(
                    """
                    When you are working with any classes which subclass from
                    `zenml.stack.StackComponent` please make sure that your
                    class has a defined ClassVar `FLAVOR`.

                    Example:

                    class LocalArtifactStore(BaseArtifactStore):

                        ...

                        # Define flavor as a ClassVar
                        FLAVOR: ClassVar[str] = "local"

                        ...
                    """
                )
            )

        return values

    class Config:
        """Pydantic configuration class."""

        # public attributes are immutable
        allow_mutation = False
        # all attributes with leading underscore are private and therefore
        # are mutable and not included in serialization
        underscore_attrs_are_private = True
        # prevent extra attributes during model initialization
        extra = Extra.forbid
is_provisioned: bool property readonly

If the component provisioned resources to run.

Returns:

Type Description
bool

True if the component provisioned resources to run.

is_running: bool property readonly

If the component is running.

Returns:

Type Description
bool

True if the component is running.

is_suspended: bool property readonly

If the component is suspended.

Returns:

Type Description
bool

True if the component is suspended.

local_path: Optional[str] property readonly

Path to a local directory used by the component to store persistent information.

This property should only be implemented by components that need to store persistent information in a directory on the local machine and also need that information to be available during pipeline runs.

IMPORTANT: the path returned by this property must always be a path that is relative to the ZenML global config directory. The local Kubeflow orchestrator relies on this convention to correctly mount the local folders in the Kubeflow containers. This is an example of a valid path:

from zenml.utils.io_utils import get_global_config_directory
from zenml.constants import LOCAL_STORES_DIRECTORY_NAME

...

@property
def local_path(self) -> Optional[str]:

    return os.path.join(
        get_global_config_directory(),
        LOCAL_STORES_DIRECTORY_NAME,
        str(uuid),
    )

Returns:

Type Description
Optional[str]

A path to a local directory used by the component to store persistent information.

log_file: Optional[str] property readonly

Optional path to a log file for the stack component.

Returns:

Type Description
Optional[str]

Optional path to a log file for the stack component.

post_registration_message: Optional[str] property readonly

Optional message that will be printed after the stack component is registered.

Returns:

Type Description
Optional[str]

An optional message.

required_secrets: Set[zenml.utils.secret_utils.SecretReference] property readonly

All required secrets for this stack component.

Returns:

Type Description
Set[zenml.utils.secret_utils.SecretReference]

The required secrets of this stack component.

requirements: Set[str] property readonly

Set of PyPI requirements for the component.

Returns:

Type Description
Set[str]

A set of PyPI requirements for the component.

runtime_options: Dict[str, Any] property readonly

Runtime options that are available to configure this component.

The items of the dictionary should map option names (which can be used to configure the option in the RuntimeConfiguration) to default values for the option (or None if there is no default value).

Returns:

Type Description
Dict[str, Any]

A dictionary of runtime options.

validator: Optional[StackValidator] property readonly

The optional validator of the stack component.

This validator will be called each time a stack with the stack component is initialized. Subclasses should override this property and return a StackValidator that makes sure they're not included in any stack that they're not compatible with.

Returns:

Type Description
Optional[StackValidator]

An optional StackValidator instance.

Config

Pydantic configuration class.

Source code in zenml/stack/stack_component.py
class Config:
    """Pydantic configuration class."""

    # public attributes are immutable
    allow_mutation = False
    # all attributes with leading underscore are private and therefore
    # are mutable and not included in serialization
    underscore_attrs_are_private = True
    # prevent extra attributes during model initialization
    extra = Extra.forbid
__custom_getattribute__(self, key) special

Returns the (potentially resolved) attribute value for the given key.

An attribute value may be either specified directly, or as a secret reference. In case of a secret reference, this method resolves the reference and returns the secret value instead.

Parameters:

Name Type Description Default
key str

The key for which to get the attribute value.

required

Exceptions:

Type Description
RuntimeError

If the stack component is not part of the active stack, or the active stack is missing a secrets manager.

KeyError

If the secret or secret key don't exist.

Returns:

Type Description
Any

The (potentially resolved) attribute value.

Source code in zenml/stack/stack_component.py
def __custom_getattribute__(self, key: str) -> Any:
    """Returns the (potentially resolved) attribute value for the given key.

    An attribute value may be either specified directly, or as a secret
    reference. In case of a secret reference, this method resolves the
    reference and returns the secret value instead.

    Args:
        key: The key for which to get the attribute value.

    Raises:
        RuntimeError: If the stack component is not part of the active
            stack, or the active stack is missing a secrets manager.
        KeyError: If the secret or secret key don't exist.

    Returns:
        The (potentially resolved) attribute value.
    """
    value = super().__getattribute__(key)

    if not secret_utils.is_secret_reference(value):
        return value

    from zenml.repository import Repository

    stack = Repository().active_stack

    # A stack component can be part of many stacks, and currently a
    # secrets manager is associated with a stack. This means we're
    # not able to identify the 'correct' secrets manager that the user
    # wanted to resolve the secrets in a general way. We therefore
    # limit secret resolving to components of the active stack.
    component = stack.components.get(self.TYPE, None)
    if not component or component.uuid != self.uuid:
        raise RuntimeError(
            f"Failed to resolve secret reference for attribute {key} "
            f"of stack component `{self}`: The stack component is not "
            "part of the active stack and therefore can't have it's "
            "secret references resolved. If you want to access attributes "
            "of this stack component which reference secrets, set a stack "
            "which includes both this component and a secrets manager as "
            "your active stack: `zenml stack set <STACK_NAME>`."
        )

    secrets_manager = Repository().active_stack.secrets_manager
    if not secrets_manager:
        raise RuntimeError(
            f"Failed to resolve secret reference for attribute {key} "
            f"of stack component `{self}`: The active stack does not "
            "have a secrets manager."
        )

    secret_ref = secret_utils.parse_secret_reference(value)
    try:
        secret = secrets_manager.get_secret(secret_ref.name)
    except KeyError:
        raise KeyError(
            f"Failed to resolve secret reference for attribute {key} "
            f"of stack component `{self}`: The secret "
            f"{secret_ref.name} does not exist."
        )

    try:
        secret_value = secret.content[secret_ref.key]
    except KeyError:
        raise KeyError(
            f"Failed to resolve secret reference for attribute {key} "
            f"of stack component `{self}`: The secret "
            f"{secret_ref.name} does not contain a value for key "
            f"{secret_ref.key}. Available keys: {set(secret.content)}."
        )

    return str(secret_value)
__getattribute__(self, key) special

Returns the (potentially resolved) attribute value for the given key.

An attribute value may be either specified directly, or as a secret reference. In case of a secret reference, this method resolves the reference and returns the secret value instead.

Parameters:

Name Type Description Default
key str

The key for which to get the attribute value.

required

Exceptions:

Type Description
RuntimeError

If the stack component is not part of the active stack, or the active stack is missing a secrets manager.

KeyError

If the secret or secret key don't exist.

Returns:

Type Description
Any

The (potentially resolved) attribute value.

Source code in zenml/stack/stack_component.py
def __custom_getattribute__(self, key: str) -> Any:
    """Returns the (potentially resolved) attribute value for the given key.

    An attribute value may be either specified directly, or as a secret
    reference. In case of a secret reference, this method resolves the
    reference and returns the secret value instead.

    Args:
        key: The key for which to get the attribute value.

    Raises:
        RuntimeError: If the stack component is not part of the active
            stack, or the active stack is missing a secrets manager.
        KeyError: If the secret or secret key don't exist.

    Returns:
        The (potentially resolved) attribute value.
    """
    value = super().__getattribute__(key)

    if not secret_utils.is_secret_reference(value):
        return value

    from zenml.repository import Repository

    stack = Repository().active_stack

    # A stack component can be part of many stacks, and currently a
    # secrets manager is associated with a stack. This means we're
    # not able to identify the 'correct' secrets manager that the user
    # wanted to resolve the secrets in a general way. We therefore
    # limit secret resolving to components of the active stack.
    component = stack.components.get(self.TYPE, None)
    if not component or component.uuid != self.uuid:
        raise RuntimeError(
            f"Failed to resolve secret reference for attribute {key} "
            f"of stack component `{self}`: The stack component is not "
            "part of the active stack and therefore can't have it's "
            "secret references resolved. If you want to access attributes "
            "of this stack component which reference secrets, set a stack "
            "which includes both this component and a secrets manager as "
            "your active stack: `zenml stack set <STACK_NAME>`."
        )

    secrets_manager = Repository().active_stack.secrets_manager
    if not secrets_manager:
        raise RuntimeError(
            f"Failed to resolve secret reference for attribute {key} "
            f"of stack component `{self}`: The active stack does not "
            "have a secrets manager."
        )

    secret_ref = secret_utils.parse_secret_reference(value)
    try:
        secret = secrets_manager.get_secret(secret_ref.name)
    except KeyError:
        raise KeyError(
            f"Failed to resolve secret reference for attribute {key} "
            f"of stack component `{self}`: The secret "
            f"{secret_ref.name} does not exist."
        )

    try:
        secret_value = secret.content[secret_ref.key]
    except KeyError:
        raise KeyError(
            f"Failed to resolve secret reference for attribute {key} "
            f"of stack component `{self}`: The secret "
            f"{secret_ref.name} does not contain a value for key "
            f"{secret_ref.key}. Available keys: {set(secret.content)}."
        )

    return str(secret_value)
__init__(self, **kwargs) special

Ensures that secret references don't clash with pydantic validation.

StackComponents allow the specification of all their string attributes using secret references of the form {{secret_name.key}}. This however is only possible when the stack component does not perform any explicit validation of this attribute using pydantic validators. If this were the case, the validation would run on the secret reference and would fail or in the worst case, modify the secret reference and lead to unexpected behavior. This method ensures that no attributes that require custom pydantic validation are set as secret references.

Parameters:

Name Type Description Default
**kwargs Any

Arguments to initialize this stack component.

{}

Exceptions:

Type Description
ValueError

If an attribute that requires custom pydantic validation is passed as a secret reference, or if the name attribute was passed as a secret reference.

Source code in zenml/stack/stack_component.py
def __init__(self, **kwargs: Any) -> None:
    """Ensures that secret references don't clash with pydantic validation.

    StackComponents allow the specification of all their string attributes
    using secret references of the form `{{secret_name.key}}`. This however
    is only possible when the stack component does not perform any explicit
    validation of this attribute using pydantic validators. If this were
    the case, the validation would run on the secret reference and would
    fail or in the worst case, modify the secret reference and lead to
    unexpected behavior. This method ensures that no attributes that require
    custom pydantic validation are set as secret references.

    Args:
        **kwargs: Arguments to initialize this stack component.

    Raises:
        ValueError: If an attribute that requires custom pydantic validation
            is passed as a secret reference, or if the `name` attribute
            was passed as a secret reference.
    """
    for key, value in kwargs.items():
        try:
            field = self.__class__.__fields__[key]
        except KeyError:
            # Value for a private attribute or non-existing field, this
            # will fail during the upcoming pydantic validation
            continue

        if value is None:
            continue

        if not secret_utils.is_secret_reference(value):
            if secret_utils.is_secret_field(field):
                logger.warning(
                    "You specified a plain-text value for the sensitive "
                    f"attribute `{key}` for a `{self.__class__.__name__}` "
                    "stack component. This is currently only a warning, "
                    "but future versions of ZenML will require you to pass "
                    "in senstive information as secrets. Check out the "
                    "documentation on how to configure your stack "
                    "components with secrets here: "
                    "https://docs.zenml.io/developer-guide/advanced-usage/secret-references"
                )
            continue

        if key == "name":
            raise ValueError(
                "Passing the `name` attribute of a stack component as a "
                "secret reference is not allowed."
            )

        requires_validation = field.pre_validators or field.post_validators
        if requires_validation:
            raise ValueError(
                f"Passing the stack component attribute `{key}` as a "
                "secret reference is not allowed as additional validation "
                "is required for this attribute."
            )

    super().__init__(**kwargs)
__repr__(self) special

String representation of the stack component.

Returns:

Type Description
str

A string representation of the stack component.

Source code in zenml/stack/stack_component.py
def __repr__(self) -> str:
    """String representation of the stack component.

    Returns:
        A string representation of the stack component.
    """
    attribute_representation = ", ".join(
        f"{key}={value}" for key, value in self.dict().items()
    )
    return (
        f"{self.__class__.__qualname__}(type={self.TYPE}, "
        f"flavor={self.FLAVOR}, {attribute_representation})"
    )
__str__(self) special

String representation of the stack component.

Returns:

Type Description
str

A string representation of the stack component.

Source code in zenml/stack/stack_component.py
def __str__(self) -> str:
    """String representation of the stack component.

    Returns:
        A string representation of the stack component.
    """
    return self.__repr__()
cleanup_pipeline_run(self)

Cleans up resources after the pipeline run is finished.

Source code in zenml/stack/stack_component.py
def cleanup_pipeline_run(self) -> None:
    """Cleans up resources after the pipeline run is finished."""
cleanup_step_run(self)

Cleans up resources after the step run is finished.

Source code in zenml/stack/stack_component.py
def cleanup_step_run(self) -> None:
    """Cleans up resources after the step run is finished."""
deprovision(self)

Deprovisions all resources of the component.

Exceptions:

Type Description
NotImplementedError

If the component does not implement this method.

Source code in zenml/stack/stack_component.py
def deprovision(self) -> None:
    """Deprovisions all resources of the component.

    Raises:
        NotImplementedError: If the component does not implement this
            method.
    """
    raise NotImplementedError(
        f"Deprovisioning resource not implemented for {self}."
    )
prepare_pipeline_deployment(self, pipeline, stack, runtime_configuration)

Prepares deploying the pipeline.

This method gets called immediately before a pipeline is deployed. Subclasses should override it if they require runtime configuration options or if they need to run code before the pipeline deployment.

Parameters:

Name Type Description Default
pipeline BasePipeline

The pipeline that will be deployed.

required
stack Stack

The stack on which the pipeline will be deployed.

required
runtime_configuration RuntimeConfiguration

Contains all the runtime configuration options specified for the pipeline run.

required
Source code in zenml/stack/stack_component.py
def prepare_pipeline_deployment(
    self,
    pipeline: "BasePipeline",
    stack: "Stack",
    runtime_configuration: "RuntimeConfiguration",
) -> None:
    """Prepares deploying the pipeline.

    This method gets called immediately before a pipeline is deployed.
    Subclasses should override it if they require runtime configuration
    options or if they need to run code before the pipeline deployment.

    Args:
        pipeline: The pipeline that will be deployed.
        stack: The stack on which the pipeline will be deployed.
        runtime_configuration: Contains all the runtime configuration
            options specified for the pipeline run.
    """
prepare_pipeline_run(self)

Prepares running the pipeline.

Source code in zenml/stack/stack_component.py
def prepare_pipeline_run(self) -> None:
    """Prepares running the pipeline."""
prepare_step_run(self)

Prepares running a step.

Source code in zenml/stack/stack_component.py
def prepare_step_run(self) -> None:
    """Prepares running a step."""
provision(self)

Provisions resources to run the component.

Exceptions:

Type Description
NotImplementedError

If the component does not implement this method.

Source code in zenml/stack/stack_component.py
def provision(self) -> None:
    """Provisions resources to run the component.

    Raises:
        NotImplementedError: If the component does not implement this
            method.
    """
    raise NotImplementedError(
        f"Provisioning resources not implemented for {self}."
    )
resume(self)

Resumes the provisioned resources of the component.

Exceptions:

Type Description
NotImplementedError

If the component does not implement this method.

Source code in zenml/stack/stack_component.py
def resume(self) -> None:
    """Resumes the provisioned resources of the component.

    Raises:
        NotImplementedError: If the component does not implement this
            method.
    """
    raise NotImplementedError(
        f"Resuming provisioned resources not implemented for {self}."
    )
suspend(self)

Suspends the provisioned resources of the component.

Exceptions:

Type Description
NotImplementedError

If the component does not implement this method.

Source code in zenml/stack/stack_component.py
def suspend(self) -> None:
    """Suspends the provisioned resources of the component.

    Raises:
        NotImplementedError: If the component does not implement this
            method.
    """
    raise NotImplementedError(
        f"Suspending provisioned resources not implemented for {self}."
    )

uuid_factory()

Generates a UUID whose hex string does not start with a '0'.

Returns:

Type Description
UUID

A UUID whose hex string does not start with a '0'.

Source code in zenml/stack/stack_component.py
def uuid_factory() -> UUID:
    """Generates a UUID whose hex string does not start with a '0'.

    Returns:
        A UUID whose hex string does not start with a '0'.
    """
    # TODO [MEDIUM]: This is a replica of the fix which is applied
    #   to the zen_store.sql_zen_store.SQLZenStore. Since the UUID for
    #   stack components get created upon the creation of the
    #   pydantic instance, the same logic must apply here in order to
    #   save it within the ZenStore.
    # SQLModel crashes when a UUID hex string starts with '0'
    # (see: https://github.com/tiangolo/sqlmodel/issues/25)
    uuid = uuid4()
    while uuid.hex[0] == "0":
        uuid = uuid4()
    return uuid

stack_validator

Implementation of the ZenML Stack Validator.

StackValidator

A StackValidator is used to validate a stack configuration.

Each StackComponent can provide a StackValidator to make sure it is compatible with all components of the stack. The KubeflowOrchestrator for example will always require the stack to have a container registry in order to push the docker images that are required to run a pipeline in Kubeflow Pipelines.

Source code in zenml/stack/stack_validator.py
class StackValidator:
    """A `StackValidator` is used to validate a stack configuration.

    Each `StackComponent` can provide a `StackValidator` to make sure it is
    compatible with all components of the stack. The `KubeflowOrchestrator`
    for example will always require the stack to have a container registry
    in order to push the docker images that are required to run a pipeline
    in Kubeflow Pipelines.
    """

    def __init__(
        self,
        required_components: Optional[AbstractSet[StackComponentType]] = None,
        custom_validation_function: Optional[
            Callable[["Stack"], Tuple[bool, str]]
        ] = None,
    ):
        """Initializes a `StackValidator` instance.

        Args:
            required_components: Optional set of stack components that must
                exist in the stack.
            custom_validation_function: Optional function that returns whether
                a stack is valid and an error message to show if not valid.
        """
        self._required_components = required_components or set()
        self._custom_validation_function = custom_validation_function

    def validate(self, stack: "Stack") -> None:
        """Validates the given stack.

        Checks if the stack contains all the required components and passes
        the custom validation function of the validator.

        Args:
            stack: The stack to validate.

        Raises:
            StackValidationError: If the stack does not meet all the
                validation criteria.
        """
        missing_components = self._required_components - set(stack.components)
        if missing_components:
            raise StackValidationError(
                f"Missing stack components {missing_components} for "
                f"stack: {stack.name}"
            )

        if self._custom_validation_function:
            valid, err_msg = self._custom_validation_function(stack)
            if not valid:
                raise StackValidationError(
                    f"Custom validation function failed to validate "
                    f"stack '{stack.name}': {err_msg}"
                )
__init__(self, required_components=None, custom_validation_function=None) special

Initializes a StackValidator instance.

Parameters:

Name Type Description Default
required_components Optional[AbstractSet[zenml.enums.StackComponentType]]

Optional set of stack components that must exist in the stack.

None
custom_validation_function Optional[Callable[[Stack], Tuple[bool, str]]]

Optional function that returns whether a stack is valid and an error message to show if not valid.

None
Source code in zenml/stack/stack_validator.py
def __init__(
    self,
    required_components: Optional[AbstractSet[StackComponentType]] = None,
    custom_validation_function: Optional[
        Callable[["Stack"], Tuple[bool, str]]
    ] = None,
):
    """Initializes a `StackValidator` instance.

    Args:
        required_components: Optional set of stack components that must
            exist in the stack.
        custom_validation_function: Optional function that returns whether
            a stack is valid and an error message to show if not valid.
    """
    self._required_components = required_components or set()
    self._custom_validation_function = custom_validation_function
validate(self, stack)

Validates the given stack.

Checks if the stack contains all the required components and passes the custom validation function of the validator.

Parameters:

Name Type Description Default
stack Stack

The stack to validate.

required

Exceptions:

Type Description
StackValidationError

If the stack does not meet all the validation criteria.

Source code in zenml/stack/stack_validator.py
def validate(self, stack: "Stack") -> None:
    """Validates the given stack.

    Checks if the stack contains all the required components and passes
    the custom validation function of the validator.

    Args:
        stack: The stack to validate.

    Raises:
        StackValidationError: If the stack does not meet all the
            validation criteria.
    """
    missing_components = self._required_components - set(stack.components)
    if missing_components:
        raise StackValidationError(
            f"Missing stack components {missing_components} for "
            f"stack: {stack.name}"
        )

    if self._custom_validation_function:
        valid, err_msg = self._custom_validation_function(stack)
        if not valid:
            raise StackValidationError(
                f"Custom validation function failed to validate "
                f"stack '{stack.name}': {err_msg}"
            )