Skip to content

Vault

zenml.integrations.vault special

Initialization for the Vault Secrets Manager integration.

The Vault secrets manager integration submodule provides a way to access the HashiCorp Vault secrets manager from within your ZenML pipeline runs.

VaultSecretsManagerIntegration (Integration)

Definition of HashiCorp Vault integration with ZenML.

Source code in zenml/integrations/vault/__init__.py
class VaultSecretsManagerIntegration(Integration):
    """Definition of HashiCorp Vault integration with ZenML."""

    NAME = VAULT
    REQUIREMENTS = ["hvac>=0.11.2"]

    @classmethod
    def flavors(cls) -> List[Type[Flavor]]:
        """Declare the stack component flavors for the Vault integration.

        Returns:
            List of stack component flavors.
        """
        from zenml.integrations.vault.flavors import VaultSecretsManagerFlavor

        return [VaultSecretsManagerFlavor]

flavors() classmethod

Declare the stack component flavors for the Vault integration.

Returns:

Type Description
List[Type[zenml.stack.flavor.Flavor]]

List of stack component flavors.

Source code in zenml/integrations/vault/__init__.py
@classmethod
def flavors(cls) -> List[Type[Flavor]]:
    """Declare the stack component flavors for the Vault integration.

    Returns:
        List of stack component flavors.
    """
    from zenml.integrations.vault.flavors import VaultSecretsManagerFlavor

    return [VaultSecretsManagerFlavor]

flavors special

HashiCorp Vault integration flavors.

vault_secrets_manager_flavor

HashiCorp Vault secrets manager flavor.

VaultSecretsManagerConfig (BaseSecretsManagerConfig) pydantic-model

Configuration for the Vault Secrets Manager.

Attributes:

Name Type Description
url str

The url of the Vault server.

token str

The token to use to authenticate with Vault.

cert Optional[str]

The path to the certificate to use to authenticate with Vault.

verify Optional[str]

Whether to verify the certificate or not.

mount_point str

The mount point of the secrets manager.

Source code in zenml/integrations/vault/flavors/vault_secrets_manager_flavor.py
class VaultSecretsManagerConfig(BaseSecretsManagerConfig):
    """Configuration for the Vault Secrets Manager.

    Attributes:
        url: The url of the Vault server.
        token: The token to use to authenticate with Vault.
        cert: The path to the certificate to use to authenticate with Vault.
        verify: Whether to verify the certificate or not.
        mount_point: The mount point of the secrets manager.
    """

    SUPPORTS_SCOPING: ClassVar[bool] = True

    url: str
    token: str
    mount_point: str
    cert: Optional[str]  # TODO: unused
    verify: Optional[str]  # TODO: unused

    @classmethod
    def _validate_scope(
        cls,
        scope: SecretsManagerScope,
        namespace: Optional[str],
    ) -> None:
        """Validate the scope and namespace value.

        Args:
            scope: Scope value.
            namespace: Optional namespace value.
        """
        if namespace:
            validate_vault_secret_name_or_namespace(namespace)
VaultSecretsManagerFlavor (BaseSecretsManagerFlavor)

Class for the VaultSecretsManagerFlavor.

Source code in zenml/integrations/vault/flavors/vault_secrets_manager_flavor.py
class VaultSecretsManagerFlavor(BaseSecretsManagerFlavor):
    """Class for the `VaultSecretsManagerFlavor`."""

    @property
    def name(self) -> str:
        """Name of the flavor.

        Returns:
            The name of the flavor.
        """
        return VAULT_SECRETS_MANAGER_FLAVOR

    @property
    def config_class(self) -> Type[VaultSecretsManagerConfig]:
        """Returns `VaultSecretsManagerConfig` config class.

        Returns:
                The config class.
        """
        return VaultSecretsManagerConfig

    @property
    def implementation_class(self) -> Type["VaultSecretsManager"]:
        """Implementation class for this flavor.

        Returns:
            The implementation class.
        """
        from zenml.integrations.vault.secrets_manager import VaultSecretsManager

        return VaultSecretsManager
config_class: Type[zenml.integrations.vault.flavors.vault_secrets_manager_flavor.VaultSecretsManagerConfig] property readonly

Returns VaultSecretsManagerConfig config class.

Returns:

Type Description
Type[zenml.integrations.vault.flavors.vault_secrets_manager_flavor.VaultSecretsManagerConfig]

The config class.

implementation_class: Type[VaultSecretsManager] property readonly

Implementation class for this flavor.

Returns:

Type Description
Type[VaultSecretsManager]

The implementation class.

name: str property readonly

Name of the flavor.

Returns:

Type Description
str

The name of the flavor.

validate_vault_secret_name_or_namespace(name)

Validate a secret name or namespace.

For compatibility across secret managers the secret names should contain only alphanumeric characters and the characters /_+=.@-. The / character is only used internally to delimit scopes.

Parameters:

Name Type Description Default
name str

the secret name or namespace

required

Exceptions:

Type Description
ValueError

if the secret name or namespace is invalid

Source code in zenml/integrations/vault/flavors/vault_secrets_manager_flavor.py
def validate_vault_secret_name_or_namespace(name: str) -> None:
    """Validate a secret name or namespace.

    For compatibility across secret managers the secret names should contain
    only alphanumeric characters and the characters /_+=.@-. The `/`
    character is only used internally to delimit scopes.

    Args:
        name: the secret name or namespace

    Raises:
        ValueError: if the secret name or namespace is invalid
    """
    if not re.fullmatch(r"[a-zA-Z0-9_+=\.@\-]*", name):
        raise ValueError(
            f"Invalid secret name or namespace '{name}'. Must contain "
            f"only alphanumeric characters and the characters _+=.@-."
        )

secrets_manager special

HashiCorp Vault Secrets Manager.

vault_secrets_manager

Implementation of the HashiCorp Vault Secrets Manager integration.

VaultSecretsManager (BaseSecretsManager)

Class to interact with the Vault secrets manager - Key/value Engine.

Source code in zenml/integrations/vault/secrets_manager/vault_secrets_manager.py
class VaultSecretsManager(BaseSecretsManager):
    """Class to interact with the Vault secrets manager - Key/value Engine."""

    CLIENT: ClassVar[Any] = None

    @property
    def config(self) -> VaultSecretsManagerConfig:
        """Returns the `VaultSecretsManagerConfig` config.

        Returns:
            The configuration.
        """
        return cast(VaultSecretsManagerConfig, self._config)

    @classmethod
    def _ensure_client_connected(cls, url: str, token: str) -> None:
        """Ensure the client is connected.

        This function initializes the client if it is not initialized.

        Args:
            url: The url of the Vault server.
            token: The token to use to authenticate with Vault.
        """
        if cls.CLIENT is None:
            # Create a Vault Secrets Manager client
            cls.CLIENT = hvac.Client(
                url=url,
                token=token,
            )

    def _ensure_client_is_authenticated(self) -> None:
        """Ensure the client is authenticated.

        Raises:
            RuntimeError: If the client is not initialized or authenticated.
        """
        self._ensure_client_connected(
            url=self.config.url, token=self.config.token
        )

        if not self.CLIENT.is_authenticated():
            raise RuntimeError(
                "There was an error authenticating with Vault. Please check "
                "your configuration."
            )
        else:
            pass

    def register_secret(self, secret: BaseSecretSchema) -> None:
        """Registers a new secret.

        Args:
            secret: The secret to register.

        Raises:
            SecretExistsError: If the secret already exists.
        """
        self._ensure_client_is_authenticated()

        validate_vault_secret_name_or_namespace(secret.name)

        try:
            self.get_secret(secret.name)
            raise SecretExistsError(
                f"A Secret with the name '{secret.name}' already " f"exists."
            )
        except KeyError:
            pass

        secret_path = self._get_scoped_secret_name(secret.name)
        secret_value = secret_to_dict(secret, encode=False)

        self.CLIENT.secrets.kv.v2.create_or_update_secret(
            path=secret_path,
            mount_point=self.config.mount_point,
            secret=secret_value,
        )

        logger.info("Created secret: %s", f"{secret_path}")
        logger.info("Added value to secret.")

    def get_secret(self, secret_name: str) -> BaseSecretSchema:
        """Gets the value of a secret.

        Args:
            secret_name: The name of the secret to get.

        Returns:
            The secret.

        Raises:
            KeyError: If the secret does not exist.
        """
        self._ensure_client_is_authenticated()

        secret_path = self._get_scoped_secret_name(secret_name)

        try:
            secret_items = (
                self.CLIENT.secrets.kv.v2.read_secret_version(
                    path=secret_path,
                    mount_point=self.config.mount_point,
                )
                .get("data", {})
                .get("data", {})
            )
        except InvalidPath as e:
            raise KeyError(e)

        zenml_schema_name = secret_items.pop(ZENML_SCHEMA_NAME)

        secret_schema = SecretSchemaClassRegistry.get_class(
            secret_schema=zenml_schema_name
        )
        secret_items["name"] = secret_name
        return secret_schema(**secret_items)

    def get_all_secret_keys(self) -> List[str]:
        """List all secrets in Vault without any reformatting.

        This function tries to get all secrets from Vault and returns
        them as a list of strings (all secrets' names).

        Returns:
            A list of all secrets in the secrets manager.
        """
        self._ensure_client_is_authenticated()

        set_of_secrets: Set[str] = set()
        secret_path = "/".join(self._get_scope_path())
        try:
            secrets = self.CLIENT.secrets.kv.v2.list_secrets(
                path=secret_path, mount_point=self.config.mount_point
            )
        except hvac.exceptions.InvalidPath:
            logger.error(
                f"There are no secrets created within the scope `{secret_path}`"
            )
            return list(set_of_secrets)

        secrets_keys = secrets.get("data", {}).get("keys", [])
        for secret_key in secrets_keys:
            # vault scopes end with / and are not themselves secrets
            if "/" not in secret_key:
                set_of_secrets.add(secret_key)
        return list(set_of_secrets)

    def update_secret(self, secret: BaseSecretSchema) -> None:
        """Update an existing secret.

        Args:
            secret: The secret to update.

        Raises:
            KeyError: If the secret does not exist.
        """
        self._ensure_client_is_authenticated()

        validate_vault_secret_name_or_namespace(secret.name)

        if secret.name in self.get_all_secret_keys():
            secret_path = self._get_scoped_secret_name(secret.name)
            secret_value = secret_to_dict(secret, encode=False)

            self.CLIENT.secrets.kv.v2.create_or_update_secret(
                path=secret_path,
                mount_point=self.config.mount_point,
                secret=secret_value,
            )
        else:
            raise KeyError(
                f"A Secret with the name '{secret.name}'" f" does not exist."
            )

        logger.info("Updated secret: %s", secret_path)
        logger.info("Added value to secret.")

    def delete_secret(self, secret_name: str) -> None:
        """Delete an existing secret.

        Args:
            secret_name: The name of the secret to delete.
        """
        self._ensure_client_is_authenticated()

        secret_path = self._get_scoped_secret_name(secret_name)

        self.CLIENT.secrets.kv.v2.delete_metadata_and_all_versions(
            path=secret_path,
            mount_point=self.config.mount_point,
        )

        logger.info("Deleted secret: %s", f"{secret_path}")

    def delete_all_secrets(self) -> None:
        """Delete all existing secrets."""
        self._ensure_client_is_authenticated()

        for secret_name in self.get_all_secret_keys():
            self.delete_secret(secret_name)

        logger.info("Deleted all secrets.")
config: VaultSecretsManagerConfig property readonly

Returns the VaultSecretsManagerConfig config.

Returns:

Type Description
VaultSecretsManagerConfig

The configuration.

delete_all_secrets(self)

Delete all existing secrets.

Source code in zenml/integrations/vault/secrets_manager/vault_secrets_manager.py
def delete_all_secrets(self) -> None:
    """Delete all existing secrets."""
    self._ensure_client_is_authenticated()

    for secret_name in self.get_all_secret_keys():
        self.delete_secret(secret_name)

    logger.info("Deleted all secrets.")
delete_secret(self, secret_name)

Delete an existing secret.

Parameters:

Name Type Description Default
secret_name str

The name of the secret to delete.

required
Source code in zenml/integrations/vault/secrets_manager/vault_secrets_manager.py
def delete_secret(self, secret_name: str) -> None:
    """Delete an existing secret.

    Args:
        secret_name: The name of the secret to delete.
    """
    self._ensure_client_is_authenticated()

    secret_path = self._get_scoped_secret_name(secret_name)

    self.CLIENT.secrets.kv.v2.delete_metadata_and_all_versions(
        path=secret_path,
        mount_point=self.config.mount_point,
    )

    logger.info("Deleted secret: %s", f"{secret_path}")
get_all_secret_keys(self)

List all secrets in Vault without any reformatting.

This function tries to get all secrets from Vault and returns them as a list of strings (all secrets' names).

Returns:

Type Description
List[str]

A list of all secrets in the secrets manager.

Source code in zenml/integrations/vault/secrets_manager/vault_secrets_manager.py
def get_all_secret_keys(self) -> List[str]:
    """List all secrets in Vault without any reformatting.

    This function tries to get all secrets from Vault and returns
    them as a list of strings (all secrets' names).

    Returns:
        A list of all secrets in the secrets manager.
    """
    self._ensure_client_is_authenticated()

    set_of_secrets: Set[str] = set()
    secret_path = "/".join(self._get_scope_path())
    try:
        secrets = self.CLIENT.secrets.kv.v2.list_secrets(
            path=secret_path, mount_point=self.config.mount_point
        )
    except hvac.exceptions.InvalidPath:
        logger.error(
            f"There are no secrets created within the scope `{secret_path}`"
        )
        return list(set_of_secrets)

    secrets_keys = secrets.get("data", {}).get("keys", [])
    for secret_key in secrets_keys:
        # vault scopes end with / and are not themselves secrets
        if "/" not in secret_key:
            set_of_secrets.add(secret_key)
    return list(set_of_secrets)
get_secret(self, secret_name)

Gets the value of a secret.

Parameters:

Name Type Description Default
secret_name str

The name of the secret to get.

required

Returns:

Type Description
BaseSecretSchema

The secret.

Exceptions:

Type Description
KeyError

If the secret does not exist.

Source code in zenml/integrations/vault/secrets_manager/vault_secrets_manager.py
def get_secret(self, secret_name: str) -> BaseSecretSchema:
    """Gets the value of a secret.

    Args:
        secret_name: The name of the secret to get.

    Returns:
        The secret.

    Raises:
        KeyError: If the secret does not exist.
    """
    self._ensure_client_is_authenticated()

    secret_path = self._get_scoped_secret_name(secret_name)

    try:
        secret_items = (
            self.CLIENT.secrets.kv.v2.read_secret_version(
                path=secret_path,
                mount_point=self.config.mount_point,
            )
            .get("data", {})
            .get("data", {})
        )
    except InvalidPath as e:
        raise KeyError(e)

    zenml_schema_name = secret_items.pop(ZENML_SCHEMA_NAME)

    secret_schema = SecretSchemaClassRegistry.get_class(
        secret_schema=zenml_schema_name
    )
    secret_items["name"] = secret_name
    return secret_schema(**secret_items)
register_secret(self, secret)

Registers a new secret.

Parameters:

Name Type Description Default
secret BaseSecretSchema

The secret to register.

required

Exceptions:

Type Description
SecretExistsError

If the secret already exists.

Source code in zenml/integrations/vault/secrets_manager/vault_secrets_manager.py
def register_secret(self, secret: BaseSecretSchema) -> None:
    """Registers a new secret.

    Args:
        secret: The secret to register.

    Raises:
        SecretExistsError: If the secret already exists.
    """
    self._ensure_client_is_authenticated()

    validate_vault_secret_name_or_namespace(secret.name)

    try:
        self.get_secret(secret.name)
        raise SecretExistsError(
            f"A Secret with the name '{secret.name}' already " f"exists."
        )
    except KeyError:
        pass

    secret_path = self._get_scoped_secret_name(secret.name)
    secret_value = secret_to_dict(secret, encode=False)

    self.CLIENT.secrets.kv.v2.create_or_update_secret(
        path=secret_path,
        mount_point=self.config.mount_point,
        secret=secret_value,
    )

    logger.info("Created secret: %s", f"{secret_path}")
    logger.info("Added value to secret.")
update_secret(self, secret)

Update an existing secret.

Parameters:

Name Type Description Default
secret BaseSecretSchema

The secret to update.

required

Exceptions:

Type Description
KeyError

If the secret does not exist.

Source code in zenml/integrations/vault/secrets_manager/vault_secrets_manager.py
def update_secret(self, secret: BaseSecretSchema) -> None:
    """Update an existing secret.

    Args:
        secret: The secret to update.

    Raises:
        KeyError: If the secret does not exist.
    """
    self._ensure_client_is_authenticated()

    validate_vault_secret_name_or_namespace(secret.name)

    if secret.name in self.get_all_secret_keys():
        secret_path = self._get_scoped_secret_name(secret.name)
        secret_value = secret_to_dict(secret, encode=False)

        self.CLIENT.secrets.kv.v2.create_or_update_secret(
            path=secret_path,
            mount_point=self.config.mount_point,
            secret=secret_value,
        )
    else:
        raise KeyError(
            f"A Secret with the name '{secret.name}'" f" does not exist."
        )

    logger.info("Updated secret: %s", secret_path)
    logger.info("Added value to secret.")