Azure

zenml.integrations.azure special

Initialization of the ZenML Azure integration.

The Azure integration submodule provides a way to run ZenML pipelines in a cloud environment. Specifically, it provides a cloud artifact store and an I/O module for handling file operations on Azure Blob Storage. The Azure Step Operator integration submodule provides a way to run ZenML steps in AzureML.
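
For orientation, the integration's name and Python requirements are exposed as class attributes on AzureIntegration (see the source below); a minimal sketch for inspecting them:

```python
# Minimal sketch: inspect the Azure integration's metadata.
# Attribute names match the class definition shown below.
from zenml.integrations.azure import AzureIntegration

print(AzureIntegration.NAME)          # the registered integration name, e.g. "azure"
print(AzureIntegration.REQUIREMENTS)  # pip requirements, e.g. ["adlfs==2021.10.0", ...]
```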

AzureIntegration (Integration)

Definition of Azure integration for ZenML.

Source code in zenml/integrations/azure/__init__.py
class AzureIntegration(Integration):
    """Definition of Azure integration for ZenML."""

    NAME = AZURE
    REQUIREMENTS = [
        "adlfs==2021.10.0",
        "azure-keyvault-keys",
        "azure-keyvault-secrets",
        "azure-identity==1.10.0",
        "azureml-core==1.48.0",
    ]

    @classmethod
    def flavors(cls) -> List[Type[Flavor]]:
        """Declares the flavors for the integration.

        Returns:
            List of stack component flavors for this integration.
        """
        from zenml.integrations.azure.flavors import (
            AzureArtifactStoreFlavor,
            AzureMLStepOperatorFlavor,
            AzureSecretsManagerFlavor,
        )

        return [
            AzureArtifactStoreFlavor,
            AzureSecretsManagerFlavor,
            AzureMLStepOperatorFlavor,
        ]

flavors() classmethod

Declares the flavors for the integration.

Returns:

Type Description
List[Type[zenml.stack.flavor.Flavor]]

List of stack component flavors for this integration.

Source code in zenml/integrations/azure/__init__.py
@classmethod
def flavors(cls) -> List[Type[Flavor]]:
    """Declares the flavors for the integration.

    Returns:
        List of stack component flavors for this integration.
    """
    from zenml.integrations.azure.flavors import (
        AzureArtifactStoreFlavor,
        AzureMLStepOperatorFlavor,
        AzureSecretsManagerFlavor,
    )

    return [
        AzureArtifactStoreFlavor,
        AzureSecretsManagerFlavor,
        AzureMLStepOperatorFlavor,
    ]
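
As an illustration, the declared flavors can be enumerated and their names printed; the name property used here is documented for each flavor further below:

```python
# Sketch: enumerate the stack component flavors declared by the Azure integration.
from zenml.integrations.azure import AzureIntegration

for flavor_class in AzureIntegration.flavors():
    flavor = flavor_class()
    # Prints the flavor class and its registered name, e.g. the artifact store
    # and secrets manager flavors are named "azure", the step operator "azureml".
    print(flavor.__class__.__name__, flavor.name)
```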

artifact_stores special

Initialization of the Azure Artifact Store integration.

azure_artifact_store

Implementation of the Azure Artifact Store integration.

AzureArtifactStore (BaseArtifactStore, AuthenticationMixin)

Artifact Store for Microsoft Azure based artifacts.

Source code in zenml/integrations/azure/artifact_stores/azure_artifact_store.py
class AzureArtifactStore(BaseArtifactStore, AuthenticationMixin):
    """Artifact Store for Microsoft Azure based artifacts."""

    _filesystem: Optional[adlfs.AzureBlobFileSystem] = None

    @property
    def config(self) -> AzureArtifactStoreConfig:
        """Returns the `AzureArtifactStoreConfig` config.

        Returns:
            The configuration.
        """
        return cast(AzureArtifactStoreConfig, self._config)

    @property
    def filesystem(self) -> adlfs.AzureBlobFileSystem:
        """The adlfs filesystem to access this artifact store.

        Returns:
            The adlfs filesystem to access this artifact store.
        """
        if not self._filesystem:
            secret = self.get_authentication_secret(
                expected_schema_type=AzureSecretSchema
            )
            credentials = secret.content if secret else {}

            self._filesystem = adlfs.AzureBlobFileSystem(
                **credentials,
                anon=False,
                use_listings_cache=False,
            )
        return self._filesystem

    def _split_path(self, path: PathType) -> Tuple[str, str]:
        """Splits a path into the filesystem prefix and remainder.

        Example:
        ```python
        prefix, remainder = ZenAzure._split_path("az://my_container/test.txt")
        print(prefix, remainder)  # "az://" "my_container/test.txt"
        ```

        Args:
            path: The path to split.

        Returns:
            A tuple of the filesystem prefix and the remainder.
        """
        path = convert_to_str(path)
        prefix = ""
        for potential_prefix in self.config.SUPPORTED_SCHEMES:
            if path.startswith(potential_prefix):
                prefix = potential_prefix
                path = path[len(potential_prefix) :]
                break

        return prefix, path

    def open(self, path: PathType, mode: str = "r") -> Any:
        """Open a file at the given path.

        Args:
            path: Path of the file to open.
            mode: Mode in which to open the file. Currently, only
                'rb' and 'wb' to read and write binary files are supported.

        Returns:
            A file-like object.
        """
        return self.filesystem.open(path=path, mode=mode)

    def copyfile(
        self, src: PathType, dst: PathType, overwrite: bool = False
    ) -> None:
        """Copy a file.

        Args:
            src: The path to copy from.
            dst: The path to copy to.
            overwrite: If a file already exists at the destination, this
                method will overwrite it if overwrite=`True` and
                raise a FileExistsError otherwise.

        Raises:
            FileExistsError: If a file already exists at the destination
                and overwrite is not set to `True`.
        """
        if not overwrite and self.filesystem.exists(dst):
            raise FileExistsError(
                f"Unable to copy to destination '{convert_to_str(dst)}', "
                f"file already exists. Set `overwrite=True` to copy anyway."
            )

        # TODO [ENG-151]: Check if it works with overwrite=True or if we need to
        #  manually remove it first
        self.filesystem.copy(path1=src, path2=dst)

    def exists(self, path: PathType) -> bool:
        """Check whether a path exists.

        Args:
            path: The path to check.

        Returns:
            True if the path exists, False otherwise.
        """
        return self.filesystem.exists(path=path)  # type: ignore[no-any-return]

    def glob(self, pattern: PathType) -> List[PathType]:
        """Return all paths that match the given glob pattern.

        The glob pattern may include:
        - '*' to match any number of characters
        - '?' to match a single character
        - '[...]' to match one of the characters inside the brackets
        - '**' as the full name of a path component to match to search
            in subdirectories of any depth (e.g. '/some_dir/**/some_file)

        Args:
            pattern: The glob pattern to match, see details above.

        Returns:
            A list of paths that match the given glob pattern.
        """
        prefix, _ = self._split_path(pattern)
        return [
            f"{prefix}{path}" for path in self.filesystem.glob(path=pattern)
        ]

    def isdir(self, path: PathType) -> bool:
        """Check whether a path is a directory.

        Args:
            path: The path to check.

        Returns:
            True if the path is a directory, False otherwise.
        """
        return self.filesystem.isdir(path=path)  # type: ignore[no-any-return]

    def listdir(self, path: PathType) -> List[PathType]:
        """Return a list of files in a directory.

        Args:
            path: The path to list.

        Returns:
            A list of files in the given directory.
        """
        _, path = self._split_path(path)

        def _extract_basename(file_dict: Dict[str, Any]) -> str:
            """Extracts the basename from a dictionary returned by the Azure filesystem.

            Args:
                file_dict: A dictionary returned by the Azure filesystem.

            Returns:
                The basename of the file.
            """
            file_path = cast(str, file_dict["name"])
            base_name = file_path[len(path) :]
            return base_name.lstrip("/")

        return [
            _extract_basename(dict_)
            for dict_ in self.filesystem.listdir(path=path)
        ]

    def makedirs(self, path: PathType) -> None:
        """Create a directory at the given path.

        If needed also create missing parent directories.

        Args:
            path: The path to create.
        """
        self.filesystem.makedirs(path=path, exist_ok=True)

    def mkdir(self, path: PathType) -> None:
        """Create a directory at the given path.

        Args:
            path: The path to create.
        """
        self.filesystem.makedir(path=path, exist_ok=True)

    def remove(self, path: PathType) -> None:
        """Remove the file at the given path.

        Args:
            path: The path to remove.
        """
        self.filesystem.rm_file(path=path)

    def rename(
        self, src: PathType, dst: PathType, overwrite: bool = False
    ) -> None:
        """Rename source file to destination file.

        Args:
            src: The path of the file to rename.
            dst: The path to rename the source file to.
            overwrite: If a file already exists at the destination, this
                method will overwrite it if overwrite=`True` and
                raise a FileExistsError otherwise.

        Raises:
            FileExistsError: If a file already exists at the destination
                and overwrite is not set to `True`.
        """
        if not overwrite and self.filesystem.exists(dst):
            raise FileExistsError(
                f"Unable to rename file to '{convert_to_str(dst)}', "
                f"file already exists. Set `overwrite=True` to rename anyway."
            )

        # TODO [ENG-152]: Check if it works with overwrite=True or if we need
        #  to manually remove it first
        self.filesystem.rename(path1=src, path2=dst)

    def rmtree(self, path: PathType) -> None:
        """Remove the given directory.

        Args:
            path: The path of the directory to remove.
        """
        self.filesystem.delete(path=path, recursive=True)

    def stat(self, path: PathType) -> Dict[str, Any]:
        """Return stat info for the given path.

        Args:
            path: The path to get stat info for.

        Returns:
            Stat info.
        """
        return self.filesystem.stat(path=path)  # type: ignore[no-any-return]

    def walk(
        self,
        top: PathType,
        topdown: bool = True,
        onerror: Optional[Callable[..., None]] = None,
    ) -> Iterable[Tuple[PathType, List[PathType], List[PathType]]]:
        """Return an iterator that walks the contents of the given directory.

        Args:
            top: Path of directory to walk.
            topdown: Unused argument to conform to interface.
            onerror: Unused argument to conform to interface.

        Yields:
            An Iterable of Tuples, each of which contain the path of the current
            directory path, a list of directories inside the current directory
            and a list of files inside the current directory.
        """
        # TODO [ENG-153]: Additional params
        prefix, _ = self._split_path(top)
        for (
            directory,
            subdirectories,
            files,
        ) in self.filesystem.walk(path=top):
            yield f"{prefix}{directory}", subdirectories, files
config: AzureArtifactStoreConfig property readonly

Returns the AzureArtifactStoreConfig config.

Returns:

Type Description
AzureArtifactStoreConfig

The configuration.

filesystem: AzureBlobFileSystem property readonly

The adlfs filesystem to access this artifact store.

Returns:

Type Description
AzureBlobFileSystem

The adlfs filesystem to access this artifact store.

copyfile(self, src, dst, overwrite=False)

Copy a file.

Parameters:

Name Type Description Default
src Union[bytes, str]

The path to copy from.

required
dst Union[bytes, str]

The path to copy to.

required
overwrite bool

If a file already exists at the destination, this method will overwrite it if overwrite=True and raise a FileExistsError otherwise.

False

Exceptions:

Type Description
FileExistsError

If a file already exists at the destination and overwrite is not set to True.

Source code in zenml/integrations/azure/artifact_stores/azure_artifact_store.py
def copyfile(
    self, src: PathType, dst: PathType, overwrite: bool = False
) -> None:
    """Copy a file.

    Args:
        src: The path to copy from.
        dst: The path to copy to.
        overwrite: If a file already exists at the destination, this
            method will overwrite it if overwrite=`True` and
            raise a FileExistsError otherwise.

    Raises:
        FileExistsError: If a file already exists at the destination
            and overwrite is not set to `True`.
    """
    if not overwrite and self.filesystem.exists(dst):
        raise FileExistsError(
            f"Unable to copy to destination '{convert_to_str(dst)}', "
            f"file already exists. Set `overwrite=True` to copy anyway."
        )

    # TODO [ENG-151]: Check if it works with overwrite=True or if we need to
    #  manually remove it first
    self.filesystem.copy(path1=src, path2=dst)
exists(self, path)

Check whether a path exists.

Parameters:

Name Type Description Default
path Union[bytes, str]

The path to check.

required

Returns:

Type Description
bool

True if the path exists, False otherwise.

Source code in zenml/integrations/azure/artifact_stores/azure_artifact_store.py
def exists(self, path: PathType) -> bool:
    """Check whether a path exists.

    Args:
        path: The path to check.

    Returns:
        True if the path exists, False otherwise.
    """
    return self.filesystem.exists(path=path)  # type: ignore[no-any-return]
glob(self, pattern)

Return all paths that match the given glob pattern.

The glob pattern may include:

- '*' to match any number of characters
- '?' to match a single character
- '[...]' to match one of the characters inside the brackets
- '**' as the full name of a path component to match to search in subdirectories of any depth (e.g. '/some_dir/**/some_file')

Parameters:

Name Type Description Default
pattern Union[bytes, str]

The glob pattern to match, see details above.

required

Returns:

Type Description
List[Union[bytes, str]]

A list of paths that match the given glob pattern.

Source code in zenml/integrations/azure/artifact_stores/azure_artifact_store.py
def glob(self, pattern: PathType) -> List[PathType]:
    """Return all paths that match the given glob pattern.

    The glob pattern may include:
    - '*' to match any number of characters
    - '?' to match a single character
    - '[...]' to match one of the characters inside the brackets
    - '**' as the full name of a path component to match to search
        in subdirectories of any depth (e.g. '/some_dir/**/some_file)

    Args:
        pattern: The glob pattern to match, see details above.

    Returns:
        A list of paths that match the given glob pattern.
    """
    prefix, _ = self._split_path(pattern)
    return [
        f"{prefix}{path}" for path in self.filesystem.glob(path=pattern)
    ]
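
A hedged usage sketch for glob, run against the artifact store of the active stack (the container and pattern are hypothetical):

```python
# Sketch: find all CSV files below a hypothetical container using the glob
# patterns described above. Assumes the active stack uses an AzureArtifactStore.
from zenml.client import Client

artifact_store = Client().active_stack.artifact_store
csv_paths = artifact_store.glob("az://my-container/**/*.csv")
print(csv_paths)  # e.g. ["az://my-container/data/train.csv", ...]
```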
isdir(self, path)

Check whether a path is a directory.

Parameters:

Name Type Description Default
path Union[bytes, str]

The path to check.

required

Returns:

Type Description
bool

True if the path is a directory, False otherwise.

Source code in zenml/integrations/azure/artifact_stores/azure_artifact_store.py
def isdir(self, path: PathType) -> bool:
    """Check whether a path is a directory.

    Args:
        path: The path to check.

    Returns:
        True if the path is a directory, False otherwise.
    """
    return self.filesystem.isdir(path=path)  # type: ignore[no-any-return]
listdir(self, path)

Return a list of files in a directory.

Parameters:

Name Type Description Default
path Union[bytes, str]

The path to list.

required

Returns:

Type Description
List[Union[bytes, str]]

A list of files in the given directory.

Source code in zenml/integrations/azure/artifact_stores/azure_artifact_store.py
def listdir(self, path: PathType) -> List[PathType]:
    """Return a list of files in a directory.

    Args:
        path: The path to list.

    Returns:
        A list of files in the given directory.
    """
    _, path = self._split_path(path)

    def _extract_basename(file_dict: Dict[str, Any]) -> str:
        """Extracts the basename from a dictionary returned by the Azure filesystem.

        Args:
            file_dict: A dictionary returned by the Azure filesystem.

        Returns:
            The basename of the file.
        """
        file_path = cast(str, file_dict["name"])
        base_name = file_path[len(path) :]
        return base_name.lstrip("/")

    return [
        _extract_basename(dict_)
        for dict_ in self.filesystem.listdir(path=path)
    ]
makedirs(self, path)

Create a directory at the given path.

If needed also create missing parent directories.

Parameters:

Name Type Description Default
path Union[bytes, str]

The path to create.

required
Source code in zenml/integrations/azure/artifact_stores/azure_artifact_store.py
def makedirs(self, path: PathType) -> None:
    """Create a directory at the given path.

    If needed also create missing parent directories.

    Args:
        path: The path to create.
    """
    self.filesystem.makedirs(path=path, exist_ok=True)
mkdir(self, path)

Create a directory at the given path.

Parameters:

Name Type Description Default
path Union[bytes, str]

The path to create.

required
Source code in zenml/integrations/azure/artifact_stores/azure_artifact_store.py
def mkdir(self, path: PathType) -> None:
    """Create a directory at the given path.

    Args:
        path: The path to create.
    """
    self.filesystem.makedir(path=path, exist_ok=True)
open(self, path, mode='r')

Open a file at the given path.

Parameters:

Name Type Description Default
path Union[bytes, str]

Path of the file to open.

required
mode str

Mode in which to open the file. Currently, only 'rb' and 'wb' to read and write binary files are supported.

'r'

Returns:

Type Description
Any

A file-like object.

Source code in zenml/integrations/azure/artifact_stores/azure_artifact_store.py
def open(self, path: PathType, mode: str = "r") -> Any:
    """Open a file at the given path.

    Args:
        path: Path of the file to open.
        mode: Mode in which to open the file. Currently, only
            'rb' and 'wb' to read and write binary files are supported.

    Returns:
        A file-like object.
    """
    return self.filesystem.open(path=path, mode=mode)
remove(self, path)

Remove the file at the given path.

Parameters:

Name Type Description Default
path Union[bytes, str]

The path to remove.

required
Source code in zenml/integrations/azure/artifact_stores/azure_artifact_store.py
def remove(self, path: PathType) -> None:
    """Remove the file at the given path.

    Args:
        path: The path to remove.
    """
    self.filesystem.rm_file(path=path)
rename(self, src, dst, overwrite=False)

Rename source file to destination file.

Parameters:

Name Type Description Default
src Union[bytes, str]

The path of the file to rename.

required
dst Union[bytes, str]

The path to rename the source file to.

required
overwrite bool

If a file already exists at the destination, this method will overwrite it if overwrite=True and raise a FileExistsError otherwise.

False

Exceptions:

Type Description
FileExistsError

If a file already exists at the destination and overwrite is not set to True.

Source code in zenml/integrations/azure/artifact_stores/azure_artifact_store.py
def rename(
    self, src: PathType, dst: PathType, overwrite: bool = False
) -> None:
    """Rename source file to destination file.

    Args:
        src: The path of the file to rename.
        dst: The path to rename the source file to.
        overwrite: If a file already exists at the destination, this
            method will overwrite it if overwrite=`True` and
            raise a FileExistsError otherwise.

    Raises:
        FileExistsError: If a file already exists at the destination
            and overwrite is not set to `True`.
    """
    if not overwrite and self.filesystem.exists(dst):
        raise FileExistsError(
            f"Unable to rename file to '{convert_to_str(dst)}', "
            f"file already exists. Set `overwrite=True` to rename anyway."
        )

    # TODO [ENG-152]: Check if it works with overwrite=True or if we need
    #  to manually remove it first
    self.filesystem.rename(path1=src, path2=dst)
rmtree(self, path)

Remove the given directory.

Parameters:

Name Type Description Default
path Union[bytes, str]

The path of the directory to remove.

required
Source code in zenml/integrations/azure/artifact_stores/azure_artifact_store.py
def rmtree(self, path: PathType) -> None:
    """Remove the given directory.

    Args:
        path: The path of the directory to remove.
    """
    self.filesystem.delete(path=path, recursive=True)
stat(self, path)

Return stat info for the given path.

Parameters:

Name Type Description Default
path Union[bytes, str]

The path to get stat info for.

required

Returns:

Type Description
Dict[str, Any]

Stat info.

Source code in zenml/integrations/azure/artifact_stores/azure_artifact_store.py
def stat(self, path: PathType) -> Dict[str, Any]:
    """Return stat info for the given path.

    Args:
        path: The path to get stat info for.

    Returns:
        Stat info.
    """
    return self.filesystem.stat(path=path)  # type: ignore[no-any-return]
walk(self, top, topdown=True, onerror=None)

Return an iterator that walks the contents of the given directory.

Parameters:

Name Type Description Default
top Union[bytes, str]

Path of directory to walk.

required
topdown bool

Unused argument to conform to interface.

True
onerror Optional[Callable[..., NoneType]]

Unused argument to conform to interface.

None

Yields:

Type Description
Iterable[Tuple[Union[bytes, str], List[Union[bytes, str]], List[Union[bytes, str]]]]

An Iterable of Tuples, each of which contains the path of the current directory, a list of directories inside the current directory and a list of files inside the current directory.

Source code in zenml/integrations/azure/artifact_stores/azure_artifact_store.py
def walk(
    self,
    top: PathType,
    topdown: bool = True,
    onerror: Optional[Callable[..., None]] = None,
) -> Iterable[Tuple[PathType, List[PathType], List[PathType]]]:
    """Return an iterator that walks the contents of the given directory.

    Args:
        top: Path of directory to walk.
        topdown: Unused argument to conform to interface.
        onerror: Unused argument to conform to interface.

    Yields:
        An Iterable of Tuples, each of which contain the path of the current
        directory path, a list of directories inside the current directory
        and a list of files inside the current directory.
    """
    # TODO [ENG-153]: Additional params
    prefix, _ = self._split_path(top)
    for (
        directory,
        subdirectories,
        files,
    ) in self.filesystem.walk(path=top):
        yield f"{prefix}{directory}", subdirectories, files

flavors special

Azure integration flavors.

azure_artifact_store_flavor

Azure artifact store flavor.

AzureArtifactStoreConfig (BaseArtifactStoreConfig, AuthenticationConfigMixin) pydantic-model

Configuration class for Azure Artifact Store.

Source code in zenml/integrations/azure/flavors/azure_artifact_store_flavor.py
class AzureArtifactStoreConfig(
    BaseArtifactStoreConfig, AuthenticationConfigMixin
):
    """Configuration class for Azure Artifact Store."""

    SUPPORTED_SCHEMES: ClassVar[Set[str]] = {"abfs://", "az://"}
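
For illustration, a small sketch that checks a path against the supported schemes (the path is a placeholder):

```python
# Sketch: check whether a path uses one of the schemes supported by the
# Azure artifact store configuration ("abfs://" or "az://").
from zenml.integrations.azure.flavors.azure_artifact_store_flavor import (
    AzureArtifactStoreConfig,
)

path = "az://my-container/data"  # hypothetical path
supported = any(
    path.startswith(scheme) for scheme in AzureArtifactStoreConfig.SUPPORTED_SCHEMES
)
print(supported)  # True
```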
AzureArtifactStoreFlavor (BaseArtifactStoreFlavor)

Azure Artifact Store flavor.

Source code in zenml/integrations/azure/flavors/azure_artifact_store_flavor.py
class AzureArtifactStoreFlavor(BaseArtifactStoreFlavor):
    """Azure Artifact Store flavor."""

    @property
    def name(self) -> str:
        """Name of the flavor.

        Returns:
            The name of the flavor.
        """
        return AZURE_ARTIFACT_STORE_FLAVOR

    @property
    def config_class(self) -> Type[AzureArtifactStoreConfig]:
        """Returns AzureArtifactStoreConfig config class.

        Returns:
            The config class.
        """
        return AzureArtifactStoreConfig

    @property
    def implementation_class(self) -> Type["AzureArtifactStore"]:
        """Implementation class.

        Returns:
            The implementation class.
        """
        from zenml.integrations.azure.artifact_stores import AzureArtifactStore

        return AzureArtifactStore
config_class: Type[zenml.integrations.azure.flavors.azure_artifact_store_flavor.AzureArtifactStoreConfig] property readonly

Returns AzureArtifactStoreConfig config class.

Returns:

Type Description
Type[zenml.integrations.azure.flavors.azure_artifact_store_flavor.AzureArtifactStoreConfig]

The config class.

implementation_class: Type[AzureArtifactStore] property readonly

Implementation class.

Returns:

Type Description
Type[AzureArtifactStore]

The implementation class.

name: str property readonly

Name of the flavor.

Returns:

Type Description
str

The name of the flavor.

azure_secrets_manager_flavor

Azure secrets manager flavor.

AzureSecretsManagerConfig (BaseSecretsManagerConfig) pydantic-model

Configuration for the Azure Secrets Manager.

Attributes:

Name Type Description
key_vault_name str

Name of an Azure Key Vault that this secrets manager will use to store secrets.

Source code in zenml/integrations/azure/flavors/azure_secrets_manager_flavor.py
class AzureSecretsManagerConfig(BaseSecretsManagerConfig):
    """Configuration for the Azure Secrets Manager.

    Attributes:
        key_vault_name: Name of an Azure Key Vault that this secrets manager
            will use to store secrets.
    """

    SUPPORTS_SCOPING: ClassVar[bool] = True

    key_vault_name: str

    @classmethod
    def _validate_scope(
        cls,
        scope: SecretsManagerScope,
        namespace: Optional[str],
    ) -> None:
        """Validate the scope and namespace value.

        Args:
            scope: Scope value.
            namespace: Optional namespace value.
        """
        if namespace:
            validate_azure_secret_name_or_namespace(namespace, scope)
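
A hedged sketch of such a configuration (the Key Vault name is a placeholder):

```python
# Sketch: Azure secrets manager configuration with a placeholder Key Vault name.
from zenml.integrations.azure.flavors.azure_secrets_manager_flavor import (
    AzureSecretsManagerConfig,
)

config = AzureSecretsManagerConfig(key_vault_name="my-key-vault")
print(config.key_vault_name)    # "my-key-vault"
print(config.SUPPORTS_SCOPING)  # True
```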
AzureSecretsManagerFlavor (BaseSecretsManagerFlavor)

Class for the AzureSecretsManagerFlavor.

Source code in zenml/integrations/azure/flavors/azure_secrets_manager_flavor.py
class AzureSecretsManagerFlavor(BaseSecretsManagerFlavor):
    """Class for the `AzureSecretsManagerFlavor`."""

    @property
    def name(self) -> str:
        """Name of the flavor.

        Returns:
            The name of the flavor.
        """
        return AZURE_SECRETS_MANAGER_FLAVOR

    @property
    def config_class(self) -> Type[AzureSecretsManagerConfig]:
        """Returns AzureSecretsManagerConfig config class.

        Returns:
            The config class.
        """
        return AzureSecretsManagerConfig

    @property
    def implementation_class(self) -> Type["AzureSecretsManager"]:
        """Implementation class.

        Returns:
            The implementation class.
        """
        from zenml.integrations.azure.secrets_managers import (
            AzureSecretsManager,
        )

        return AzureSecretsManager
config_class: Type[zenml.integrations.azure.flavors.azure_secrets_manager_flavor.AzureSecretsManagerConfig] property readonly

Returns AzureSecretsManagerConfig config class.

Returns:

Type Description
Type[zenml.integrations.azure.flavors.azure_secrets_manager_flavor.AzureSecretsManagerConfig]

The config class.

implementation_class: Type[AzureSecretsManager] property readonly

Implementation class.

Returns:

Type Description
Type[AzureSecretsManager]

The implementation class.

name: str property readonly

Name of the flavor.

Returns:

Type Description
str

The name of the flavor.

validate_azure_secret_name_or_namespace(name, scope)

Validate a secret name or namespace.

Azure secret names must contain only alphanumeric characters and the character -.

Since secret names and namespaces are also saved as labels, they are further constrained by the 256-character limit that Azure imposes on label values. An arbitrary maximum length of 100 characters is therefore used here for the secret name and namespace.

Parameters:

Name Type Description Default
name str

the secret name or namespace

required
scope SecretsManagerScope

the current scope

required

Exceptions:

Type Description
ValueError

if the secret name or namespace is invalid

Source code in zenml/integrations/azure/flavors/azure_secrets_manager_flavor.py
def validate_azure_secret_name_or_namespace(
    name: str,
    scope: SecretsManagerScope,
) -> None:
    """Validate a secret name or namespace.

    Azure secret names must contain only alphanumeric characters and the
    character `-`.

    Given that we also save secret names and namespaces as labels, we are
    also limited by the 256 maximum size limitation that Azure imposes on
    label values. An arbitrary length of 100 characters is used here for
    the maximum size for the secret name and namespace.

    Args:
        name: the secret name or namespace
        scope: the current scope

    Raises:
        ValueError: if the secret name or namespace is invalid
    """
    if scope == SecretsManagerScope.NONE:
        # to preserve backwards compatibility, we don't validate the
        # secret name for unscoped secrets.
        return

    if not re.fullmatch(r"[0-9a-zA-Z-]+", name):
        raise ValueError(
            f"Invalid secret name or namespace '{name}'. Must contain "
            f"only alphanumeric characters and the character -."
        )

    if len(name) > 100:
        raise ValueError(
            f"Invalid secret name or namespace '{name}'. The length is "
            f"limited to maximum 100 characters."
        )
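
An illustrative sketch exercising this validation (the secret names are hypothetical, and the import location of SecretsManagerScope is an assumption):

```python
# Sketch: validate secret names against the rules described above.
# The SecretsManagerScope import path is assumed here.
from zenml.secrets_managers.base_secrets_manager import SecretsManagerScope
from zenml.integrations.azure.flavors.azure_secrets_manager_flavor import (
    validate_azure_secret_name_or_namespace,
)

validate_azure_secret_name_or_namespace("db-credentials", SecretsManagerScope.COMPONENT)

try:
    # Underscores are neither alphanumeric nor '-', so this should raise a ValueError.
    validate_azure_secret_name_or_namespace("db_credentials", SecretsManagerScope.COMPONENT)
except ValueError as e:
    print(e)
```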

azureml_step_operator_flavor

AzureML step operator flavor.

AzureMLStepOperatorConfig (BaseStepOperatorConfig, AzureMLStepOperatorSettings) pydantic-model

Config for the AzureML step operator.

Attributes:

Name Type Description
subscription_id str

The Azure account's subscription ID

resource_group str

The resource group to which the AzureML workspace is deployed.

workspace_name str

The name of the AzureML Workspace.

compute_target_name str

The name of the configured ComputeTarget. An instance of it has to be created on the portal if it doesn't exist already.

tenant_id Optional[str]

The Azure Tenant ID.

service_principal_id Optional[str]

The ID for the service principal that is created to allow apps to access secure resources.

service_principal_password Optional[str]

Password for the service principal.

Source code in zenml/integrations/azure/flavors/azureml_step_operator_flavor.py
class AzureMLStepOperatorConfig(  # type: ignore[misc] # https://github.com/pydantic/pydantic/issues/4173
    BaseStepOperatorConfig, AzureMLStepOperatorSettings
):
    """Config for the AzureML step operator.

    Attributes:
        subscription_id: The Azure account's subscription ID
        resource_group: The resource group to which the AzureML workspace
            is deployed.
        workspace_name: The name of the AzureML Workspace.
        compute_target_name: The name of the configured ComputeTarget.
            An instance of it has to be created on the portal if it doesn't
            exist already.
        tenant_id: The Azure Tenant ID.
        service_principal_id: The ID for the service principal that is created
            to allow apps to access secure resources.
        service_principal_password: Password for the service principal.
    """

    subscription_id: str
    resource_group: str
    workspace_name: str
    compute_target_name: str

    # Service principal authentication
    # https://docs.microsoft.com/en-us/azure/machine-learning/how-to-setup-authentication#configure-a-service-principal
    tenant_id: Optional[str] = SecretField()
    service_principal_id: Optional[str] = SecretField()
    service_principal_password: Optional[str] = SecretField()

    @property
    def is_remote(self) -> bool:
        """Checks if this stack component is running remotely.

        This designation is used to determine if the stack component can be
        used with a local ZenML database or if it requires a remote ZenML
        server.

        Returns:
            True if this config is for a remote component, False otherwise.
        """
        return True
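
For orientation, a hedged sketch of what such a configuration could look like (all values are placeholders supplied by your Azure subscription and AzureML workspace):

```python
# Sketch: AzureML step operator configuration with placeholder values.
from zenml.integrations.azure.flavors.azureml_step_operator_flavor import (
    AzureMLStepOperatorConfig,
)

config = AzureMLStepOperatorConfig(
    subscription_id="00000000-0000-0000-0000-000000000000",
    resource_group="my-resource-group",
    workspace_name="my-azureml-workspace",
    compute_target_name="my-compute-target",
)
print(config.is_remote)  # True
```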
is_remote: bool property readonly

Checks if this stack component is running remotely.

This designation is used to determine if the stack component can be used with a local ZenML database or if it requires a remote ZenML server.

Returns:

Type Description
bool

True if this config is for a remote component, False otherwise.

AzureMLStepOperatorFlavor (BaseStepOperatorFlavor)

Flavor for the AzureML step operator.

Source code in zenml/integrations/azure/flavors/azureml_step_operator_flavor.py
class AzureMLStepOperatorFlavor(BaseStepOperatorFlavor):
    """Flavor for the AzureML step operator."""

    @property
    def name(self) -> str:
        """Name of the flavor.

        Returns:
            The name of the flavor.
        """
        return AZUREML_STEP_OPERATOR_FLAVOR

    @property
    def config_class(self) -> Type[AzureMLStepOperatorConfig]:
        """Returns AzureMLStepOperatorConfig config class.

        Returns:
                The config class.
        """
        return AzureMLStepOperatorConfig

    @property
    def implementation_class(self) -> Type["AzureMLStepOperator"]:
        """Implementation class.

        Returns:
            The implementation class.
        """
        from zenml.integrations.azure.step_operators import AzureMLStepOperator

        return AzureMLStepOperator
config_class: Type[zenml.integrations.azure.flavors.azureml_step_operator_flavor.AzureMLStepOperatorConfig] property readonly

Returns AzureMLStepOperatorConfig config class.

Returns:

Type Description
Type[zenml.integrations.azure.flavors.azureml_step_operator_flavor.AzureMLStepOperatorConfig]

The config class.

implementation_class: Type[AzureMLStepOperator] property readonly

Implementation class.

Returns:

Type Description
Type[AzureMLStepOperator]

The implementation class.

name: str property readonly

Name of the flavor.

Returns:

Type Description
str

The name of the flavor.

AzureMLStepOperatorSettings (BaseSettings) pydantic-model

Settings for the AzureML step operator.

Attributes:

Name Type Description
environment_name Optional[str]

The name of the environment if there already exists one.

Source code in zenml/integrations/azure/flavors/azureml_step_operator_flavor.py
class AzureMLStepOperatorSettings(BaseSettings):
    """Settings for the AzureML step operator.

    Attributes:
        environment_name: The name of the environment if there
            already exists one.
    """

    environment_name: Optional[str] = None
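
A hedged sketch of passing these settings to a step; the "step_operator.azureml" settings key follows ZenML's component.flavor convention and is an assumption here, and the step operator and environment names are placeholders:

```python
# Sketch: attach AzureML step operator settings to a step.
# "azureml_step_operator" and "my-environment" are placeholders; the
# "step_operator.azureml" settings key is an assumption.
from zenml.integrations.azure.flavors.azureml_step_operator_flavor import (
    AzureMLStepOperatorSettings,
)
from zenml.steps import step

@step(
    step_operator="azureml_step_operator",
    settings={
        "step_operator.azureml": AzureMLStepOperatorSettings(
            environment_name="my-environment",
        )
    },
)
def trainer_step() -> None:
    ...
```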

secrets_managers special

Initialization of the Azure Secrets Manager integration.

azure_secrets_manager

Implementation of the Azure Secrets Manager integration.

AzureSecretsManager (BaseSecretsManager)

Class to interact with the Azure secrets manager.

Source code in zenml/integrations/azure/secrets_managers/azure_secrets_manager.py
class AzureSecretsManager(BaseSecretsManager):
    """Class to interact with the Azure secrets manager."""

    CLIENT: ClassVar[Any] = None

    @property
    def config(self) -> AzureSecretsManagerConfig:
        """Returns the `AzureSecretsManagerConfig` config.

        Returns:
            The configuration.
        """
        return cast(AzureSecretsManagerConfig, self._config)

    @classmethod
    def _ensure_client_connected(cls, vault_name: str) -> None:
        if cls.CLIENT is None:
            KVUri = f"https://{vault_name}.vault.azure.net"

            credential = DefaultAzureCredential()
            cls.CLIENT = SecretClient(vault_url=KVUri, credential=credential)

    def validate_secret_name(self, name: str) -> None:
        """Validate a secret name.

        Args:
            name: the secret name
        """
        validate_azure_secret_name_or_namespace(name, self.config.scope)

    def _create_or_update_secret(self, secret: BaseSecretSchema) -> None:
        """Creates a new secret or updated an existing one.

        Args:
            secret: the secret to register or update
        """
        if self.config.scope == SecretsManagerScope.NONE:
            # legacy, non-scoped secrets

            for key, value in secret.content.items():
                encoded_key = base64.b64encode(
                    f"{secret.name}-{key}".encode()
                ).hex()
                azure_secret_name = f"zenml-{encoded_key}"

                self.CLIENT.set_secret(azure_secret_name, value)
                self.CLIENT.update_secret_properties(
                    azure_secret_name,
                    tags={
                        ZENML_GROUP_KEY: secret.name,
                        ZENML_KEY_NAME: key,
                        ZENML_SCHEMA_NAME: secret.TYPE,
                    },
                )

                logger.debug(
                    "Secret `%s` written to the Azure Key Vault.",
                    azure_secret_name,
                )
        else:
            azure_secret_name = self._get_scoped_secret_name(
                secret.name,
                separator=ZENML_AZURE_SECRET_SCOPE_PATH_SEPARATOR,
            )
            self.CLIENT.set_secret(
                azure_secret_name,
                json.dumps(secret_to_dict(secret)),
            )
            self.CLIENT.update_secret_properties(
                azure_secret_name,
                tags=self._get_secret_metadata(secret),
            )

    def register_secret(self, secret: BaseSecretSchema) -> None:
        """Registers a new secret.

        Args:
            secret: the secret to register

        Raises:
            SecretExistsError: if the secret already exists
        """
        self.validate_secret_name(secret.name)
        self._ensure_client_connected(self.config.key_vault_name)

        if secret.name in self.get_all_secret_keys():
            raise SecretExistsError(
                f"A Secret with the name '{secret.name}' already exists."
            )

        self._create_or_update_secret(secret)

    def get_secret(self, secret_name: str) -> BaseSecretSchema:
        """Get a secret by its name.

        Args:
            secret_name: the name of the secret to get

        Returns:
            The secret.

        Raises:
            KeyError: if the secret does not exist
            ValueError: if the secret is named 'name'
        """
        self.validate_secret_name(secret_name)
        self._ensure_client_connected(self.config.key_vault_name)
        zenml_secret: Optional[BaseSecretSchema] = None

        if self.config.scope == SecretsManagerScope.NONE:
            # Legacy secrets are mapped to multiple Azure secrets, one for
            # each secret key

            secret_contents = {}
            zenml_schema_name = ""

            for secret_property in self.CLIENT.list_properties_of_secrets():
                tags = secret_property.tags

                if tags and tags.get(ZENML_GROUP_KEY) == secret_name:
                    secret_key = tags.get(ZENML_KEY_NAME)
                    if not secret_key:
                        raise ValueError("Missing secret key tag.")

                    if secret_key == "name":
                        raise ValueError("The secret's key cannot be 'name'.")

                    response = self.CLIENT.get_secret(secret_property.name)
                    secret_contents[secret_key] = response.value

                    zenml_schema_name = tags.get(ZENML_SCHEMA_NAME)

            if secret_contents:
                secret_contents["name"] = secret_name

                secret_schema = SecretSchemaClassRegistry.get_class(
                    secret_schema=zenml_schema_name
                )
                zenml_secret = secret_schema(**secret_contents)
        else:
            # Scoped secrets are mapped 1-to-1 with Azure secrets

            try:
                response = self.CLIENT.get_secret(
                    self._get_scoped_secret_name(
                        secret_name,
                        separator=ZENML_AZURE_SECRET_SCOPE_PATH_SEPARATOR,
                    ),
                )

                scope_tags = self._get_secret_scope_metadata(secret_name)

                # all scope tags need to be included in the Azure secret tags,
                # otherwise the secret does not belong to the current scope,
                # even if it has the same name
                if scope_tags.items() <= response.properties.tags.items():
                    zenml_secret = secret_from_dict(
                        json.loads(response.value), secret_name=secret_name
                    )
            except ResourceNotFoundError:
                pass

        if not zenml_secret:
            raise KeyError(f"Can't find the specified secret '{secret_name}'")

        return zenml_secret

    def get_all_secret_keys(self) -> List[str]:
        """Get all secret keys.

        Returns:
            A list of all secret keys
        """
        self._ensure_client_connected(self.config.key_vault_name)

        set_of_secrets = set()

        for secret_property in self.CLIENT.list_properties_of_secrets():
            tags = secret_property.tags
            if not tags:
                continue

            if self.config.scope == SecretsManagerScope.NONE:
                # legacy, non-scoped secrets
                if ZENML_GROUP_KEY in tags:
                    set_of_secrets.add(tags.get(ZENML_GROUP_KEY))
                continue

            scope_tags = self._get_secret_scope_metadata()
            # all scope tags need to be included in the Azure secret tags,
            # otherwise the secret does not belong to the current scope
            if scope_tags.items() <= tags.items():
                set_of_secrets.add(tags.get(ZENML_SECRET_NAME_LABEL))

        return list(set_of_secrets)

    def update_secret(self, secret: BaseSecretSchema) -> None:
        """Update an existing secret by creating new versions of the existing secrets.

        Args:
            secret: the secret to update

        Raises:
            KeyError: if the secret does not exist
        """
        self.validate_secret_name(secret.name)
        self._ensure_client_connected(self.config.key_vault_name)

        if secret.name not in self.get_all_secret_keys():
            raise KeyError(f"Can't find the specified secret '{secret.name}'")

        self._create_or_update_secret(secret)

    def delete_secret(self, secret_name: str) -> None:
        """Delete an existing secret. by name.

        Args:
            secret_name: the name of the secret to delete

        Raises:
            KeyError: if the secret no longer exists
        """
        self.validate_secret_name(secret_name)
        self._ensure_client_connected(self.config.key_vault_name)

        if self.config.scope == SecretsManagerScope.NONE:
            # legacy, non-scoped secrets

            # Go through all Azure secrets and delete the ones with the
            # secret_name as label.
            for secret_property in self.CLIENT.list_properties_of_secrets():
                tags = secret_property.tags
                if tags and tags.get(ZENML_GROUP_KEY) == secret_name:
                    self.CLIENT.begin_delete_secret(
                        secret_property.name
                    ).result()

        else:
            if secret_name not in self.get_all_secret_keys():
                raise KeyError(
                    f"Can't find the specified secret '{secret_name}'"
                )
            self.CLIENT.begin_delete_secret(
                self._get_scoped_secret_name(
                    secret_name,
                    separator=ZENML_AZURE_SECRET_SCOPE_PATH_SEPARATOR,
                ),
            ).result()

    def delete_all_secrets(self) -> None:
        """Delete all existing secrets."""
        self._ensure_client_connected(self.config.key_vault_name)

        # List all secrets.
        for secret_property in self.CLIENT.list_properties_of_secrets():

            tags = secret_property.tags
            if not tags:
                continue

            if self.config.scope == SecretsManagerScope.NONE:
                # legacy, non-scoped secrets
                if ZENML_GROUP_KEY in tags:
                    logger.info(
                        "Deleted key-value pair {`%s`, `***`} from secret "
                        "`%s`",
                        secret_property.name,
                        tags.get(ZENML_GROUP_KEY),
                    )
                    self.CLIENT.begin_delete_secret(
                        secret_property.name
                    ).result()
                continue

            scope_tags = self._get_secret_scope_metadata()
            # all scope tags need to be included in the Azure secret tags,
            # otherwise the secret does not belong to the current scope
            if scope_tags.items() <= tags.items():
                self.CLIENT.begin_delete_secret(secret_property.name).result()
config: AzureSecretsManagerConfig property readonly

Returns the AzureSecretsManagerConfig config.

Returns:

Type Description
AzureSecretsManagerConfig

The configuration.

delete_all_secrets(self)

Delete all existing secrets.

Source code in zenml/integrations/azure/secrets_managers/azure_secrets_manager.py
def delete_all_secrets(self) -> None:
    """Delete all existing secrets."""
    self._ensure_client_connected(self.config.key_vault_name)

    # List all secrets.
    for secret_property in self.CLIENT.list_properties_of_secrets():

        tags = secret_property.tags
        if not tags:
            continue

        if self.config.scope == SecretsManagerScope.NONE:
            # legacy, non-scoped secrets
            if ZENML_GROUP_KEY in tags:
                logger.info(
                    "Deleted key-value pair {`%s`, `***`} from secret "
                    "`%s`",
                    secret_property.name,
                    tags.get(ZENML_GROUP_KEY),
                )
                self.CLIENT.begin_delete_secret(
                    secret_property.name
                ).result()
            continue

        scope_tags = self._get_secret_scope_metadata()
        # all scope tags need to be included in the Azure secret tags,
        # otherwise the secret does not belong to the current scope
        if scope_tags.items() <= tags.items():
            self.CLIENT.begin_delete_secret(secret_property.name).result()
delete_secret(self, secret_name)

Delete an existing secret by name.

Parameters:

Name Type Description Default
secret_name str

the name of the secret to delete

required

Exceptions:

Type Description
KeyError

if the secret no longer exists

Source code in zenml/integrations/azure/secrets_managers/azure_secrets_manager.py
def delete_secret(self, secret_name: str) -> None:
    """Delete an existing secret. by name.

    Args:
        secret_name: the name of the secret to delete

    Raises:
        KeyError: if the secret no longer exists
    """
    self.validate_secret_name(secret_name)
    self._ensure_client_connected(self.config.key_vault_name)

    if self.config.scope == SecretsManagerScope.NONE:
        # legacy, non-scoped secrets

        # Go through all Azure secrets and delete the ones with the
        # secret_name as label.
        for secret_property in self.CLIENT.list_properties_of_secrets():
            tags = secret_property.tags
            if tags and tags.get(ZENML_GROUP_KEY) == secret_name:
                self.CLIENT.begin_delete_secret(
                    secret_property.name
                ).result()

    else:
        if secret_name not in self.get_all_secret_keys():
            raise KeyError(
                f"Can't find the specified secret '{secret_name}'"
            )
        self.CLIENT.begin_delete_secret(
            self._get_scoped_secret_name(
                secret_name,
                separator=ZENML_AZURE_SECRET_SCOPE_PATH_SEPARATOR,
            ),
        ).result()
get_all_secret_keys(self)

Get all secret keys.

Returns:

Type Description
List[str]

A list of all secret keys

Source code in zenml/integrations/azure/secrets_managers/azure_secrets_manager.py
def get_all_secret_keys(self) -> List[str]:
    """Get all secret keys.

    Returns:
        A list of all secret keys
    """
    self._ensure_client_connected(self.config.key_vault_name)

    set_of_secrets = set()

    for secret_property in self.CLIENT.list_properties_of_secrets():
        tags = secret_property.tags
        if not tags:
            continue

        if self.config.scope == SecretsManagerScope.NONE:
            # legacy, non-scoped secrets
            if ZENML_GROUP_KEY in tags:
                set_of_secrets.add(tags.get(ZENML_GROUP_KEY))
            continue

        scope_tags = self._get_secret_scope_metadata()
        # all scope tags need to be included in the Azure secret tags,
        # otherwise the secret does not belong to the current scope
        if scope_tags.items() <= tags.items():
            set_of_secrets.add(tags.get(ZENML_SECRET_NAME_LABEL))

    return list(set_of_secrets)
get_secret(self, secret_name)

Get a secret by its name.

Parameters:

Name Type Description Default
secret_name str

the name of the secret to get

required

Returns:

Type Description
BaseSecretSchema

The secret.

Exceptions:

Type Description
KeyError

if the secret does not exist

ValueError

if the secret is named 'name'

Source code in zenml/integrations/azure/secrets_managers/azure_secrets_manager.py
def get_secret(self, secret_name: str) -> BaseSecretSchema:
    """Get a secret by its name.

    Args:
        secret_name: the name of the secret to get

    Returns:
        The secret.

    Raises:
        KeyError: if the secret does not exist
        ValueError: if the secret is named 'name'
    """
    self.validate_secret_name(secret_name)
    self._ensure_client_connected(self.config.key_vault_name)
    zenml_secret: Optional[BaseSecretSchema] = None

    if self.config.scope == SecretsManagerScope.NONE:
        # Legacy secrets are mapped to multiple Azure secrets, one for
        # each secret key

        secret_contents = {}
        zenml_schema_name = ""

        for secret_property in self.CLIENT.list_properties_of_secrets():
            tags = secret_property.tags

            if tags and tags.get(ZENML_GROUP_KEY) == secret_name:
                secret_key = tags.get(ZENML_KEY_NAME)
                if not secret_key:
                    raise ValueError("Missing secret key tag.")

                if secret_key == "name":
                    raise ValueError("The secret's key cannot be 'name'.")

                response = self.CLIENT.get_secret(secret_property.name)
                secret_contents[secret_key] = response.value

                zenml_schema_name = tags.get(ZENML_SCHEMA_NAME)

        if secret_contents:
            secret_contents["name"] = secret_name

            secret_schema = SecretSchemaClassRegistry.get_class(
                secret_schema=zenml_schema_name
            )
            zenml_secret = secret_schema(**secret_contents)
    else:
        # Scoped secrets are mapped 1-to-1 with Azure secrets

        try:
            response = self.CLIENT.get_secret(
                self._get_scoped_secret_name(
                    secret_name,
                    separator=ZENML_AZURE_SECRET_SCOPE_PATH_SEPARATOR,
                ),
            )

            scope_tags = self._get_secret_scope_metadata(secret_name)

            # all scope tags need to be included in the Azure secret tags,
            # otherwise the secret does not belong to the current scope,
            # even if it has the same name
            if scope_tags.items() <= response.properties.tags.items():
                zenml_secret = secret_from_dict(
                    json.loads(response.value), secret_name=secret_name
                )
        except ResourceNotFoundError:
            pass

    if not zenml_secret:
        raise KeyError(f"Can't find the specified secret '{secret_name}'")

    return zenml_secret
register_secret(self, secret)

Registers a new secret.

Parameters:

Name Type Description Default
secret BaseSecretSchema

the secret to register

required

Exceptions:

Type Description
SecretExistsError

if the secret already exists

Source code in zenml/integrations/azure/secrets_managers/azure_secrets_manager.py
def register_secret(self, secret: BaseSecretSchema) -> None:
    """Registers a new secret.

    Args:
        secret: the secret to register

    Raises:
        SecretExistsError: if the secret already exists
    """
    self.validate_secret_name(secret.name)
    self._ensure_client_connected(self.config.key_vault_name)

    if secret.name in self.get_all_secret_keys():
        raise SecretExistsError(
            f"A Secret with the name '{secret.name}' already exists."
        )

    self._create_or_update_secret(secret)
update_secret(self, secret)

Update an existing secret by creating new versions of the existing secrets.

Parameters:

Name Type Description Default
secret BaseSecretSchema

the secret to update

required

Exceptions:

Type Description
KeyError

if the secret does not exist

Source code in zenml/integrations/azure/secrets_managers/azure_secrets_manager.py
def update_secret(self, secret: BaseSecretSchema) -> None:
    """Update an existing secret by creating new versions of the existing secrets.

    Args:
        secret: the secret to update

    Raises:
        KeyError: if the secret does not exist
    """
    self.validate_secret_name(secret.name)
    self._ensure_client_connected(self.config.key_vault_name)

    if secret.name not in self.get_all_secret_keys():
        raise KeyError(f"Can't find the specified secret '{secret.name}'")

    self._create_or_update_secret(secret)
validate_secret_name(self, name)

Validate a secret name.

Parameters:

Name Type Description Default
name str

the secret name

required
Source code in zenml/integrations/azure/secrets_managers/azure_secrets_manager.py
def validate_secret_name(self, name: str) -> None:
    """Validate a secret name.

    Args:
        name: the secret name
    """
    validate_azure_secret_name_or_namespace(name, self.config.scope)

step_operators special

Initialization of AzureML Step Operator integration.

azureml_step_operator

Implementation of the ZenML AzureML Step Operator.

AzureMLStepOperator (BaseStepOperator)

Step operator to run a step on AzureML.

This class defines code that can set up an AzureML environment and run the ZenML entrypoint command in it.

Source code in zenml/integrations/azure/step_operators/azureml_step_operator.py
class AzureMLStepOperator(BaseStepOperator):
    """Step operator to run a step on AzureML.

    This class defines code that can set up an AzureML environment and run the
    ZenML entrypoint command in it.
    """

    @property
    def config(self) -> AzureMLStepOperatorConfig:
        """Returns the `AzureMLStepOperatorConfig` config.

        Returns:
            The configuration.
        """
        return cast(AzureMLStepOperatorConfig, self._config)

    @property
    def settings_class(self) -> Optional[Type["BaseSettings"]]:
        """Settings class for the AzureML step operator.

        Returns:
            The settings class.
        """
        return AzureMLStepOperatorSettings

    @property
    def validator(self) -> Optional[StackValidator]:
        """Validates the stack.

        Returns:
            A validator that checks that the stack contains a remote artifact
            store.
        """

        def _validate_remote_artifact_store(
            stack: "Stack",
        ) -> Tuple[bool, str]:
            if stack.artifact_store.config.is_local:
                return False, (
                    "The AzureML step operator runs code remotely and "
                    "needs to write files into the artifact store, but the "
                    f"artifact store `{stack.artifact_store.name}` of the "
                    "active stack is local. Please ensure that your stack "
                    "contains a remote artifact store when using the AzureML "
                    "step operator."
                )

            return True, ""

        return StackValidator(
            custom_validation_function=_validate_remote_artifact_store,
        )

    def _get_authentication(self) -> Optional[AbstractAuthentication]:
        """Returns the authentication object for the AzureML environment.

        Returns:
            The authentication object for the AzureML environment.
        """
        if (
            self.config.tenant_id
            and self.config.service_principal_id
            and self.config.service_principal_password
        ):
            return ServicePrincipalAuthentication(
                tenant_id=self.config.tenant_id,
                service_principal_id=self.config.service_principal_id,
                service_principal_password=self.config.service_principal_password,
            )
        return None

    def prepare_pipeline_deployment(
        self, deployment: "PipelineDeployment", stack: "Stack"
    ) -> None:
        """Store the active deployment in an environment variable.

        Args:
            deployment: The pipeline deployment configuration.
            stack: The stack on which the pipeline will be deployed.
        """
        steps_to_run = [
            step
            for step in deployment.steps.values()
            if step.config.step_operator == self.name
        ]
        if not steps_to_run:
            return

        os.environ[ENV_ACTIVE_DEPLOYMENT] = deployment.yaml()

    def _prepare_environment(
        self,
        workspace: Workspace,
        docker_settings: "DockerSettings",
        run_name: str,
        environment_name: Optional[str] = None,
    ) -> Environment:
        """Prepares the environment in which Azure will run all jobs.

        Args:
            workspace: The AzureML Workspace that has configuration
                for a storage account, container registry among other
                things.
            docker_settings: The Docker settings for this step.
            run_name: The name of the pipeline run that can be used
                for naming environments and runs.
            environment_name: Optional name of an existing environment to use.

        Returns:
            The AzureML Environment object.
        """
        docker_image_builder = PipelineDockerImageBuilder()
        requirements_files = docker_image_builder._gather_requirements_files(
            docker_settings=docker_settings,
            stack=Client().active_stack,
        )
        requirements = list(
            itertools.chain.from_iterable(
                r[1].split("\n") for r in requirements_files
            )
        )
        requirements.append(f"zenml=={zenml.__version__}")
        logger.info(
            "Using requirements for AzureML step operator environment: %s",
            requirements,
        )
        if environment_name:
            environment = Environment.get(
                workspace=workspace, name=environment_name
            )
            if not environment.python.conda_dependencies:
                environment.python.conda_dependencies = (
                    CondaDependencies.create(
                        python_version=ZenMLEnvironment.python_version()
                    )
                )

            for requirement in requirements:
                environment.python.conda_dependencies.add_pip_package(
                    requirement
                )
        else:
            environment = Environment(name=f"zenml-{run_name}")
            environment.python.conda_dependencies = CondaDependencies.create(
                pip_packages=requirements,
                python_version=ZenMLEnvironment.python_version(),
            )

            if docker_settings.parent_image:
                # replace the default azure base image
                environment.docker.base_image = docker_settings.parent_image

        environment_variables = {
            "ENV_ZENML_PREVENT_PIPELINE_EXECUTION": "True",
        }
        # set credentials to access azure storage
        for key in [
            "AZURE_STORAGE_ACCOUNT_KEY",
            "AZURE_STORAGE_ACCOUNT_NAME",
            "AZURE_STORAGE_CONNECTION_STRING",
            "AZURE_STORAGE_SAS_TOKEN",
        ]:
            value = os.getenv(key)
            if value:
                environment_variables[key] = value

        environment_variables[
            ENV_ZENML_CONFIG_PATH
        ] = f"./{DOCKER_IMAGE_ZENML_CONFIG_DIR}"
        environment_variables.update(docker_settings.environment)

        environment.environment_variables = environment_variables
        return environment

    def launch(
        self,
        info: "StepRunInfo",
        entrypoint_command: List[str],
    ) -> None:
        """Launches a step on AzureML.

        Args:
            info: Information about the step run.
            entrypoint_command: Command that executes the step.

        Raises:
            RuntimeError: If the deployment config can't be found.
        """
        if not info.config.resource_settings.empty:
            logger.warning(
                "Specifying custom step resources is not supported for "
                "the AzureML step operator. If you want to run this step "
                "operator on specific resources, you can do so by creating an "
                "Azure compute target (https://docs.microsoft.com/en-us/azure/machine-learning/concept-compute-target) "
                "with a specific machine type and then updating this step "
                "operator: `zenml step-operator update %s "
                "--compute_target_name=<COMPUTE_TARGET_NAME>`",
                self.name,
            )

        unused_docker_fields = [
            "dockerfile",
            "build_context_root",
            "build_options",
            "docker_target_repository",
            "dockerignore",
            "copy_files",
            "copy_global_config",
            "apt_packages",
        ]
        docker_settings = info.pipeline.docker_settings
        ignored_docker_fields = docker_settings.__fields_set__.intersection(
            unused_docker_fields
        )

        if ignored_docker_fields:
            logger.warning(
                "The AzureML step operator currently does not support all "
                "options defined in your Docker settings. Ignoring all "
                "values set for the attributes: %s",
                ignored_docker_fields,
            )

        settings = cast(AzureMLStepOperatorSettings, self.get_settings(info))

        workspace = Workspace.get(
            subscription_id=self.config.subscription_id,
            resource_group=self.config.resource_group,
            name=self.config.workspace_name,
            auth=self._get_authentication(),
        )

        source_directory = get_source_root_path()
        deployment = os.environ.get(ENV_ACTIVE_DEPLOYMENT)
        deployment_path = os.path.join(
            source_directory, DOCKER_IMAGE_DEPLOYMENT_CONFIG_FILE
        )

        if deployment:
            with open(deployment_path, "w") as f:
                f.write(deployment)
        elif not os.path.exists(deployment_path):
            # We're running in a non-local environment which should already
            # include the deployment at the source root
            raise RuntimeError("Unable to find deployment configuration.")

        with _include_global_config(
            build_context_root=source_directory,
            load_config_path=PurePosixPath(
                f"./{DOCKER_IMAGE_ZENML_CONFIG_DIR}"
            ),
        ):
            environment = self._prepare_environment(
                workspace=workspace,
                docker_settings=docker_settings,
                run_name=info.run_name,
                environment_name=settings.environment_name,
            )
            compute_target = ComputeTarget(
                workspace=workspace, name=self.config.compute_target_name
            )

            try:
                run_config = ScriptRunConfig(
                    source_directory=source_directory,
                    environment=environment,
                    compute_target=compute_target,
                    command=entrypoint_command,
                )

                experiment = Experiment(
                    workspace=workspace, name=info.pipeline.name
                )
                run = experiment.submit(config=run_config)
            finally:
                if deployment:
                    os.remove(deployment_path)

        run.display_name = info.run_name
        run.wait_for_completion(show_output=True)
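
One practical detail from _prepare_environment above: whichever of the AZURE_STORAGE_* variables are set in the local environment are copied into the AzureML job environment, so that the Azure artifact store inside the job can authenticate. A minimal sketch of exporting them before triggering a run; the values are placeholders.

import os

# Placeholder values; only the variables that are actually set locally get
# forwarded into the AzureML job environment by the step operator.
os.environ["AZURE_STORAGE_ACCOUNT_NAME"] = "<storage-account-name>"
os.environ["AZURE_STORAGE_ACCOUNT_KEY"] = "<storage-account-key>"
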
config: AzureMLStepOperatorConfig property readonly

Returns the AzureMLStepOperatorConfig config.

Returns:

Type Description
AzureMLStepOperatorConfig

The configuration.

settings_class: Optional[Type[BaseSettings]] property readonly

Settings class for the AzureML step operator.

Returns:

Type Description
Optional[Type[BaseSettings]]

The settings class.
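
The only flavor-specific setting used above is environment_name, which makes _prepare_environment reuse an existing AzureML Environment instead of building a new one per run. A hedged sketch of passing it at the step level follows; the settings key "step_operator.azureml" and the import path are assumptions based on ZenML's usual flavor naming and may differ between versions.

from zenml.integrations.azure.flavors.azureml_step_operator_flavor import (
    AzureMLStepOperatorSettings,  # import path is an assumption and may differ
)
from zenml.steps import step


@step(
    step_operator="azureml",  # placeholder name of the registered operator
    settings={
        # "step_operator.azureml" follows ZenML's usual "<component_type>.<flavor>"
        # settings key convention; treat it as an assumption.
        "step_operator.azureml": AzureMLStepOperatorSettings(
            environment_name="my-registered-azureml-environment",  # placeholder
        )
    },
)
def trainer() -> None:
    ...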

validator: Optional[zenml.stack.stack_validator.StackValidator] property readonly

Validates the stack.

Returns:

Type Description
Optional[zenml.stack.stack_validator.StackValidator]

A validator that checks that the stack contains a remote artifact store.

launch(self, info, entrypoint_command)

Launches a step on AzureML.

Parameters:

Name Type Description Default
info StepRunInfo

Information about the step run.

required
entrypoint_command List[str]

Command that executes the step.

required

Exceptions:

Type Description
RuntimeError

If the deployment config can't be found.

Source code in zenml/integrations/azure/step_operators/azureml_step_operator.py
def launch(
    self,
    info: "StepRunInfo",
    entrypoint_command: List[str],
) -> None:
    """Launches a step on AzureML.

    Args:
        info: Information about the step run.
        entrypoint_command: Command that executes the step.

    Raises:
        RuntimeError: If the deployment config can't be found.
    """
    if not info.config.resource_settings.empty:
        logger.warning(
            "Specifying custom step resources is not supported for "
            "the AzureML step operator. If you want to run this step "
            "operator on specific resources, you can do so by creating an "
            "Azure compute target (https://docs.microsoft.com/en-us/azure/machine-learning/concept-compute-target) "
            "with a specific machine type and then updating this step "
            "operator: `zenml step-operator update %s "
            "--compute_target_name=<COMPUTE_TARGET_NAME>`",
            self.name,
        )

    unused_docker_fields = [
        "dockerfile",
        "build_context_root",
        "build_options",
        "docker_target_repository",
        "dockerignore",
        "copy_files",
        "copy_global_config",
        "apt_packages",
    ]
    docker_settings = info.pipeline.docker_settings
    ignored_docker_fields = docker_settings.__fields_set__.intersection(
        unused_docker_fields
    )

    if ignored_docker_fields:
        logger.warning(
            "The AzureML step operator currently does not support all "
            "options defined in your Docker settings. Ignoring all "
            "values set for the attributes: %s",
            ignored_docker_fields,
        )

    settings = cast(AzureMLStepOperatorSettings, self.get_settings(info))

    workspace = Workspace.get(
        subscription_id=self.config.subscription_id,
        resource_group=self.config.resource_group,
        name=self.config.workspace_name,
        auth=self._get_authentication(),
    )

    source_directory = get_source_root_path()
    deployment = os.environ.get(ENV_ACTIVE_DEPLOYMENT)
    deployment_path = os.path.join(
        source_directory, DOCKER_IMAGE_DEPLOYMENT_CONFIG_FILE
    )

    if deployment:
        with open(deployment_path, "w") as f:
            f.write(deployment)
    elif not os.path.exists(deployment_path):
        # We're running in a non-local environment which should already
        # include the deployment at the source root
        raise RuntimeError("Unable to find deployment configuration.")

    with _include_global_config(
        build_context_root=source_directory,
        load_config_path=PurePosixPath(
            f"./{DOCKER_IMAGE_ZENML_CONFIG_DIR}"
        ),
    ):
        environment = self._prepare_environment(
            workspace=workspace,
            docker_settings=docker_settings,
            run_name=info.run_name,
            environment_name=settings.environment_name,
        )
        compute_target = ComputeTarget(
            workspace=workspace, name=self.config.compute_target_name
        )

        try:
            run_config = ScriptRunConfig(
                source_directory=source_directory,
                environment=environment,
                compute_target=compute_target,
                command=entrypoint_command,
            )

            experiment = Experiment(
                workspace=workspace, name=info.pipeline.name
            )
            run = experiment.submit(config=run_config)
        finally:
            if deployment:
                os.remove(deployment_path)

    run.display_name = info.run_name
    run.wait_for_completion(show_output=True)
prepare_pipeline_deployment(self, deployment, stack)

Store the active deployment in an environment variable.

Parameters:

Name Type Description Default
deployment PipelineDeployment

The pipeline deployment configuration.

required
stack Stack

The stack on which the pipeline will be deployed.

required
Source code in zenml/integrations/azure/step_operators/azureml_step_operator.py
def prepare_pipeline_deployment(
    self, deployment: "PipelineDeployment", stack: "Stack"
) -> None:
    """Store the active deployment in an environment variable.

    Args:
        deployment: The pipeline deployment configuration.
        stack: The stack on which the pipeline will be deployed.
    """
    steps_to_run = [
        step
        for step in deployment.steps.values()
        if step.config.step_operator == self.name
    ]
    if not steps_to_run:
        return

    os.environ[ENV_ACTIVE_DEPLOYMENT] = deployment.yaml()
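
The guard above means the deployment YAML is only exported to ENV_ACTIVE_DEPLOYMENT when at least one step of the deployment is actually configured to run on this operator; pipelines that never touch AzureML skip this entirely. Continuing the hedged sketch from the class description above (legacy pipeline API, placeholder names):

# Because `trainer` is bound to the "azureml" step operator, preparing this
# deployment stores the deployment YAML in ENV_ACTIVE_DEPLOYMENT before launch.
training_pipeline(trainer=trainer()).run()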