Secrets Managers
zenml.secrets_managers
special
Secret Manager
...
base_secrets_manager
BaseSecretsManager (StackComponent, ABC)
pydantic-model
Base class for all ZenML secrets managers.
Source code in zenml/secrets_managers/base_secrets_manager.py
class BaseSecretsManager(StackComponent, ABC):
"""Base class for all ZenML secrets managers."""
# Class configuration
TYPE: ClassVar[StackComponentType] = StackComponentType.SECRETS_MANAGER
FLAVOR: ClassVar[str]
@abstractmethod
def register_secret(self, secret: BaseSecretSchema) -> None:
"""Registers a new secret.
Args:
secret: The secret to register.
"""
@abstractmethod
def get_secret(self, secret_name: str) -> BaseSecretSchema:
"""Gets the value of a secret.
Args:
secret_name: The name of the secret to get.
"""
@abstractmethod
def get_all_secret_keys(self) -> List[str]:
"""Get all secret keys."""
@abstractmethod
def update_secret(self, secret: BaseSecretSchema) -> None:
"""Update an existing secret.
Args:
secret: The secret to update.
"""
@abstractmethod
def delete_secret(self, secret_name: str) -> None:
"""Delete an existing secret.
Args:
secret_name: The name of the secret to delete.
"""
@abstractmethod
def delete_all_secrets(self, force: bool = False) -> None:
"""Delete all existing secrets.
Args:
force: Whether to force deletion of secrets.
"""
delete_all_secrets(self, force=False)
Delete all existing secrets.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
force |
bool |
Whether to force deletion of secrets. |
False |
Source code in zenml/secrets_managers/base_secrets_manager.py
@abstractmethod
def delete_all_secrets(self, force: bool = False) -> None:
"""Delete all existing secrets.
Args:
force: Whether to force deletion of secrets.
"""
delete_secret(self, secret_name)
Delete an existing secret.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
secret_name |
str |
The name of the secret to delete. |
required |
Source code in zenml/secrets_managers/base_secrets_manager.py
@abstractmethod
def delete_secret(self, secret_name: str) -> None:
"""Delete an existing secret.
Args:
secret_name: The name of the secret to delete.
"""
get_all_secret_keys(self)
Get all secret keys.
Source code in zenml/secrets_managers/base_secrets_manager.py
@abstractmethod
def get_all_secret_keys(self) -> List[str]:
"""Get all secret keys."""
get_secret(self, secret_name)
Gets the value of a secret.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
secret_name |
str |
The name of the secret to get. |
required |
Source code in zenml/secrets_managers/base_secrets_manager.py
@abstractmethod
def get_secret(self, secret_name: str) -> BaseSecretSchema:
"""Gets the value of a secret.
Args:
secret_name: The name of the secret to get.
"""
register_secret(self, secret)
Registers a new secret.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
secret |
BaseSecretSchema |
The secret to register. |
required |
Source code in zenml/secrets_managers/base_secrets_manager.py
@abstractmethod
def register_secret(self, secret: BaseSecretSchema) -> None:
"""Registers a new secret.
Args:
secret: The secret to register.
"""
update_secret(self, secret)
Update an existing secret.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
secret |
BaseSecretSchema |
The secret to update. |
required |
Source code in zenml/secrets_managers/base_secrets_manager.py
@abstractmethod
def update_secret(self, secret: BaseSecretSchema) -> None:
"""Update an existing secret.
Args:
secret: The secret to update.
"""
local
special
local_secrets_manager
LocalSecretsManager (BaseSecretsManager)
pydantic-model
Class for ZenML local file-based secret manager.
Source code in zenml/secrets_managers/local/local_secrets_manager.py
class LocalSecretsManager(BaseSecretsManager):
"""Class for ZenML local file-based secret manager."""
secrets_file: str = ""
# Class configuration
FLAVOR: ClassVar[str] = "local"
@root_validator
def set_secrets_file(cls, values: Dict[str, Any]) -> Dict[str, Any]:
"""Sets the secrets_file attribute value according to the component
UUID."""
if values.get("secrets_file"):
return values
# not likely to happen, due to Pydantic validation, but mypy complains
assert "uuid" in values
values["secrets_file"] = cls.get_secret_store_path(values["uuid"])
return values
@staticmethod
def get_secret_store_path(uuid: uuid.UUID) -> str:
"""Get the path to the secret store.
Args:
uuid: The UUID of the secret store.
Returns:
The path to the secret store."""
return os.path.join(
get_global_config_directory(),
LOCAL_STORES_DIRECTORY_NAME,
str(uuid),
LOCAL_SECRETS_FILENAME,
)
@property
def local_path(self) -> str:
"""Path to the local directory where the secrets are stored."""
return str(Path(self.secrets_file).parent)
def _create_secrets_file__if_not_exists(self) -> None:
"""Makes sure the secrets yaml file exists"""
create_file_if_not_exists(self.secrets_file)
def _verify_secret_key_exists(self, secret_name: str) -> bool:
"""Checks if a secret key exists.
Args:
secret_name: The name of the secret key.
Returns:
True if the secret key exists, False otherwise."""
self._create_secrets_file__if_not_exists()
secrets_store_items = yaml_utils.read_yaml(self.secrets_file)
try:
return secret_name in secrets_store_items
except TypeError:
return False
def _get_all_secrets(self) -> Dict[str, Dict[str, str]]:
self._create_secrets_file__if_not_exists()
return yaml_utils.read_yaml(self.secrets_file) or {}
def register_secret(self, secret: BaseSecretSchema) -> None:
"""Registers a new secret.
Args:
secret: The secret to register.
Raises:
KeyError: If the secret already exists."""
self._create_secrets_file__if_not_exists()
if self._verify_secret_key_exists(secret_name=secret.name):
raise KeyError(f"Secret `{secret.name}` already exists.")
encoded_secret = encode_secret(secret)
secrets_store_items = self._get_all_secrets()
secrets_store_items[secret.name] = encoded_secret
yaml_utils.append_yaml(self.secrets_file, secrets_store_items)
def get_secret(self, secret_name: str) -> BaseSecretSchema:
"""Gets a specific secret.
Args:
secret_name: The name of the secret.
Returns:
The secret.
Raises:
KeyError: If the secret does not exist."""
self._create_secrets_file__if_not_exists()
secret_store_items = self._get_all_secrets()
if not self._verify_secret_key_exists(secret_name=secret_name):
raise KeyError(f"Secret `{secret_name}` does not exists.")
secret_dict = secret_store_items[secret_name]
decoded_secret_dict, zenml_schema_name = decode_secret_dict(secret_dict)
decoded_secret_dict["name"] = secret_name
secret_schema = SecretSchemaClassRegistry.get_class(
secret_schema=zenml_schema_name
)
return secret_schema(**decoded_secret_dict)
def get_all_secret_keys(self) -> List[str]:
"""Get all secret keys.
Returns:
A list of all secret keys."""
self._create_secrets_file__if_not_exists()
secrets_store_items = self._get_all_secrets()
return list(secrets_store_items.keys())
def update_secret(self, secret: BaseSecretSchema) -> None:
"""Update an existing secret.
Args:
secret: The secret to update.
Raises:
KeyError: If the secret does not exist."""
self._create_secrets_file__if_not_exists()
if not self._verify_secret_key_exists(secret_name=secret.name):
raise KeyError(f"Secret `{secret.name}` did not exist.")
encoded_secret = encode_secret(secret)
secrets_store_items = self._get_all_secrets()
secrets_store_items[secret.name] = encoded_secret
yaml_utils.append_yaml(self.secrets_file, secrets_store_items)
def delete_secret(self, secret_name: str) -> None:
"""Delete an existing secret.
Args:
secret_name: The name of the secret to delete.
Raises:
KeyError: If the secret does not exist."""
self._create_secrets_file__if_not_exists()
if not self._verify_secret_key_exists(secret_name=secret_name):
raise KeyError(f"Secret `{secret_name}` does not exists.")
secrets_store_items = self._get_all_secrets()
try:
secrets_store_items.pop(secret_name)
yaml_utils.write_yaml(self.secrets_file, secrets_store_items)
except KeyError:
error(f"Secret {secret_name} does not exist.")
def delete_all_secrets(self, force: bool = False) -> None:
"""Delete all existing secrets.
Args:
force: If True, delete all secrets.
Raises:
ValueError: If force is False."""
self._create_secrets_file__if_not_exists()
if not force:
raise ValueError(
"This operation will delete all secrets. "
"To confirm, please pass `--force`."
)
remove(self.secrets_file)
local_path: str
property
readonly
Path to the local directory where the secrets are stored.
delete_all_secrets(self, force=False)
Delete all existing secrets.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
force |
bool |
If True, delete all secrets. |
False |
Exceptions:
Type | Description |
---|---|
ValueError |
If force is False. |
Source code in zenml/secrets_managers/local/local_secrets_manager.py
def delete_all_secrets(self, force: bool = False) -> None:
"""Delete all existing secrets.
Args:
force: If True, delete all secrets.
Raises:
ValueError: If force is False."""
self._create_secrets_file__if_not_exists()
if not force:
raise ValueError(
"This operation will delete all secrets. "
"To confirm, please pass `--force`."
)
remove(self.secrets_file)
delete_secret(self, secret_name)
Delete an existing secret.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
secret_name |
str |
The name of the secret to delete. |
required |
Exceptions:
Type | Description |
---|---|
KeyError |
If the secret does not exist. |
Source code in zenml/secrets_managers/local/local_secrets_manager.py
def delete_secret(self, secret_name: str) -> None:
"""Delete an existing secret.
Args:
secret_name: The name of the secret to delete.
Raises:
KeyError: If the secret does not exist."""
self._create_secrets_file__if_not_exists()
if not self._verify_secret_key_exists(secret_name=secret_name):
raise KeyError(f"Secret `{secret_name}` does not exists.")
secrets_store_items = self._get_all_secrets()
try:
secrets_store_items.pop(secret_name)
yaml_utils.write_yaml(self.secrets_file, secrets_store_items)
except KeyError:
error(f"Secret {secret_name} does not exist.")
get_all_secret_keys(self)
Get all secret keys.
Returns:
Type | Description |
---|---|
List[str] |
A list of all secret keys. |
Source code in zenml/secrets_managers/local/local_secrets_manager.py
def get_all_secret_keys(self) -> List[str]:
"""Get all secret keys.
Returns:
A list of all secret keys."""
self._create_secrets_file__if_not_exists()
secrets_store_items = self._get_all_secrets()
return list(secrets_store_items.keys())
get_secret(self, secret_name)
Gets a specific secret.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
secret_name |
str |
The name of the secret. |
required |
Returns:
Type | Description |
---|---|
BaseSecretSchema |
The secret. |
Exceptions:
Type | Description |
---|---|
KeyError |
If the secret does not exist. |
Source code in zenml/secrets_managers/local/local_secrets_manager.py
def get_secret(self, secret_name: str) -> BaseSecretSchema:
"""Gets a specific secret.
Args:
secret_name: The name of the secret.
Returns:
The secret.
Raises:
KeyError: If the secret does not exist."""
self._create_secrets_file__if_not_exists()
secret_store_items = self._get_all_secrets()
if not self._verify_secret_key_exists(secret_name=secret_name):
raise KeyError(f"Secret `{secret_name}` does not exists.")
secret_dict = secret_store_items[secret_name]
decoded_secret_dict, zenml_schema_name = decode_secret_dict(secret_dict)
decoded_secret_dict["name"] = secret_name
secret_schema = SecretSchemaClassRegistry.get_class(
secret_schema=zenml_schema_name
)
return secret_schema(**decoded_secret_dict)
get_secret_store_path(uuid)
staticmethod
Get the path to the secret store.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
uuid |
UUID |
The UUID of the secret store. |
required |
Returns:
Type | Description |
---|---|
str |
The path to the secret store. |
Source code in zenml/secrets_managers/local/local_secrets_manager.py
@staticmethod
def get_secret_store_path(uuid: uuid.UUID) -> str:
"""Get the path to the secret store.
Args:
uuid: The UUID of the secret store.
Returns:
The path to the secret store."""
return os.path.join(
get_global_config_directory(),
LOCAL_STORES_DIRECTORY_NAME,
str(uuid),
LOCAL_SECRETS_FILENAME,
)
register_secret(self, secret)
Registers a new secret.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
secret |
BaseSecretSchema |
The secret to register. |
required |
Exceptions:
Type | Description |
---|---|
KeyError |
If the secret already exists. |
Source code in zenml/secrets_managers/local/local_secrets_manager.py
def register_secret(self, secret: BaseSecretSchema) -> None:
"""Registers a new secret.
Args:
secret: The secret to register.
Raises:
KeyError: If the secret already exists."""
self._create_secrets_file__if_not_exists()
if self._verify_secret_key_exists(secret_name=secret.name):
raise KeyError(f"Secret `{secret.name}` already exists.")
encoded_secret = encode_secret(secret)
secrets_store_items = self._get_all_secrets()
secrets_store_items[secret.name] = encoded_secret
yaml_utils.append_yaml(self.secrets_file, secrets_store_items)
set_secrets_file(values)
classmethod
Sets the secrets_file attribute value according to the component UUID.
Source code in zenml/secrets_managers/local/local_secrets_manager.py
@root_validator
def set_secrets_file(cls, values: Dict[str, Any]) -> Dict[str, Any]:
"""Sets the secrets_file attribute value according to the component
UUID."""
if values.get("secrets_file"):
return values
# not likely to happen, due to Pydantic validation, but mypy complains
assert "uuid" in values
values["secrets_file"] = cls.get_secret_store_path(values["uuid"])
return values
update_secret(self, secret)
Update an existing secret.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
secret |
BaseSecretSchema |
The secret to update. |
required |
Exceptions:
Type | Description |
---|---|
KeyError |
If the secret does not exist. |
Source code in zenml/secrets_managers/local/local_secrets_manager.py
def update_secret(self, secret: BaseSecretSchema) -> None:
"""Update an existing secret.
Args:
secret: The secret to update.
Raises:
KeyError: If the secret does not exist."""
self._create_secrets_file__if_not_exists()
if not self._verify_secret_key_exists(secret_name=secret.name):
raise KeyError(f"Secret `{secret.name}` did not exist.")
encoded_secret = encode_secret(secret)
secrets_store_items = self._get_all_secrets()
secrets_store_items[secret.name] = encoded_secret
yaml_utils.append_yaml(self.secrets_file, secrets_store_items)