Azure
zenml.integrations.azure
special
Initialization of the ZenML Azure integration.
The Azure integration submodule provides a way to run ZenML pipelines in a cloud
environment. Specifically, it allows the use of cloud artifact stores and provides
an io module to handle file operations on Azure Blob Storage.
The Azure Step Operator integration submodule provides a way to run ZenML steps
in AzureML.
AzureIntegration (Integration)
Definition of Azure integration for ZenML.
Source code in zenml/integrations/azure/__init__.py
class AzureIntegration(Integration):
"""Definition of Azure integration for ZenML."""
NAME = AZURE
REQUIREMENTS = [
"adlfs==2021.10.0",
"azure-keyvault-keys",
"azure-keyvault-secrets",
"azure-identity==1.10.0",
"azureml-core==1.42.0.post1",
]
@classmethod
def flavors(cls) -> List[Type[Flavor]]:
"""Declares the flavors for the integration.
Returns:
List of stack component flavors for this integration.
"""
from zenml.integrations.azure.flavors import (
AzureArtifactStoreFlavor,
AzureMLStepOperatorFlavor,
AzureSecretsManagerFlavor,
)
return [
AzureArtifactStoreFlavor,
AzureSecretsManagerFlavor,
AzureMLStepOperatorFlavor,
]
flavors()
classmethod
Declares the flavors for the integration.
Returns:

| Type | Description |
|---|---|
| `List[Type[zenml.stack.flavor.Flavor]]` | List of stack component flavors for this integration. |
Source code in zenml/integrations/azure/__init__.py
@classmethod
def flavors(cls) -> List[Type[Flavor]]:
"""Declares the flavors for the integration.
Returns:
List of stack component flavors for this integration.
"""
from zenml.integrations.azure.flavors import (
AzureArtifactStoreFlavor,
AzureMLStepOperatorFlavor,
AzureSecretsManagerFlavor,
)
return [
AzureArtifactStoreFlavor,
AzureSecretsManagerFlavor,
AzureMLStepOperatorFlavor,
]
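As a quick orientation, the snippet below inspects the integration's metadata and flavors. This is a minimal sketch that assumes the pinned requirements listed above are installed, since listing the flavors imports the Azure flavor modules.

```python
from zenml.integrations.azure import AzureIntegration

# The integration name and its pinned requirements, as defined above.
print(AzureIntegration.NAME)
print(AzureIntegration.REQUIREMENTS)

# Listing the flavors imports the Azure flavor modules, so the
# requirements above need to be installed for this call to succeed.
for flavor_class in AzureIntegration.flavors():
    print(flavor_class.__name__)
```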
artifact_stores
special
Initialization of the Azure Artifact Store integration.
azure_artifact_store
Implementation of the Azure Artifact Store integration.
AzureArtifactStore (BaseArtifactStore, AuthenticationMixin)
Artifact Store for Microsoft Azure based artifacts.
Source code in zenml/integrations/azure/artifact_stores/azure_artifact_store.py
class AzureArtifactStore(BaseArtifactStore, AuthenticationMixin):
"""Artifact Store for Microsoft Azure based artifacts."""
_filesystem: Optional[adlfs.AzureBlobFileSystem] = None
@property
def config(self) -> AzureArtifactStoreConfig:
"""Returns the `AzureArtifactStoreConfig` config.
Returns:
The configuration.
"""
return cast(AzureArtifactStoreConfig, self._config)
@property
def filesystem(self) -> adlfs.AzureBlobFileSystem:
"""The adlfs filesystem to access this artifact store.
Returns:
The adlfs filesystem to access this artifact store.
"""
if not self._filesystem:
secret = self.get_authentication_secret(
expected_schema_type=AzureSecretSchema
)
credentials = secret.content if secret else {}
self._filesystem = adlfs.AzureBlobFileSystem(
**credentials,
anon=False,
use_listings_cache=False,
)
return self._filesystem
def _split_path(self, path: PathType) -> Tuple[str, str]:
"""Splits a path into the filesystem prefix and remainder.
Example:
```python
prefix, remainder = ZenAzure._split_path("az://my_container/test.txt")
print(prefix, remainder) # "az://" "my_container/test.txt"
```
Args:
path: The path to split.
Returns:
A tuple of the filesystem prefix and the remainder.
"""
path = convert_to_str(path)
prefix = ""
for potential_prefix in self.config.SUPPORTED_SCHEMES:
if path.startswith(potential_prefix):
prefix = potential_prefix
path = path[len(potential_prefix) :]
break
return prefix, path
def open(self, path: PathType, mode: str = "r") -> Any:
"""Open a file at the given path.
Args:
path: Path of the file to open.
mode: Mode in which to open the file. Currently, only
'rb' and 'wb' to read and write binary files are supported.
Returns:
A file-like object.
"""
return self.filesystem.open(path=path, mode=mode)
def copyfile(
self, src: PathType, dst: PathType, overwrite: bool = False
) -> None:
"""Copy a file.
Args:
src: The path to copy from.
dst: The path to copy to.
overwrite: If a file already exists at the destination, this
method will overwrite it if overwrite=`True` and
raise a FileExistsError otherwise.
Raises:
FileExistsError: If a file already exists at the destination
and overwrite is not set to `True`.
"""
if not overwrite and self.filesystem.exists(dst):
raise FileExistsError(
f"Unable to copy to destination '{convert_to_str(dst)}', "
f"file already exists. Set `overwrite=True` to copy anyway."
)
# TODO [ENG-151]: Check if it works with overwrite=True or if we need to
# manually remove it first
self.filesystem.copy(path1=src, path2=dst)
def exists(self, path: PathType) -> bool:
"""Check whether a path exists.
Args:
path: The path to check.
Returns:
True if the path exists, False otherwise.
"""
return self.filesystem.exists(path=path) # type: ignore[no-any-return]
def glob(self, pattern: PathType) -> List[PathType]:
"""Return all paths that match the given glob pattern.
The glob pattern may include:
- '*' to match any number of characters
- '?' to match a single character
- '[...]' to match one of the characters inside the brackets
- '**' as the full name of a path component to match to search
in subdirectories of any depth (e.g. '/some_dir/**/some_file)
Args:
pattern: The glob pattern to match, see details above.
Returns:
A list of paths that match the given glob pattern.
"""
prefix, _ = self._split_path(pattern)
return [
f"{prefix}{path}" for path in self.filesystem.glob(path=pattern)
]
def isdir(self, path: PathType) -> bool:
"""Check whether a path is a directory.
Args:
path: The path to check.
Returns:
True if the path is a directory, False otherwise.
"""
return self.filesystem.isdir(path=path) # type: ignore[no-any-return]
def listdir(self, path: PathType) -> List[PathType]:
"""Return a list of files in a directory.
Args:
path: The path to list.
Returns:
A list of files in the given directory.
"""
_, path = self._split_path(path)
def _extract_basename(file_dict: Dict[str, Any]) -> str:
"""Extracts the basename from a dictionary returned by the Azure filesystem.
Args:
file_dict: A dictionary returned by the Azure filesystem.
Returns:
The basename of the file.
"""
file_path = cast(str, file_dict["name"])
base_name = file_path[len(path) :]
return base_name.lstrip("/")
return [
_extract_basename(dict_)
for dict_ in self.filesystem.listdir(path=path)
]
def makedirs(self, path: PathType) -> None:
"""Create a directory at the given path.
If needed also create missing parent directories.
Args:
path: The path to create.
"""
self.filesystem.makedirs(path=path, exist_ok=True)
def mkdir(self, path: PathType) -> None:
"""Create a directory at the given path.
Args:
path: The path to create.
"""
self.filesystem.makedir(path=path, exist_ok=True)
def remove(self, path: PathType) -> None:
"""Remove the file at the given path.
Args:
path: The path to remove.
"""
self.filesystem.rm_file(path=path)
def rename(
self, src: PathType, dst: PathType, overwrite: bool = False
) -> None:
"""Rename source file to destination file.
Args:
src: The path of the file to rename.
dst: The path to rename the source file to.
overwrite: If a file already exists at the destination, this
method will overwrite it if overwrite=`True` and
raise a FileExistsError otherwise.
Raises:
FileExistsError: If a file already exists at the destination
and overwrite is not set to `True`.
"""
if not overwrite and self.filesystem.exists(dst):
raise FileExistsError(
f"Unable to rename file to '{convert_to_str(dst)}', "
f"file already exists. Set `overwrite=True` to rename anyway."
)
# TODO [ENG-152]: Check if it works with overwrite=True or if we need
# to manually remove it first
self.filesystem.rename(path1=src, path2=dst)
def rmtree(self, path: PathType) -> None:
"""Remove the given directory.
Args:
path: The path of the directory to remove.
"""
self.filesystem.delete(path=path, recursive=True)
def stat(self, path: PathType) -> Dict[str, Any]:
"""Return stat info for the given path.
Args:
path: The path to get stat info for.
Returns:
Stat info.
"""
return self.filesystem.stat(path=path) # type: ignore[no-any-return]
def walk(
self,
top: PathType,
topdown: bool = True,
onerror: Optional[Callable[..., None]] = None,
) -> Iterable[Tuple[PathType, List[PathType], List[PathType]]]:
"""Return an iterator that walks the contents of the given directory.
Args:
top: Path of directory to walk.
topdown: Unused argument to conform to interface.
onerror: Unused argument to conform to interface.
Yields:
An Iterable of Tuples, each of which contain the path of the current
directory path, a list of directories inside the current directory
and a list of files inside the current directory.
"""
# TODO [ENG-153]: Additional params
prefix, _ = self._split_path(top)
for (
directory,
subdirectories,
files,
) in self.filesystem.walk(path=top):
yield f"{prefix}{directory}", subdirectories, files
config: AzureArtifactStoreConfig
property
readonly
Returns the `AzureArtifactStoreConfig` config.
Returns:

| Type | Description |
|---|---|
| `AzureArtifactStoreConfig` | The configuration. |
filesystem: AzureBlobFileSystem
property
readonly
The adlfs filesystem to access this artifact store.
Returns:

| Type | Description |
|---|---|
| `AzureBlobFileSystem` | The adlfs filesystem to access this artifact store. |
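For illustration, this is roughly the object the property builds under the hood. The credential values are placeholders, and any keyword arguments accepted by `adlfs` (account name/key, connection string, SAS token) may instead come from the authentication secret.

```python
import adlfs

# Sketch of what the `filesystem` property constructs: credentials from
# the authentication secret (if any) are passed straight through to adlfs.
credentials = {
    "account_name": "mystorageaccount",      # placeholder
    "account_key": "<storage-account-key>",  # placeholder
}
fs = adlfs.AzureBlobFileSystem(
    **credentials,
    anon=False,
    use_listings_cache=False,
)
print(fs.ls("my-container"))  # list blobs in a (placeholder) container
```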
copyfile(self, src, dst, overwrite=False)
Copy a file.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `src` | `Union[bytes, str]` | The path to copy from. | required |
| `dst` | `Union[bytes, str]` | The path to copy to. | required |
| `overwrite` | `bool` | If a file already exists at the destination, this method will overwrite it if `overwrite=True` and raise a FileExistsError otherwise. | `False` |

Exceptions:

| Type | Description |
|---|---|
| `FileExistsError` | If a file already exists at the destination and overwrite is not set to `True`. |
Source code in zenml/integrations/azure/artifact_stores/azure_artifact_store.py
def copyfile(
self, src: PathType, dst: PathType, overwrite: bool = False
) -> None:
"""Copy a file.
Args:
src: The path to copy from.
dst: The path to copy to.
overwrite: If a file already exists at the destination, this
method will overwrite it if overwrite=`True` and
raise a FileExistsError otherwise.
Raises:
FileExistsError: If a file already exists at the destination
and overwrite is not set to `True`.
"""
if not overwrite and self.filesystem.exists(dst):
raise FileExistsError(
f"Unable to copy to destination '{convert_to_str(dst)}', "
f"file already exists. Set `overwrite=True` to copy anyway."
)
# TODO [ENG-151]: Check if it works with overwrite=True or if we need to
# manually remove it first
self.filesystem.copy(path1=src, path2=dst)
exists(self, path)
Check whether a path exists.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `path` | `Union[bytes, str]` | The path to check. | required |

Returns:

| Type | Description |
|---|---|
| `bool` | True if the path exists, False otherwise. |
Source code in zenml/integrations/azure/artifact_stores/azure_artifact_store.py
def exists(self, path: PathType) -> bool:
"""Check whether a path exists.
Args:
path: The path to check.
Returns:
True if the path exists, False otherwise.
"""
return self.filesystem.exists(path=path) # type: ignore[no-any-return]
glob(self, pattern)
Return all paths that match the given glob pattern.
The glob pattern may include:

- `*` to match any number of characters
- `?` to match a single character
- `[...]` to match one of the characters inside the brackets
- `**` as the full name of a path component to match to search in subdirectories of any depth (e.g. `/some_dir/**/some_file`)

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `pattern` | `Union[bytes, str]` | The glob pattern to match, see details above. | required |

Returns:

| Type | Description |
|---|---|
| `List[Union[bytes, str]]` | A list of paths that match the given glob pattern. |
Source code in zenml/integrations/azure/artifact_stores/azure_artifact_store.py
def glob(self, pattern: PathType) -> List[PathType]:
"""Return all paths that match the given glob pattern.
The glob pattern may include:
- '*' to match any number of characters
- '?' to match a single character
- '[...]' to match one of the characters inside the brackets
- '**' as the full name of a path component to match to search
in subdirectories of any depth (e.g. '/some_dir/**/some_file)
Args:
pattern: The glob pattern to match, see details above.
Returns:
A list of paths that match the given glob pattern.
"""
prefix, _ = self._split_path(pattern)
return [
f"{prefix}{path}" for path in self.filesystem.glob(path=pattern)
]
isdir(self, path)
Check whether a path is a directory.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `path` | `Union[bytes, str]` | The path to check. | required |

Returns:

| Type | Description |
|---|---|
| `bool` | True if the path is a directory, False otherwise. |
Source code in zenml/integrations/azure/artifact_stores/azure_artifact_store.py
def isdir(self, path: PathType) -> bool:
"""Check whether a path is a directory.
Args:
path: The path to check.
Returns:
True if the path is a directory, False otherwise.
"""
return self.filesystem.isdir(path=path) # type: ignore[no-any-return]
listdir(self, path)
Return a list of files in a directory.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `path` | `Union[bytes, str]` | The path to list. | required |

Returns:

| Type | Description |
|---|---|
| `List[Union[bytes, str]]` | A list of files in the given directory. |
Source code in zenml/integrations/azure/artifact_stores/azure_artifact_store.py
def listdir(self, path: PathType) -> List[PathType]:
"""Return a list of files in a directory.
Args:
path: The path to list.
Returns:
A list of files in the given directory.
"""
_, path = self._split_path(path)
def _extract_basename(file_dict: Dict[str, Any]) -> str:
"""Extracts the basename from a dictionary returned by the Azure filesystem.
Args:
file_dict: A dictionary returned by the Azure filesystem.
Returns:
The basename of the file.
"""
file_path = cast(str, file_dict["name"])
base_name = file_path[len(path) :]
return base_name.lstrip("/")
return [
_extract_basename(dict_)
for dict_ in self.filesystem.listdir(path=path)
]
makedirs(self, path)
Create a directory at the given path.
If needed also create missing parent directories.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `path` | `Union[bytes, str]` | The path to create. | required |
Source code in zenml/integrations/azure/artifact_stores/azure_artifact_store.py
def makedirs(self, path: PathType) -> None:
"""Create a directory at the given path.
If needed also create missing parent directories.
Args:
path: The path to create.
"""
self.filesystem.makedirs(path=path, exist_ok=True)
mkdir(self, path)
Create a directory at the given path.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `path` | `Union[bytes, str]` | The path to create. | required |
Source code in zenml/integrations/azure/artifact_stores/azure_artifact_store.py
def mkdir(self, path: PathType) -> None:
"""Create a directory at the given path.
Args:
path: The path to create.
"""
self.filesystem.makedir(path=path, exist_ok=True)
open(self, path, mode='r')
Open a file at the given path.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `path` | `Union[bytes, str]` | Path of the file to open. | required |
| `mode` | `str` | Mode in which to open the file. Currently, only 'rb' and 'wb' to read and write binary files are supported. | `'r'` |

Returns:

| Type | Description |
|---|---|
| `Any` | A file-like object. |
Source code in zenml/integrations/azure/artifact_stores/azure_artifact_store.py
def open(self, path: PathType, mode: str = "r") -> Any:
"""Open a file at the given path.
Args:
path: Path of the file to open.
mode: Mode in which to open the file. Currently, only
'rb' and 'wb' to read and write binary files are supported.
Returns:
A file-like object.
"""
return self.filesystem.open(path=path, mode=mode)
remove(self, path)
Remove the file at the given path.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `path` | `Union[bytes, str]` | The path to remove. | required |
Source code in zenml/integrations/azure/artifact_stores/azure_artifact_store.py
def remove(self, path: PathType) -> None:
"""Remove the file at the given path.
Args:
path: The path to remove.
"""
self.filesystem.rm_file(path=path)
rename(self, src, dst, overwrite=False)
Rename source file to destination file.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `src` | `Union[bytes, str]` | The path of the file to rename. | required |
| `dst` | `Union[bytes, str]` | The path to rename the source file to. | required |
| `overwrite` | `bool` | If a file already exists at the destination, this method will overwrite it if `overwrite=True` and raise a FileExistsError otherwise. | `False` |

Exceptions:

| Type | Description |
|---|---|
| `FileExistsError` | If a file already exists at the destination and overwrite is not set to `True`. |
Source code in zenml/integrations/azure/artifact_stores/azure_artifact_store.py
def rename(
self, src: PathType, dst: PathType, overwrite: bool = False
) -> None:
"""Rename source file to destination file.
Args:
src: The path of the file to rename.
dst: The path to rename the source file to.
overwrite: If a file already exists at the destination, this
method will overwrite it if overwrite=`True` and
raise a FileExistsError otherwise.
Raises:
FileExistsError: If a file already exists at the destination
and overwrite is not set to `True`.
"""
if not overwrite and self.filesystem.exists(dst):
raise FileExistsError(
f"Unable to rename file to '{convert_to_str(dst)}', "
f"file already exists. Set `overwrite=True` to rename anyway."
)
# TODO [ENG-152]: Check if it works with overwrite=True or if we need
# to manually remove it first
self.filesystem.rename(path1=src, path2=dst)
rmtree(self, path)
Remove the given directory.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `path` | `Union[bytes, str]` | The path of the directory to remove. | required |
Source code in zenml/integrations/azure/artifact_stores/azure_artifact_store.py
def rmtree(self, path: PathType) -> None:
"""Remove the given directory.
Args:
path: The path of the directory to remove.
"""
self.filesystem.delete(path=path, recursive=True)
stat(self, path)
Return stat info for the given path.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `path` | `Union[bytes, str]` | The path to get stat info for. | required |

Returns:

| Type | Description |
|---|---|
| `Dict[str, Any]` | Stat info. |
Source code in zenml/integrations/azure/artifact_stores/azure_artifact_store.py
def stat(self, path: PathType) -> Dict[str, Any]:
"""Return stat info for the given path.
Args:
path: The path to get stat info for.
Returns:
Stat info.
"""
return self.filesystem.stat(path=path) # type: ignore[no-any-return]
walk(self, top, topdown=True, onerror=None)
Return an iterator that walks the contents of the given directory.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `top` | `Union[bytes, str]` | Path of directory to walk. | required |
| `topdown` | `bool` | Unused argument to conform to interface. | `True` |
| `onerror` | `Optional[Callable[..., NoneType]]` | Unused argument to conform to interface. | `None` |

Yields:

| Type | Description |
|---|---|
| `Iterable[Tuple[Union[bytes, str], List[Union[bytes, str]], List[Union[bytes, str]]]]` | An Iterable of Tuples, each of which contains the path of the current directory, a list of directories inside the current directory and a list of files inside the current directory. |
Source code in zenml/integrations/azure/artifact_stores/azure_artifact_store.py
def walk(
self,
top: PathType,
topdown: bool = True,
onerror: Optional[Callable[..., None]] = None,
) -> Iterable[Tuple[PathType, List[PathType], List[PathType]]]:
"""Return an iterator that walks the contents of the given directory.
Args:
top: Path of directory to walk.
topdown: Unused argument to conform to interface.
onerror: Unused argument to conform to interface.
Yields:
An Iterable of Tuples, each of which contain the path of the current
directory path, a list of directories inside the current directory
and a list of files inside the current directory.
"""
# TODO [ENG-153]: Additional params
prefix, _ = self._split_path(top)
for (
directory,
subdirectories,
files,
) in self.filesystem.walk(path=top):
yield f"{prefix}{directory}", subdirectories, files
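In practice this class is rarely used directly; with an Azure artifact store in the active stack, the generic `zenml.io.fileio` helpers dispatch to it for `az://` and `abfs://` paths. A hedged usage sketch with placeholder container and blob names:

```python
from zenml.io import fileio

# Placeholder URI inside the container backing the artifact store.
artifact_uri = "az://my-container/some/artifact.txt"

with fileio.open(artifact_uri, mode="wb") as f:
    f.write(b"hello from ZenML")

print(fileio.exists(artifact_uri))
print(fileio.listdir("az://my-container/some"))
```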
flavors
special
Azure integration flavors.
azure_artifact_store_flavor
Azure artifact store flavor.
AzureArtifactStoreConfig (BaseArtifactStoreConfig, AuthenticationConfigMixin)
pydantic-model
Configuration class for Azure Artifact Store.
Source code in zenml/integrations/azure/flavors/azure_artifact_store_flavor.py
class AzureArtifactStoreConfig(
BaseArtifactStoreConfig, AuthenticationConfigMixin
):
"""Configuration class for Azure Artifact Store."""
SUPPORTED_SCHEMES: ClassVar[Set[str]] = {"abfs://", "az://"}
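Only the two schemes above are accepted as artifact store root paths; a tiny check, for illustration (the path is a placeholder):

```python
from zenml.integrations.azure.flavors.azure_artifact_store_flavor import (
    AzureArtifactStoreConfig,
)

# A valid Azure artifact store path must use one of the supported schemes.
path = "az://my-container/zenml-artifacts"  # placeholder root path
print(any(path.startswith(s) for s in AzureArtifactStoreConfig.SUPPORTED_SCHEMES))
```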
AzureArtifactStoreFlavor (BaseArtifactStoreFlavor)
Azure Artifact Store flavor.
Source code in zenml/integrations/azure/flavors/azure_artifact_store_flavor.py
class AzureArtifactStoreFlavor(BaseArtifactStoreFlavor):
"""Azure Artifact Store flavor."""
@property
def name(self) -> str:
"""Name of the flavor.
Returns:
The name of the flavor.
"""
return AZURE_ARTIFACT_STORE_FLAVOR
@property
def config_class(self) -> Type[AzureArtifactStoreConfig]:
"""Returns AzureArtifactStoreConfig config class.
Returns:
The config class.
"""
return AzureArtifactStoreConfig
@property
def implementation_class(self) -> Type["AzureArtifactStore"]:
"""Implementation class.
Returns:
The implementation class.
"""
from zenml.integrations.azure.artifact_stores import AzureArtifactStore
return AzureArtifactStore
config_class: Type[zenml.integrations.azure.flavors.azure_artifact_store_flavor.AzureArtifactStoreConfig]
property
readonly
Returns AzureArtifactStoreConfig config class.
Returns:

| Type | Description |
|---|---|
| `Type[zenml.integrations.azure.flavors.azure_artifact_store_flavor.AzureArtifactStoreConfig]` | The config class. |
implementation_class: Type[AzureArtifactStore]
property
readonly
Implementation class.
Returns:

| Type | Description |
|---|---|
| `Type[AzureArtifactStore]` | The implementation class. |
name: str
property
readonly
Name of the flavor.
Returns:

| Type | Description |
|---|---|
| `str` | The name of the flavor. |
azure_secrets_manager_flavor
Azure secrets manager flavor.
AzureSecretsManagerConfig (BaseSecretsManagerConfig)
pydantic-model
Configuration for the Azure Secrets Manager.
Attributes:

| Name | Type | Description |
|---|---|---|
| `key_vault_name` | `str` | Name of an Azure Key Vault that this secrets manager will use to store secrets. |
Source code in zenml/integrations/azure/flavors/azure_secrets_manager_flavor.py
class AzureSecretsManagerConfig(BaseSecretsManagerConfig):
"""Configuration for the Azure Secrets Manager.
Attributes:
key_vault_name: Name of an Azure Key Vault that this secrets manager
will use to store secrets.
"""
SUPPORTS_SCOPING: ClassVar[bool] = True
key_vault_name: str
@classmethod
def _validate_scope(
cls,
scope: SecretsManagerScope,
namespace: Optional[str],
) -> None:
"""Validate the scope and namespace value.
Args:
scope: Scope value.
namespace: Optional namespace value.
"""
if namespace:
validate_azure_secret_name_or_namespace(namespace, scope)
AzureSecretsManagerFlavor (BaseSecretsManagerFlavor)
Class for the `AzureSecretsManagerFlavor`.
Source code in zenml/integrations/azure/flavors/azure_secrets_manager_flavor.py
class AzureSecretsManagerFlavor(BaseSecretsManagerFlavor):
"""Class for the `AzureSecretsManagerFlavor`."""
@property
def name(self) -> str:
"""Name of the flavor.
Returns:
The name of the flavor.
"""
return AZURE_SECRETS_MANAGER_FLAVOR
@property
def config_class(self) -> Type[AzureSecretsManagerConfig]:
"""Returns AzureSecretsManagerConfig config class.
Returns:
The config class.
"""
return AzureSecretsManagerConfig
@property
def implementation_class(self) -> Type["AzureSecretsManager"]:
"""Implementation class.
Returns:
The implementation class.
"""
from zenml.integrations.azure.secrets_managers import (
AzureSecretsManager,
)
return AzureSecretsManager
config_class: Type[zenml.integrations.azure.flavors.azure_secrets_manager_flavor.AzureSecretsManagerConfig]
property
readonly
Returns AzureSecretsManagerConfig config class.
Returns:

| Type | Description |
|---|---|
| `Type[zenml.integrations.azure.flavors.azure_secrets_manager_flavor.AzureSecretsManagerConfig]` | The config class. |
implementation_class: Type[AzureSecretsManager]
property
readonly
Implementation class.
Returns:

| Type | Description |
|---|---|
| `Type[AzureSecretsManager]` | The implementation class. |
name: str
property
readonly
Name of the flavor.
Returns:

| Type | Description |
|---|---|
| `str` | The name of the flavor. |
validate_azure_secret_name_or_namespace(name, scope)
Validate a secret name or namespace.
Azure secret names must contain only alphanumeric characters and the character `-`.
Because secret names and namespaces are also saved as labels, they are further limited by the 256-character maximum that Azure imposes on label values. An arbitrary limit of 100 characters is therefore used here as the maximum size for secret names and namespaces.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `name` | `str` | the secret name or namespace | required |
| `scope` | `SecretsManagerScope` | the current scope | required |

Exceptions:

| Type | Description |
|---|---|
| `ValueError` | if the secret name or namespace is invalid |
Source code in zenml/integrations/azure/flavors/azure_secrets_manager_flavor.py
def validate_azure_secret_name_or_namespace(
name: str,
scope: SecretsManagerScope,
) -> None:
"""Validate a secret name or namespace.
Azure secret names must contain only alphanumeric characters and the
character `-`.
Given that we also save secret names and namespaces as labels, we are
also limited by the 256 maximum size limitation that Azure imposes on
label values. An arbitrary length of 100 characters is used here for
the maximum size for the secret name and namespace.
Args:
name: the secret name or namespace
scope: the current scope
Raises:
ValueError: if the secret name or namespace is invalid
"""
if scope == SecretsManagerScope.NONE:
# to preserve backwards compatibility, we don't validate the
# secret name for unscoped secrets.
return
if not re.fullmatch(r"[0-9a-zA-Z-]+", name):
raise ValueError(
f"Invalid secret name or namespace '{name}'. Must contain "
f"only alphanumeric characters and the character -."
)
if len(name) > 100:
raise ValueError(
f"Invalid secret name or namespace '{name}'. The length is "
f"limited to maximum 100 characters."
)
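A short illustration of the rule; the exact import path and members of `SecretsManagerScope` are assumed here and may differ between versions:

```python
from zenml.integrations.azure.flavors.azure_secrets_manager_flavor import (
    validate_azure_secret_name_or_namespace,
)
from zenml.secrets_managers.base_secrets_manager import SecretsManagerScope

# Passes: only alphanumeric characters and `-`, well under 100 characters.
validate_azure_secret_name_or_namespace("training-secrets", SecretsManagerScope.GLOBAL)

# Raises ValueError: underscores are not allowed in Azure secret names.
try:
    validate_azure_secret_name_or_namespace("training_secrets", SecretsManagerScope.GLOBAL)
except ValueError as err:
    print(err)
```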
azureml_step_operator_flavor
AzureML step operator flavor.
AzureMLStepOperatorConfig (BaseStepOperatorConfig, AzureMLStepOperatorSettings)
pydantic-model
Config for the AzureML step operator.
Attributes:

| Name | Type | Description |
|---|---|---|
| `subscription_id` | `str` | The Azure account's subscription ID. |
| `resource_group` | `str` | The resource group to which the AzureML workspace is deployed. |
| `workspace_name` | `str` | The name of the AzureML Workspace. |
| `compute_target_name` | `str` | The name of the configured ComputeTarget. An instance of it has to be created on the portal if it doesn't exist already. |
| `tenant_id` | `Optional[str]` | The Azure Tenant ID. |
| `service_principal_id` | `Optional[str]` | The ID for the service principal that is created to allow apps to access secure resources. |
| `service_principal_password` | `Optional[str]` | Password for the service principal. |
Source code in zenml/integrations/azure/flavors/azureml_step_operator_flavor.py
class AzureMLStepOperatorConfig( # type: ignore[misc] # https://github.com/pydantic/pydantic/issues/4173
BaseStepOperatorConfig, AzureMLStepOperatorSettings
):
"""Config for the AzureML step operator.
Attributes:
subscription_id: The Azure account's subscription ID
resource_group: The resource group to which the AzureML workspace
is deployed.
workspace_name: The name of the AzureML Workspace.
compute_target_name: The name of the configured ComputeTarget.
An instance of it has to be created on the portal if it doesn't
exist already.
tenant_id: The Azure Tenant ID.
service_principal_id: The ID for the service principal that is created
to allow apps to access secure resources.
service_principal_password: Password for the service principal.
"""
subscription_id: str
resource_group: str
workspace_name: str
compute_target_name: str
# Service principal authentication
# https://docs.microsoft.com/en-us/azure/machine-learning/how-to-setup-authentication#configure-a-service-principal
tenant_id: Optional[str] = SecretField()
service_principal_id: Optional[str] = SecretField()
service_principal_password: Optional[str] = SecretField()
@property
def is_remote(self) -> bool:
"""Checks if this stack component is running remotely.
This designation is used to determine if the stack component can be
used with a local ZenML database or if it requires a remote ZenML
server.
Returns:
True if this config is for a remote component, False otherwise.
"""
return True
is_remote: bool
property
readonly
Checks if this stack component is running remotely.
This designation is used to determine if the stack component can be used with a local ZenML database or if it requires a remote ZenML server.
Returns:

| Type | Description |
|---|---|
| `bool` | True if this config is for a remote component, False otherwise. |
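The four required fields plus the optional service-principal credentials are exactly what the step operator needs to look up the AzureML workspace. The sketch below mirrors that lookup using the AzureML SDK directly; all values are placeholders.

```python
from azureml.core import Workspace
from azureml.core.authentication import ServicePrincipalAuthentication

# With all three service-principal fields configured, authentication is
# non-interactive; otherwise AzureML falls back to its default auth flow.
auth = ServicePrincipalAuthentication(
    tenant_id="<tenant-id>",
    service_principal_id="<client-id>",
    service_principal_password="<client-secret>",
)
workspace = Workspace.get(
    name="<workspace-name>",
    subscription_id="<subscription-id>",
    resource_group="<resource-group>",
    auth=auth,
)
```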
AzureMLStepOperatorFlavor (BaseStepOperatorFlavor)
Flavor for the AzureML step operator.
Source code in zenml/integrations/azure/flavors/azureml_step_operator_flavor.py
class AzureMLStepOperatorFlavor(BaseStepOperatorFlavor):
"""Flavor for the AzureML step operator."""
@property
def name(self) -> str:
"""Name of the flavor.
Returns:
The name of the flavor.
"""
return AZUREML_STEP_OPERATOR_FLAVOR
@property
def config_class(self) -> Type[AzureMLStepOperatorConfig]:
"""Returns AzureMLStepOperatorConfig config class.
Returns:
The config class.
"""
return AzureMLStepOperatorConfig
@property
def implementation_class(self) -> Type["AzureMLStepOperator"]:
"""Implementation class.
Returns:
The implementation class.
"""
from zenml.integrations.azure.step_operators import AzureMLStepOperator
return AzureMLStepOperator
config_class: Type[zenml.integrations.azure.flavors.azureml_step_operator_flavor.AzureMLStepOperatorConfig]
property
readonly
Returns AzureMLStepOperatorConfig config class.
Returns:

| Type | Description |
|---|---|
| `Type[zenml.integrations.azure.flavors.azureml_step_operator_flavor.AzureMLStepOperatorConfig]` | The config class. |
implementation_class: Type[AzureMLStepOperator]
property
readonly
Implementation class.
Returns:

| Type | Description |
|---|---|
| `Type[AzureMLStepOperator]` | The implementation class. |
name: str
property
readonly
Name of the flavor.
Returns:

| Type | Description |
|---|---|
| `str` | The name of the flavor. |
AzureMLStepOperatorSettings (BaseSettings)
pydantic-model
Settings for the AzureML step operator.
Attributes:

| Name | Type | Description |
|---|---|---|
| `environment_name` | `Optional[str]` | The name of the environment if there already exists one. |
Source code in zenml/integrations/azure/flavors/azureml_step_operator_flavor.py
class AzureMLStepOperatorSettings(BaseSettings):
"""Settings for the AzureML step operator.
Attributes:
environment_name: The name of the environment if there
already exists one.
"""
environment_name: Optional[str] = None
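These settings can be provided on a per-step basis. The sketch below assumes the settings key follows ZenML's `<component_type>.<flavor>` convention, i.e. `step_operator.azureml` for this flavor, and that the step is otherwise configured to run on the AzureML step operator:

```python
from zenml.integrations.azure.flavors.azureml_step_operator_flavor import (
    AzureMLStepOperatorSettings,
)
from zenml.steps import step

azureml_settings = AzureMLStepOperatorSettings(
    environment_name="zenml-base-env",  # name of an existing AzureML environment
)

@step(settings={"step_operator.azureml": azureml_settings})
def train_model() -> None:
    ...
```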
secrets_managers
special
Initialization of the Azure Secrets Manager integration.
azure_secrets_manager
Implementation of the Azure Secrets Manager integration.
AzureSecretsManager (BaseSecretsManager)
Class to interact with the Azure secrets manager.
Source code in zenml/integrations/azure/secrets_managers/azure_secrets_manager.py
class AzureSecretsManager(BaseSecretsManager):
"""Class to interact with the Azure secrets manager."""
CLIENT: ClassVar[Any] = None
@property
def config(self) -> AzureSecretsManagerConfig:
"""Returns the `AzureSecretsManagerConfig` config.
Returns:
The configuration.
"""
return cast(AzureSecretsManagerConfig, self._config)
@classmethod
def _ensure_client_connected(cls, vault_name: str) -> None:
if cls.CLIENT is None:
KVUri = f"https://{vault_name}.vault.azure.net"
credential = DefaultAzureCredential()
cls.CLIENT = SecretClient(vault_url=KVUri, credential=credential)
def validate_secret_name(self, name: str) -> None:
"""Validate a secret name.
Args:
name: the secret name
"""
validate_azure_secret_name_or_namespace(name, self.config.scope)
def _create_or_update_secret(self, secret: BaseSecretSchema) -> None:
"""Creates a new secret or updated an existing one.
Args:
secret: the secret to register or update
"""
if self.config.scope == SecretsManagerScope.NONE:
# legacy, non-scoped secrets
for key, value in secret.content.items():
encoded_key = base64.b64encode(
f"{secret.name}-{key}".encode()
).hex()
azure_secret_name = f"zenml-{encoded_key}"
self.CLIENT.set_secret(azure_secret_name, value)
self.CLIENT.update_secret_properties(
azure_secret_name,
tags={
ZENML_GROUP_KEY: secret.name,
ZENML_KEY_NAME: key,
ZENML_SCHEMA_NAME: secret.TYPE,
},
)
logger.debug(
"Secret `%s` written to the Azure Key Vault.",
azure_secret_name,
)
else:
azure_secret_name = self._get_scoped_secret_name(
secret.name,
separator=ZENML_AZURE_SECRET_SCOPE_PATH_SEPARATOR,
)
self.CLIENT.set_secret(
azure_secret_name,
json.dumps(secret_to_dict(secret)),
)
self.CLIENT.update_secret_properties(
azure_secret_name,
tags=self._get_secret_metadata(secret),
)
def register_secret(self, secret: BaseSecretSchema) -> None:
"""Registers a new secret.
Args:
secret: the secret to register
Raises:
SecretExistsError: if the secret already exists
"""
self.validate_secret_name(secret.name)
self._ensure_client_connected(self.config.key_vault_name)
if secret.name in self.get_all_secret_keys():
raise SecretExistsError(
f"A Secret with the name '{secret.name}' already exists."
)
self._create_or_update_secret(secret)
def get_secret(self, secret_name: str) -> BaseSecretSchema:
"""Get a secret by its name.
Args:
secret_name: the name of the secret to get
Returns:
The secret.
Raises:
KeyError: if the secret does not exist
ValueError: if the secret is named 'name'
"""
self.validate_secret_name(secret_name)
self._ensure_client_connected(self.config.key_vault_name)
zenml_secret: Optional[BaseSecretSchema] = None
if self.config.scope == SecretsManagerScope.NONE:
# Legacy secrets are mapped to multiple Azure secrets, one for
# each secret key
secret_contents = {}
zenml_schema_name = ""
for secret_property in self.CLIENT.list_properties_of_secrets():
tags = secret_property.tags
if tags and tags.get(ZENML_GROUP_KEY) == secret_name:
secret_key = tags.get(ZENML_KEY_NAME)
if not secret_key:
raise ValueError("Missing secret key tag.")
if secret_key == "name":
raise ValueError("The secret's key cannot be 'name'.")
response = self.CLIENT.get_secret(secret_property.name)
secret_contents[secret_key] = response.value
zenml_schema_name = tags.get(ZENML_SCHEMA_NAME)
if secret_contents:
secret_contents["name"] = secret_name
secret_schema = SecretSchemaClassRegistry.get_class(
secret_schema=zenml_schema_name
)
zenml_secret = secret_schema(**secret_contents)
else:
# Scoped secrets are mapped 1-to-1 with Azure secrets
try:
response = self.CLIENT.get_secret(
self._get_scoped_secret_name(
secret_name,
separator=ZENML_AZURE_SECRET_SCOPE_PATH_SEPARATOR,
),
)
scope_tags = self._get_secret_scope_metadata(secret_name)
# all scope tags need to be included in the Azure secret tags,
# otherwise the secret does not belong to the current scope,
# even if it has the same name
if scope_tags.items() <= response.properties.tags.items():
zenml_secret = secret_from_dict(
json.loads(response.value), secret_name=secret_name
)
except ResourceNotFoundError:
pass
if not zenml_secret:
raise KeyError(f"Can't find the specified secret '{secret_name}'")
return zenml_secret
def get_all_secret_keys(self) -> List[str]:
"""Get all secret keys.
Returns:
A list of all secret keys
"""
self._ensure_client_connected(self.config.key_vault_name)
set_of_secrets = set()
for secret_property in self.CLIENT.list_properties_of_secrets():
tags = secret_property.tags
if not tags:
continue
if self.config.scope == SecretsManagerScope.NONE:
# legacy, non-scoped secrets
if ZENML_GROUP_KEY in tags:
set_of_secrets.add(tags.get(ZENML_GROUP_KEY))
continue
scope_tags = self._get_secret_scope_metadata()
# all scope tags need to be included in the Azure secret tags,
# otherwise the secret does not belong to the current scope
if scope_tags.items() <= tags.items():
set_of_secrets.add(tags.get(ZENML_SECRET_NAME_LABEL))
return list(set_of_secrets)
def update_secret(self, secret: BaseSecretSchema) -> None:
"""Update an existing secret by creating new versions of the existing secrets.
Args:
secret: the secret to update
Raises:
KeyError: if the secret does not exist
"""
self.validate_secret_name(secret.name)
self._ensure_client_connected(self.config.key_vault_name)
if secret.name not in self.get_all_secret_keys():
raise KeyError(f"Can't find the specified secret '{secret.name}'")
self._create_or_update_secret(secret)
def delete_secret(self, secret_name: str) -> None:
"""Delete an existing secret. by name.
Args:
secret_name: the name of the secret to delete
Raises:
KeyError: if the secret no longer exists
"""
self.validate_secret_name(secret_name)
self._ensure_client_connected(self.config.key_vault_name)
if self.config.scope == SecretsManagerScope.NONE:
# legacy, non-scoped secrets
# Go through all Azure secrets and delete the ones with the
# secret_name as label.
for secret_property in self.CLIENT.list_properties_of_secrets():
tags = secret_property.tags
if tags and tags.get(ZENML_GROUP_KEY) == secret_name:
self.CLIENT.begin_delete_secret(
secret_property.name
).result()
else:
if secret_name not in self.get_all_secret_keys():
raise KeyError(
f"Can't find the specified secret '{secret_name}'"
)
self.CLIENT.begin_delete_secret(
self._get_scoped_secret_name(
secret_name,
separator=ZENML_AZURE_SECRET_SCOPE_PATH_SEPARATOR,
),
).result()
def delete_all_secrets(self) -> None:
"""Delete all existing secrets."""
self._ensure_client_connected(self.config.key_vault_name)
# List all secrets.
for secret_property in self.CLIENT.list_properties_of_secrets():
tags = secret_property.tags
if not tags:
continue
if self.config.scope == SecretsManagerScope.NONE:
# legacy, non-scoped secrets
if ZENML_GROUP_KEY in tags:
logger.info(
"Deleted key-value pair {`%s`, `***`} from secret "
"`%s`",
secret_property.name,
tags.get(ZENML_GROUP_KEY),
)
self.CLIENT.begin_delete_secret(
secret_property.name
).result()
continue
scope_tags = self._get_secret_scope_metadata()
# all scope tags need to be included in the Azure secret tags,
# otherwise the secret does not belong to the current scope
if scope_tags.items() <= tags.items():
self.CLIENT.begin_delete_secret(secret_property.name).result()
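Under the hood, all of the operations above go through the official Azure SDK. The sketch below shows the client setup that `_ensure_client_connected` performs, with a placeholder vault name:

```python
from azure.identity import DefaultAzureCredential
from azure.keyvault.secrets import SecretClient

# The vault URL is derived from the configured `key_vault_name`.
vault_name = "my-zenml-vault"  # placeholder
client = SecretClient(
    vault_url=f"https://{vault_name}.vault.azure.net",
    credential=DefaultAzureCredential(),
)

# ZenML metadata (group, key, schema, scope) is stored in the secret tags.
for secret_property in client.list_properties_of_secrets():
    print(secret_property.name, secret_property.tags)
```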
config: AzureSecretsManagerConfig
property
readonly
Returns the `AzureSecretsManagerConfig` config.
Returns:

| Type | Description |
|---|---|
| `AzureSecretsManagerConfig` | The configuration. |
delete_all_secrets(self)
Delete all existing secrets.
Source code in zenml/integrations/azure/secrets_managers/azure_secrets_manager.py
def delete_all_secrets(self) -> None:
"""Delete all existing secrets."""
self._ensure_client_connected(self.config.key_vault_name)
# List all secrets.
for secret_property in self.CLIENT.list_properties_of_secrets():
tags = secret_property.tags
if not tags:
continue
if self.config.scope == SecretsManagerScope.NONE:
# legacy, non-scoped secrets
if ZENML_GROUP_KEY in tags:
logger.info(
"Deleted key-value pair {`%s`, `***`} from secret "
"`%s`",
secret_property.name,
tags.get(ZENML_GROUP_KEY),
)
self.CLIENT.begin_delete_secret(
secret_property.name
).result()
continue
scope_tags = self._get_secret_scope_metadata()
# all scope tags need to be included in the Azure secret tags,
# otherwise the secret does not belong to the current scope
if scope_tags.items() <= tags.items():
self.CLIENT.begin_delete_secret(secret_property.name).result()
delete_secret(self, secret_name)
Delete an existing secret by name.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `secret_name` | `str` | the name of the secret to delete | required |

Exceptions:

| Type | Description |
|---|---|
| `KeyError` | if the secret no longer exists |
Source code in zenml/integrations/azure/secrets_managers/azure_secrets_manager.py
def delete_secret(self, secret_name: str) -> None:
"""Delete an existing secret. by name.
Args:
secret_name: the name of the secret to delete
Raises:
KeyError: if the secret no longer exists
"""
self.validate_secret_name(secret_name)
self._ensure_client_connected(self.config.key_vault_name)
if self.config.scope == SecretsManagerScope.NONE:
# legacy, non-scoped secrets
# Go through all Azure secrets and delete the ones with the
# secret_name as label.
for secret_property in self.CLIENT.list_properties_of_secrets():
tags = secret_property.tags
if tags and tags.get(ZENML_GROUP_KEY) == secret_name:
self.CLIENT.begin_delete_secret(
secret_property.name
).result()
else:
if secret_name not in self.get_all_secret_keys():
raise KeyError(
f"Can't find the specified secret '{secret_name}'"
)
self.CLIENT.begin_delete_secret(
self._get_scoped_secret_name(
secret_name,
separator=ZENML_AZURE_SECRET_SCOPE_PATH_SEPARATOR,
),
).result()
get_all_secret_keys(self)
Get all secret keys.
Returns:

| Type | Description |
|---|---|
| `List[str]` | A list of all secret keys |
Source code in zenml/integrations/azure/secrets_managers/azure_secrets_manager.py
def get_all_secret_keys(self) -> List[str]:
"""Get all secret keys.
Returns:
A list of all secret keys
"""
self._ensure_client_connected(self.config.key_vault_name)
set_of_secrets = set()
for secret_property in self.CLIENT.list_properties_of_secrets():
tags = secret_property.tags
if not tags:
continue
if self.config.scope == SecretsManagerScope.NONE:
# legacy, non-scoped secrets
if ZENML_GROUP_KEY in tags:
set_of_secrets.add(tags.get(ZENML_GROUP_KEY))
continue
scope_tags = self._get_secret_scope_metadata()
# all scope tags need to be included in the Azure secret tags,
# otherwise the secret does not belong to the current scope
if scope_tags.items() <= tags.items():
set_of_secrets.add(tags.get(ZENML_SECRET_NAME_LABEL))
return list(set_of_secrets)
get_secret(self, secret_name)
Get a secret by its name.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `secret_name` | `str` | the name of the secret to get | required |

Returns:

| Type | Description |
|---|---|
| `BaseSecretSchema` | The secret. |

Exceptions:

| Type | Description |
|---|---|
| `KeyError` | if the secret does not exist |
| `ValueError` | if the secret is named 'name' |
Source code in zenml/integrations/azure/secrets_managers/azure_secrets_manager.py
def get_secret(self, secret_name: str) -> BaseSecretSchema:
"""Get a secret by its name.
Args:
secret_name: the name of the secret to get
Returns:
The secret.
Raises:
KeyError: if the secret does not exist
ValueError: if the secret is named 'name'
"""
self.validate_secret_name(secret_name)
self._ensure_client_connected(self.config.key_vault_name)
zenml_secret: Optional[BaseSecretSchema] = None
if self.config.scope == SecretsManagerScope.NONE:
# Legacy secrets are mapped to multiple Azure secrets, one for
# each secret key
secret_contents = {}
zenml_schema_name = ""
for secret_property in self.CLIENT.list_properties_of_secrets():
tags = secret_property.tags
if tags and tags.get(ZENML_GROUP_KEY) == secret_name:
secret_key = tags.get(ZENML_KEY_NAME)
if not secret_key:
raise ValueError("Missing secret key tag.")
if secret_key == "name":
raise ValueError("The secret's key cannot be 'name'.")
response = self.CLIENT.get_secret(secret_property.name)
secret_contents[secret_key] = response.value
zenml_schema_name = tags.get(ZENML_SCHEMA_NAME)
if secret_contents:
secret_contents["name"] = secret_name
secret_schema = SecretSchemaClassRegistry.get_class(
secret_schema=zenml_schema_name
)
zenml_secret = secret_schema(**secret_contents)
else:
# Scoped secrets are mapped 1-to-1 with Azure secrets
try:
response = self.CLIENT.get_secret(
self._get_scoped_secret_name(
secret_name,
separator=ZENML_AZURE_SECRET_SCOPE_PATH_SEPARATOR,
),
)
scope_tags = self._get_secret_scope_metadata(secret_name)
# all scope tags need to be included in the Azure secret tags,
# otherwise the secret does not belong to the current scope,
# even if it has the same name
if scope_tags.items() <= response.properties.tags.items():
zenml_secret = secret_from_dict(
json.loads(response.value), secret_name=secret_name
)
except ResourceNotFoundError:
pass
if not zenml_secret:
raise KeyError(f"Can't find the specified secret '{secret_name}'")
return zenml_secret
register_secret(self, secret)
Registers a new secret.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `secret` | `BaseSecretSchema` | the secret to register | required |

Exceptions:

| Type | Description |
|---|---|
| `SecretExistsError` | if the secret already exists |
Source code in zenml/integrations/azure/secrets_managers/azure_secrets_manager.py
def register_secret(self, secret: BaseSecretSchema) -> None:
"""Registers a new secret.
Args:
secret: the secret to register
Raises:
SecretExistsError: if the secret already exists
"""
self.validate_secret_name(secret.name)
self._ensure_client_connected(self.config.key_vault_name)
if secret.name in self.get_all_secret_keys():
raise SecretExistsError(
f"A Secret with the name '{secret.name}' already exists."
)
self._create_or_update_secret(secret)
update_secret(self, secret)
Update an existing secret by creating new versions of the existing secrets.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `secret` | `BaseSecretSchema` | the secret to update | required |

Exceptions:

| Type | Description |
|---|---|
| `KeyError` | if the secret does not exist |
Source code in zenml/integrations/azure/secrets_managers/azure_secrets_manager.py
def update_secret(self, secret: BaseSecretSchema) -> None:
"""Update an existing secret by creating new versions of the existing secrets.
Args:
secret: the secret to update
Raises:
KeyError: if the secret does not exist
"""
self.validate_secret_name(secret.name)
self._ensure_client_connected(self.config.key_vault_name)
if secret.name not in self.get_all_secret_keys():
raise KeyError(f"Can't find the specified secret '{secret.name}'")
self._create_or_update_secret(secret)
validate_secret_name(self, name)
Validate a secret name.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `name` | `str` | the secret name | required |
Source code in zenml/integrations/azure/secrets_managers/azure_secrets_manager.py
def validate_secret_name(self, name: str) -> None:
"""Validate a secret name.
Args:
name: the secret name
"""
validate_azure_secret_name_or_namespace(name, self.config.scope)
step_operators
special
Initialization of AzureML Step Operator integration.
azureml_step_operator
Implementation of the ZenML AzureML Step Operator.
AzureMLStepOperator (BaseStepOperator)
Step operator to run a step on AzureML.
This class defines code that can set up an AzureML environment and run the ZenML entrypoint command in it.
Source code in zenml/integrations/azure/step_operators/azureml_step_operator.py
class AzureMLStepOperator(BaseStepOperator):
"""Step operator to run a step on AzureML.
This class defines code that can set up an AzureML environment and run the
ZenML entrypoint command in it.
"""
@property
def config(self) -> AzureMLStepOperatorConfig:
"""Returns the `AzureMLStepOperatorConfig` config.
Returns:
The configuration.
"""
return cast(AzureMLStepOperatorConfig, self._config)
@property
def settings_class(self) -> Optional[Type["BaseSettings"]]:
"""Settings class for the AzureML step operator.
Returns:
The settings class.
"""
return AzureMLStepOperatorSettings
@property
def validator(self) -> Optional[StackValidator]:
"""Validates the stack.
Returns:
A validator that checks that the stack contains a remote artifact
store.
"""
def _validate_remote_artifact_store(stack: "Stack") -> Tuple[bool, str]:
if stack.artifact_store.config.is_local:
return False, (
"The AzureML step operator runs code remotely and "
"needs to write files into the artifact store, but the "
f"artifact store `{stack.artifact_store.name}` of the "
"active stack is local. Please ensure that your stack "
"contains a remote artifact store when using the AzureML "
"step operator."
)
return True, ""
return StackValidator(
custom_validation_function=_validate_remote_artifact_store,
)
def _get_authentication(self) -> Optional[AbstractAuthentication]:
"""Returns the authentication object for the AzureML environment.
Returns:
The authentication object for the AzureML environment.
"""
if (
self.config.tenant_id
and self.config.service_principal_id
and self.config.service_principal_password
):
return ServicePrincipalAuthentication(
tenant_id=self.config.tenant_id,
service_principal_id=self.config.service_principal_id,
service_principal_password=self.config.service_principal_password,
)
return None
def prepare_pipeline_deployment(
self, deployment: "PipelineDeployment", stack: "Stack"
) -> None:
"""Store the active deployment in an environment variable.
Args:
deployment: The pipeline deployment configuration.
stack: The stack on which the pipeline will be deployed.
"""
steps_to_run = [
step
for step in deployment.steps.values()
if step.config.step_operator == self.name
]
if not steps_to_run:
return
os.environ[ENV_ACTIVE_DEPLOYMENT] = deployment.yaml()
def _prepare_environment(
self,
workspace: Workspace,
docker_settings: "DockerSettings",
run_name: str,
environment_name: Optional[str] = None,
) -> Environment:
"""Prepares the environment in which Azure will run all jobs.
Args:
workspace: The AzureML Workspace that has configuration
for a storage account, container registry among other
things.
docker_settings: The Docker settings for this step.
run_name: The name of the pipeline run that can be used
for naming environments and runs.
environment_name: Optional name of an existing environment to use.
Returns:
The AzureML Environment object.
"""
docker_image_builder = PipelineDockerImageBuilder()
requirements_files = docker_image_builder._gather_requirements_files(
docker_settings=docker_settings,
stack=Client().active_stack,
)
requirements = list(
itertools.chain.from_iterable(
r[1].split("\n") for r in requirements_files
)
)
requirements.append(f"zenml=={zenml.__version__}")
logger.info(
"Using requirements for AzureML step operator environment: %s",
requirements,
)
if environment_name:
environment = Environment.get(
workspace=workspace, name=environment_name
)
if not environment.python.conda_dependencies:
environment.python.conda_dependencies = (
CondaDependencies.create(
python_version=ZenMLEnvironment.python_version()
)
)
for requirement in requirements:
environment.python.conda_dependencies.add_pip_package(
requirement
)
else:
environment = Environment(name=f"zenml-{run_name}")
environment.python.conda_dependencies = CondaDependencies.create(
pip_packages=requirements,
python_version=ZenMLEnvironment.python_version(),
)
if docker_settings.parent_image:
# replace the default azure base image
environment.docker.base_image = docker_settings.parent_image
environment_variables = {
"ENV_ZENML_PREVENT_PIPELINE_EXECUTION": "True",
}
# set credentials to access azure storage
for key in [
"AZURE_STORAGE_ACCOUNT_KEY",
"AZURE_STORAGE_ACCOUNT_NAME",
"AZURE_STORAGE_CONNECTION_STRING",
"AZURE_STORAGE_SAS_TOKEN",
]:
value = os.getenv(key)
if value:
environment_variables[key] = value
environment_variables[
ENV_ZENML_CONFIG_PATH
] = f"./{DOCKER_IMAGE_ZENML_CONFIG_DIR}"
environment_variables.update(docker_settings.environment)
environment.environment_variables = environment_variables
return environment
def launch(
self,
info: "StepRunInfo",
entrypoint_command: List[str],
) -> None:
"""Launches a step on AzureML.
Args:
info: Information about the step run.
entrypoint_command: Command that executes the step.
Raises:
RuntimeError: If the deployment config can't be found.
"""
if not info.config.resource_settings.empty:
logger.warning(
"Specifying custom step resources is not supported for "
"the AzureML step operator. If you want to run this step "
"operator on specific resources, you can do so by creating an "
"Azure compute target (https://docs.microsoft.com/en-us/azure/machine-learning/concept-compute-target) "
"with a specific machine type and then updating this step "
"operator: `zenml step-operator update %s "
"--compute_target_name=<COMPUTE_TARGET_NAME>`",
self.name,
)
unused_docker_fields = [
"dockerfile",
"build_context_root",
"build_options",
"docker_target_repository",
"dockerignore",
"copy_files",
"copy_global_config",
"apt_packages",
]
docker_settings = info.pipeline.docker_settings
ignored_docker_fields = docker_settings.__fields_set__.intersection(
unused_docker_fields
)
if ignored_docker_fields:
logger.warning(
"The AzureML step operator currently does not support all "
"options defined in your Docker settings. Ignoring all "
"values set for the attributes: %s",
ignored_docker_fields,
)
settings = cast(AzureMLStepOperatorSettings, self.get_settings(info))
workspace = Workspace.get(
subscription_id=self.config.subscription_id,
resource_group=self.config.resource_group,
name=self.config.workspace_name,
auth=self._get_authentication(),
)
source_directory = get_source_root_path()
deployment = os.environ.get(ENV_ACTIVE_DEPLOYMENT)
deployment_path = os.path.join(
source_directory, DOCKER_IMAGE_DEPLOYMENT_CONFIG_FILE
)
if deployment:
with open(deployment_path, "w") as f:
f.write(deployment)
elif not os.path.exists(deployment_path):
# We're running in a non-local environment which should already
# include the deployment at the source root
raise RuntimeError("Unable to find deployment configuration.")
with _include_global_config(
build_context_root=source_directory,
load_config_path=PurePosixPath(
f"./{DOCKER_IMAGE_ZENML_CONFIG_DIR}"
),
):
environment = self._prepare_environment(
workspace=workspace,
docker_settings=docker_settings,
run_name=info.run_name,
environment_name=settings.environment_name,
)
compute_target = ComputeTarget(
workspace=workspace, name=self.config.compute_target_name
)
try:
run_config = ScriptRunConfig(
source_directory=source_directory,
environment=environment,
compute_target=compute_target,
command=entrypoint_command,
)
experiment = Experiment(
workspace=workspace, name=info.pipeline.name
)
run = experiment.submit(config=run_config)
finally:
if deployment:
os.remove(deployment_path)
run.display_name = info.run_name
run.wait_for_completion(show_output=True)
config: AzureMLStepOperatorConfig
property
readonly
Returns the `AzureMLStepOperatorConfig` config.
Returns:

| Type | Description |
|---|---|
| `AzureMLStepOperatorConfig` | The configuration. |
settings_class: Optional[Type[BaseSettings]]
property
readonly
Settings class for the AzureML step operator.
Returns:

| Type | Description |
|---|---|
| `Optional[Type[BaseSettings]]` | The settings class. |
validator: Optional[zenml.stack.stack_validator.StackValidator]
property
readonly
Validates the stack.
Returns:

| Type | Description |
|---|---|
| `Optional[zenml.stack.stack_validator.StackValidator]` | A validator that checks that the stack contains a remote artifact store. |
launch(self, info, entrypoint_command)
Launches a step on AzureML.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `info` | `StepRunInfo` | Information about the step run. | required |
| `entrypoint_command` | `List[str]` | Command that executes the step. | required |

Exceptions:

| Type | Description |
|---|---|
| `RuntimeError` | If the deployment config can't be found. |
Source code in zenml/integrations/azure/step_operators/azureml_step_operator.py
def launch(
self,
info: "StepRunInfo",
entrypoint_command: List[str],
) -> None:
"""Launches a step on AzureML.
Args:
info: Information about the step run.
entrypoint_command: Command that executes the step.
Raises:
RuntimeError: If the deployment config can't be found.
"""
if not info.config.resource_settings.empty:
logger.warning(
"Specifying custom step resources is not supported for "
"the AzureML step operator. If you want to run this step "
"operator on specific resources, you can do so by creating an "
"Azure compute target (https://docs.microsoft.com/en-us/azure/machine-learning/concept-compute-target) "
"with a specific machine type and then updating this step "
"operator: `zenml step-operator update %s "
"--compute_target_name=<COMPUTE_TARGET_NAME>`",
self.name,
)
unused_docker_fields = [
"dockerfile",
"build_context_root",
"build_options",
"docker_target_repository",
"dockerignore",
"copy_files",
"copy_global_config",
"apt_packages",
]
docker_settings = info.pipeline.docker_settings
ignored_docker_fields = docker_settings.__fields_set__.intersection(
unused_docker_fields
)
if ignored_docker_fields:
logger.warning(
"The AzureML step operator currently does not support all "
"options defined in your Docker settings. Ignoring all "
"values set for the attributes: %s",
ignored_docker_fields,
)
settings = cast(AzureMLStepOperatorSettings, self.get_settings(info))
workspace = Workspace.get(
subscription_id=self.config.subscription_id,
resource_group=self.config.resource_group,
name=self.config.workspace_name,
auth=self._get_authentication(),
)
source_directory = get_source_root_path()
deployment = os.environ.get(ENV_ACTIVE_DEPLOYMENT)
deployment_path = os.path.join(
source_directory, DOCKER_IMAGE_DEPLOYMENT_CONFIG_FILE
)
if deployment:
with open(deployment_path, "w") as f:
f.write(deployment)
elif not os.path.exists(deployment_path):
# We're running in a non-local environment which should already
# include the deployment at the source root
raise RuntimeError("Unable to find deployment configuration.")
with _include_global_config(
build_context_root=source_directory,
load_config_path=PurePosixPath(
f"./{DOCKER_IMAGE_ZENML_CONFIG_DIR}"
),
):
environment = self._prepare_environment(
workspace=workspace,
docker_settings=docker_settings,
run_name=info.run_name,
environment_name=settings.environment_name,
)
compute_target = ComputeTarget(
workspace=workspace, name=self.config.compute_target_name
)
try:
run_config = ScriptRunConfig(
source_directory=source_directory,
environment=environment,
compute_target=compute_target,
command=entrypoint_command,
)
experiment = Experiment(
workspace=workspace, name=info.pipeline.name
)
run = experiment.submit(config=run_config)
finally:
if deployment:
os.remove(deployment_path)
run.display_name = info.run_name
run.wait_for_completion(show_output=True)
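For reference, the bare AzureML (v1 SDK) submission pattern that `launch` builds on, stripped of the ZenML-specific environment preparation; all names and the command are placeholders:

```python
from azureml.core import ComputeTarget, Environment, Experiment, ScriptRunConfig, Workspace

workspace = Workspace.get(
    name="<workspace-name>",
    subscription_id="<subscription-id>",
    resource_group="<resource-group>",
)
environment = Environment(name="example-env")
compute_target = ComputeTarget(workspace=workspace, name="<compute-target-name>")

run_config = ScriptRunConfig(
    source_directory=".",
    environment=environment,
    compute_target=compute_target,
    command=["python", "-c", "print('hello from AzureML')"],
)
run = Experiment(workspace=workspace, name="example-experiment").submit(config=run_config)
run.wait_for_completion(show_output=True)
```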
prepare_pipeline_deployment(self, deployment, stack)
Store the active deployment in an environment variable.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `deployment` | `PipelineDeployment` | The pipeline deployment configuration. | required |
| `stack` | `Stack` | The stack on which the pipeline will be deployed. | required |
Source code in zenml/integrations/azure/step_operators/azureml_step_operator.py
def prepare_pipeline_deployment(
self, deployment: "PipelineDeployment", stack: "Stack"
) -> None:
"""Store the active deployment in an environment variable.
Args:
deployment: The pipeline deployment configuration.
stack: The stack on which the pipeline will be deployed.
"""
steps_to_run = [
step
for step in deployment.steps.values()
if step.config.step_operator == self.name
]
if not steps_to_run:
return
os.environ[ENV_ACTIVE_DEPLOYMENT] = deployment.yaml()