Integrations

zenml.integrations special

ZenML integrations module.

The ZenML integrations module contains sub-modules for each integration that we support. This includes orchestrators like Apache Airflow, visualization tools like the Facets library, and deep learning libraries like PyTorch.

airflow special

Airflow integration for ZenML.

The Airflow integration sub-module powers an alternative to the local orchestrator. You can enable it by registering the Airflow orchestrator with the CLI tool and then bootstrapping it with the zenml orchestrator up command.

AirflowIntegration (Integration)

Definition of Airflow Integration for ZenML.

Source code in zenml/integrations/airflow/__init__.py
class AirflowIntegration(Integration):
    """Definition of Airflow Integration for ZenML."""

    NAME = AIRFLOW
    REQUIREMENTS = ["apache-airflow==2.2.0"]

    @classmethod
    def flavors(cls) -> List[FlavorWrapper]:
        """Declare the stack component flavors for the Airflow integration.

        Returns:
            List of stack component flavors for this integration.
        """
        return [
            FlavorWrapper(
                name=AIRFLOW_ORCHESTRATOR_FLAVOR,
                source="zenml.integrations.airflow.orchestrators.AirflowOrchestrator",
                type=StackComponentType.ORCHESTRATOR,
                integration=cls.NAME,
            )
        ]
flavors() classmethod

Declare the stack component flavors for the Airflow integration.

Returns:

List[zenml.zen_stores.models.flavor_wrapper.FlavorWrapper]: List of stack component flavors for this integration.

Source code in zenml/integrations/airflow/__init__.py
@classmethod
def flavors(cls) -> List[FlavorWrapper]:
    """Declare the stack component flavors for the Airflow integration.

    Returns:
        List of stack component flavors for this integration.
    """
    return [
        FlavorWrapper(
            name=AIRFLOW_ORCHESTRATOR_FLAVOR,
            source="zenml.integrations.airflow.orchestrators.AirflowOrchestrator",
            type=StackComponentType.ORCHESTRATOR,
            integration=cls.NAME,
        )
    ]

orchestrators special

The Airflow integration enables the use of Airflow as a pipeline orchestrator.

airflow_orchestrator

Implementation of Airflow orchestrator integration.

AirflowOrchestrator (BaseOrchestrator) pydantic-model

Orchestrator responsible for running pipelines using Airflow.

Source code in zenml/integrations/airflow/orchestrators/airflow_orchestrator.py
class AirflowOrchestrator(BaseOrchestrator):
    """Orchestrator responsible for running pipelines using Airflow."""

    airflow_home: str = ""

    # Class Configuration
    FLAVOR: ClassVar[str] = AIRFLOW_ORCHESTRATOR_FLAVOR

    def __init__(self, **values: Any):
        """Sets environment variables to configure airflow.

        Args:
            **values: Values to set in the orchestrator.
        """
        super().__init__(**values)
        self._set_env()

    @staticmethod
    def _translate_schedule(
        schedule: Optional[Schedule] = None,
    ) -> Dict[str, Any]:
        """Convert ZenML schedule into Airflow schedule.

        The Airflow schedule uses slightly different naming and needs some
        default entries for execution without a schedule.

        Args:
            schedule: Containing the interval, start and end date and
                a boolean flag that defines if past runs should be caught up
                on

        Returns:
            Airflow configuration dict.
        """
        if schedule:
            if schedule.cron_expression:
                return {
                    "schedule_interval": schedule.cron_expression,
                }
            else:
                return {
                    "schedule_interval": schedule.interval_second,
                    "start_date": schedule.start_time,
                    "end_date": schedule.end_time,
                    "catchup": schedule.catchup,
                }

        return {
            "schedule_interval": "@once",
            # set a start time in the past and disable catchup so airflow runs the dag immediately
            "start_date": datetime.datetime.now() - datetime.timedelta(7),
            "catchup": False,
        }

    def prepare_or_run_pipeline(
        self,
        sorted_steps: List[BaseStep],
        pipeline: "BasePipeline",
        pb2_pipeline: Pb2Pipeline,
        stack: "Stack",
        runtime_configuration: "RuntimeConfiguration",
    ) -> Any:
        """Creates an Airflow DAG as the intermediate representation for the pipeline.

        This DAG will be loaded by airflow in the target environment
        and used for orchestration of the pipeline.

        How it works:
        -------------
        A new airflow_dag is instantiated with the pipeline name and, among
        other things, the run schedule.

        For each step of the pipeline a callable is created. This callable
        uses the run_step() method to execute the step. The parameters of
        this callable are pre-filled and an airflow step_operator is created
        within the dag. The dependencies to upstream steps are then
        configured.

        Finally, the dag is fully complete and can be returned.

        Args:
            sorted_steps: List of steps in the pipeline.
            pipeline: The pipeline to be executed.
            pb2_pipeline: The pipeline as a protobuf message.
            stack: The stack on which the pipeline will be deployed.
            runtime_configuration: The runtime configuration.

        Returns:
            The Airflow DAG.
        """
        import airflow
        from airflow.operators import python as airflow_python

        # Instantiate and configure airflow Dag with name and schedule
        airflow_dag = airflow.DAG(
            dag_id=pipeline.name,
            is_paused_upon_creation=False,
            **self._translate_schedule(runtime_configuration.schedule),
        )

        # Dictionary mapping step names to airflow_operators. This will be needed
        # to configure airflow operator dependencies
        step_name_to_airflow_operator = {}

        for step in sorted_steps:
            # Create callable that will be used by airflow to execute the step
            # within the orchestrated environment
            def _step_callable(step_instance: "BaseStep", **kwargs):
                # Extract run name for the kwargs that will be passed to the
                # callable
                run_name = kwargs["ti"].get_dagrun().run_id
                self.run_step(
                    step=step_instance,
                    run_name=run_name,
                    pb2_pipeline=pb2_pipeline,
                )

            # Create airflow python operator that contains the step callable
            airflow_operator = airflow_python.PythonOperator(
                dag=airflow_dag,
                task_id=step.name,
                provide_context=True,
                python_callable=functools.partial(
                    _step_callable, step_instance=step
                ),
            )

            # Configure the current airflow operator to run after all upstream
            # operators finished executing
            step_name_to_airflow_operator[step.name] = airflow_operator
            upstream_step_names = self.get_upstream_step_names(
                step=step, pb2_pipeline=pb2_pipeline
            )
            for upstream_step_name in upstream_step_names:
                airflow_operator.set_upstream(
                    step_name_to_airflow_operator[upstream_step_name]
                )

        # Return the finished airflow dag
        return airflow_dag

    @root_validator(skip_on_failure=True)
    def set_airflow_home(cls, values: Dict[str, Any]) -> Dict[str, Any]:
        """Sets Airflow home according to orchestrator UUID.

        Args:
            values: Dictionary containing all orchestrator attributes values.

        Returns:
            Dictionary containing all orchestrator attributes values and the airflow home.

        Raises:
            ValueError: If the orchestrator UUID is not set.
        """
        if "uuid" not in values:
            raise ValueError("`uuid` needs to exist for AirflowOrchestrator.")
        values["airflow_home"] = os.path.join(
            io_utils.get_global_config_directory(),
            AIRFLOW_ROOT_DIR,
            str(values["uuid"]),
        )
        return values

    @property
    def dags_directory(self) -> str:
        """Returns path to the airflow dags directory.

        Returns:
            Path to the airflow dags directory.
        """
        return os.path.join(self.airflow_home, "dags")

    @property
    def pid_file(self) -> str:
        """Returns path to the daemon PID file.

        Returns:
            Path to the daemon PID file.
        """
        return os.path.join(self.airflow_home, "airflow_daemon.pid")

    @property
    def log_file(self) -> str:
        """Returns path to the airflow log file.

        Returns:
            str: Path to the airflow log file.
        """
        return os.path.join(self.airflow_home, "airflow_orchestrator.log")

    @property
    def password_file(self) -> str:
        """Returns path to the webserver password file.

        Returns:
            Path to the webserver password file.
        """
        return os.path.join(self.airflow_home, "standalone_admin_password.txt")

    def _set_env(self) -> None:
        """Sets environment variables to configure airflow."""
        os.environ["AIRFLOW_HOME"] = self.airflow_home
        os.environ["AIRFLOW__CORE__DAGS_FOLDER"] = self.dags_directory
        os.environ["AIRFLOW__CORE__DAG_DISCOVERY_SAFE_MODE"] = "false"
        os.environ["AIRFLOW__CORE__LOAD_EXAMPLES"] = "false"
        # check the DAG folder every 10 seconds for new files
        os.environ["AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL"] = "10"

    def _copy_to_dag_directory_if_necessary(self, dag_filepath: str) -> None:
        """Copies DAG module to the Airflow DAGs directory if not already present.

        Args:
            dag_filepath: Path to the file in which the DAG is defined.
        """
        dags_directory = io_utils.resolve_relative_path(self.dags_directory)

        if dags_directory == os.path.dirname(dag_filepath):
            logger.debug("File is already in airflow DAGs directory.")
        else:
            logger.debug(
                "Copying dag file '%s' to DAGs directory.", dag_filepath
            )
            destination_path = os.path.join(
                dags_directory, os.path.basename(dag_filepath)
            )
            if fileio.exists(destination_path):
                logger.info(
                    "File '%s' already exists, overwriting with new DAG file",
                    destination_path,
                )
            fileio.copy(dag_filepath, destination_path, overwrite=True)

    def _log_webserver_credentials(self) -> None:
        """Logs URL and credentials to log in to the airflow webserver.

        Raises:
            FileNotFoundError: If the password file does not exist.
        """
        if fileio.exists(self.password_file):
            with open(self.password_file) as file:
                password = file.read().strip()
        else:
            raise FileNotFoundError(
                f"Can't find password file '{self.password_file}'"
            )
        logger.info(
            "To inspect your DAGs, login to http://0.0.0.0:8080 "
            "with username: admin password: %s",
            password,
        )

    def runtime_options(self) -> Dict[str, Any]:
        """Runtime options for the airflow orchestrator.

        Returns:
            Runtime options dictionary.
        """
        return {DAG_FILEPATH_OPTION_KEY: None}

    def prepare_pipeline_deployment(
        self,
        pipeline: "BasePipeline",
        stack: "Stack",
        runtime_configuration: "RuntimeConfiguration",
    ) -> None:
        """Checks Airflow is running and copies DAG file to the Airflow DAGs directory.

        Args:
            pipeline: Pipeline to be deployed.
            stack: Stack to be deployed.
            runtime_configuration: Runtime configuration for the pipeline.

        Raises:
            RuntimeError: If Airflow is not running or no DAG filepath runtime
                          option is provided.
        """
        if not self.is_running:
            raise RuntimeError(
                "Airflow orchestrator is currently not running. Run `zenml "
                "stack up` to provision resources for the active stack."
            )

        try:
            dag_filepath = runtime_configuration[DAG_FILEPATH_OPTION_KEY]
        except KeyError:
            raise RuntimeError(
                f"No DAG filepath found in runtime configuration. Make sure "
                f"to add the filepath to your airflow DAG file as a runtime "
                f"option (key: '{DAG_FILEPATH_OPTION_KEY}')."
            )

        self._copy_to_dag_directory_if_necessary(dag_filepath=dag_filepath)

    @property
    def is_running(self) -> bool:
        """Returns whether the airflow daemon is currently running.

        Returns:
            True if the daemon is running, False otherwise.

        Raises:
            RuntimeError: If port 8080 is occupied.
        """
        from airflow.cli.commands.standalone_command import StandaloneCommand
        from airflow.jobs.triggerer_job import TriggererJob

        daemon_running = daemon.check_if_daemon_is_running(self.pid_file)

        command = StandaloneCommand()
        webserver_port_open = command.port_open(8080)

        if not daemon_running:
            if webserver_port_open:
                raise RuntimeError(
                    "The airflow daemon does not seem to be running but "
                    "local port 8080 is occupied. Make sure the port is "
                    "available and try again."
                )

            # exit early so we don't check non-existing airflow databases
            return False

        # we can't use StandaloneCommand().is_ready() here as the
        # Airflow SequentialExecutor apparently does not send a heartbeat
        # while running a task which would result in this returning `False`
        # even if Airflow is running.
        airflow_running = webserver_port_open and command.job_running(
            TriggererJob
        )
        return airflow_running

    @property
    def is_provisioned(self) -> bool:
        """Returns whether the airflow daemon is currently running.

        Returns:
            True if the airflow daemon is running, False otherwise.
        """
        return self.is_running

    def provision(self) -> None:
        """Ensures that Airflow is running."""
        if self.is_running:
            logger.info("Airflow is already running.")
            self._log_webserver_credentials()
            return

        if not fileio.exists(self.dags_directory):
            io_utils.create_dir_recursive_if_not_exists(self.dags_directory)

        from airflow.cli.commands.standalone_command import StandaloneCommand

        try:
            command = StandaloneCommand()
            # Run the daemon with a working directory inside the current
            # zenml repo so the same repo will be used to run the DAGs
            daemon.run_as_daemon(
                command.run,
                pid_file=self.pid_file,
                log_file=self.log_file,
                working_directory=get_source_root_path(),
            )
            while not self.is_running:
                # Wait until the daemon started all the relevant airflow
                # processes
                time.sleep(0.1)
            self._log_webserver_credentials()
        except Exception as e:
            logger.error(e)
            logger.error(
                "An error occurred while starting the Airflow daemon. If you "
                "want to start it manually, use the commands described in the "
                "official Airflow quickstart guide for running Airflow locally."
            )
            self.deprovision()

    def deprovision(self) -> None:
        """Stops the airflow daemon if necessary and tears down resources."""
        if self.is_running:
            daemon.stop_daemon(self.pid_file)

        fileio.rmtree(self.airflow_home)
        logger.info("Airflow spun down.")
dags_directory: str property readonly

Returns path to the airflow dags directory.

Returns:

str: Path to the airflow dags directory.

is_provisioned: bool property readonly

Returns whether the airflow daemon is currently running.

Returns:

bool: True if the airflow daemon is running, False otherwise.

is_running: bool property readonly

Returns whether the airflow daemon is currently running.

Returns:

bool: True if the daemon is running, False otherwise.

Exceptions:

RuntimeError: If port 8080 is occupied.

log_file: str property readonly

Returns path to the airflow log file.

Returns:

str: Path to the airflow log file.

password_file: str property readonly

Returns path to the webserver password file.

Returns:

str: Path to the webserver password file.

pid_file: str property readonly

Returns path to the daemon PID file.

Returns:

str: Path to the daemon PID file.

__init__(self, **values) special

Sets environment variables to configure airflow.

Parameters:

**values (Any, default: {}): Values to set in the orchestrator.

Source code in zenml/integrations/airflow/orchestrators/airflow_orchestrator.py
def __init__(self, **values: Any):
    """Sets environment variables to configure airflow.

    Args:
        **values: Values to set in the orchestrator.
    """
    super().__init__(**values)
    self._set_env()
deprovision(self)

Stops the airflow daemon if necessary and tears down resources.

Source code in zenml/integrations/airflow/orchestrators/airflow_orchestrator.py
def deprovision(self) -> None:
    """Stops the airflow daemon if necessary and tears down resources."""
    if self.is_running:
        daemon.stop_daemon(self.pid_file)

    fileio.rmtree(self.airflow_home)
    logger.info("Airflow spun down.")
prepare_or_run_pipeline(self, sorted_steps, pipeline, pb2_pipeline, stack, runtime_configuration)

Creates an Airflow DAG as the intermediate representation for the pipeline.

This DAG will be loaded by airflow in the target environment and used for orchestration of the pipeline.

How it works:

A new airflow_dag is instantiated with the pipeline name and, among other things, the run schedule.

For each step of the pipeline a callable is created. This callable uses the run_step() method to execute the step. The parameters of this callable are pre-filled and an airflow step_operator is created within the dag. The dependencies to upstream steps are then configured.

Finally, the dag is fully complete and can be returned.

Parameters:

sorted_steps (List[zenml.steps.base_step.BaseStep], required): List of steps in the pipeline.
pipeline (BasePipeline, required): The pipeline to be executed.
pb2_pipeline (Pipeline, required): The pipeline as a protobuf message.
stack (Stack, required): The stack on which the pipeline will be deployed.
runtime_configuration (RuntimeConfiguration, required): The runtime configuration.

Returns:

Any: The Airflow DAG.

Source code in zenml/integrations/airflow/orchestrators/airflow_orchestrator.py
def prepare_or_run_pipeline(
    self,
    sorted_steps: List[BaseStep],
    pipeline: "BasePipeline",
    pb2_pipeline: Pb2Pipeline,
    stack: "Stack",
    runtime_configuration: "RuntimeConfiguration",
) -> Any:
    """Creates an Airflow DAG as the intermediate representation for the pipeline.

    This DAG will be loaded by airflow in the target environment
    and used for orchestration of the pipeline.

    How it works:
    -------------
    A new airflow_dag is instantiated with the pipeline name and, among
    other things, the run schedule.

    For each step of the pipeline a callable is created. This callable
    uses the run_step() method to execute the step. The parameters of
    this callable are pre-filled and an airflow step_operator is created
    within the dag. The dependencies to upstream steps are then
    configured.

    Finally, the dag is fully complete and can be returned.

    Args:
        sorted_steps: List of steps in the pipeline.
        pipeline: The pipeline to be executed.
        pb2_pipeline: The pipeline as a protobuf message.
        stack: The stack on which the pipeline will be deployed.
        runtime_configuration: The runtime configuration.

    Returns:
        The Airflow DAG.
    """
    import airflow
    from airflow.operators import python as airflow_python

    # Instantiate and configure airflow Dag with name and schedule
    airflow_dag = airflow.DAG(
        dag_id=pipeline.name,
        is_paused_upon_creation=False,
        **self._translate_schedule(runtime_configuration.schedule),
    )

    # Dictionary mapping step names to airflow_operators. This will be needed
    # to configure airflow operator dependencies
    step_name_to_airflow_operator = {}

    for step in sorted_steps:
        # Create callable that will be used by airflow to execute the step
        # within the orchestrated environment
        def _step_callable(step_instance: "BaseStep", **kwargs):
            # Extract run name for the kwargs that will be passed to the
            # callable
            run_name = kwargs["ti"].get_dagrun().run_id
            self.run_step(
                step=step_instance,
                run_name=run_name,
                pb2_pipeline=pb2_pipeline,
            )

        # Create airflow python operator that contains the step callable
        airflow_operator = airflow_python.PythonOperator(
            dag=airflow_dag,
            task_id=step.name,
            provide_context=True,
            python_callable=functools.partial(
                _step_callable, step_instance=step
            ),
        )

        # Configure the current airflow operator to run after all upstream
        # operators finished executing
        step_name_to_airflow_operator[step.name] = airflow_operator
        upstream_step_names = self.get_upstream_step_names(
            step=step, pb2_pipeline=pb2_pipeline
        )
        for upstream_step_name in upstream_step_names:
            airflow_operator.set_upstream(
                step_name_to_airflow_operator[upstream_step_name]
            )

    # Return the finished airflow dag
    return airflow_dag
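
At its core, prepare_or_run_pipeline() uses plain Airflow primitives: each step becomes a PythonOperator and the operators are wired together according to the upstream-step relationships. The standalone sketch below illustrates that pattern outside of ZenML; the step names, DAG id and the run_step stand-in are hypothetical, and only apache-airflow is assumed to be installed.

# A minimal, self-contained sketch of the DAG-building pattern used above,
# outside of ZenML: three no-op "steps" are wrapped in PythonOperators and
# chained via set_upstream(). Step names and callables are hypothetical.
import datetime
import functools

import airflow
from airflow.operators import python as airflow_python


def run_step(step_name: str, **kwargs) -> None:
    # Stand-in for AirflowOrchestrator.run_step(); the Airflow task
    # instance ("ti") is available in kwargs and provides the run id.
    run_id = kwargs["ti"].get_dagrun().run_id
    print(f"Running step '{step_name}' as part of run '{run_id}'")


dag = airflow.DAG(
    dag_id="example_pipeline",
    schedule_interval="@once",
    start_date=datetime.datetime.now() - datetime.timedelta(7),
    catchup=False,
)

# (step name, upstream step names) pairs in topological order
steps = [("load", []), ("train", ["load"]), ("evaluate", ["train"])]

operators = {}
for name, upstream_names in steps:
    operator = airflow_python.PythonOperator(
        dag=dag,
        task_id=name,
        python_callable=functools.partial(run_step, step_name=name),
    )
    operators[name] = operator
    for upstream_name in upstream_names:
        operator.set_upstream(operators[upstream_name])

Placing a module like this in the DAGs folder is enough for the Airflow scheduler to pick it up, which is why prepare_pipeline_deployment() below only needs to copy the DAG file into place.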
prepare_pipeline_deployment(self, pipeline, stack, runtime_configuration)

Checks Airflow is running and copies DAG file to the Airflow DAGs directory.

Parameters:

pipeline (BasePipeline, required): Pipeline to be deployed.
stack (Stack, required): Stack to be deployed.
runtime_configuration (RuntimeConfiguration, required): Runtime configuration for the pipeline.

Exceptions:

RuntimeError: If Airflow is not running or no DAG filepath runtime option is provided.

Source code in zenml/integrations/airflow/orchestrators/airflow_orchestrator.py
def prepare_pipeline_deployment(
    self,
    pipeline: "BasePipeline",
    stack: "Stack",
    runtime_configuration: "RuntimeConfiguration",
) -> None:
    """Checks Airflow is running and copies DAG file to the Airflow DAGs directory.

    Args:
        pipeline: Pipeline to be deployed.
        stack: Stack to be deployed.
        runtime_configuration: Runtime configuration for the pipeline.

    Raises:
        RuntimeError: If Airflow is not running or no DAG filepath runtime
                      option is provided.
    """
    if not self.is_running:
        raise RuntimeError(
            "Airflow orchestrator is currently not running. Run `zenml "
            "stack up` to provision resources for the active stack."
        )

    try:
        dag_filepath = runtime_configuration[DAG_FILEPATH_OPTION_KEY]
    except KeyError:
        raise RuntimeError(
            f"No DAG filepath found in runtime configuration. Make sure "
            f"to add the filepath to your airflow DAG file as a runtime "
            f"option (key: '{DAG_FILEPATH_OPTION_KEY}')."
        )

    self._copy_to_dag_directory_if_necessary(dag_filepath=dag_filepath)
provision(self)

Ensures that Airflow is running.

Source code in zenml/integrations/airflow/orchestrators/airflow_orchestrator.py
def provision(self) -> None:
    """Ensures that Airflow is running."""
    if self.is_running:
        logger.info("Airflow is already running.")
        self._log_webserver_credentials()
        return

    if not fileio.exists(self.dags_directory):
        io_utils.create_dir_recursive_if_not_exists(self.dags_directory)

    from airflow.cli.commands.standalone_command import StandaloneCommand

    try:
        command = StandaloneCommand()
        # Run the daemon with a working directory inside the current
        # zenml repo so the same repo will be used to run the DAGs
        daemon.run_as_daemon(
            command.run,
            pid_file=self.pid_file,
            log_file=self.log_file,
            working_directory=get_source_root_path(),
        )
        while not self.is_running:
            # Wait until the daemon started all the relevant airflow
            # processes
            time.sleep(0.1)
        self._log_webserver_credentials()
    except Exception as e:
        logger.error(e)
        logger.error(
            "An error occurred while starting the Airflow daemon. If you "
            "want to start it manually, use the commands described in the "
            "official Airflow quickstart guide for running Airflow locally."
        )
        self.deprovision()
runtime_options(self)

Runtime options for the airflow orchestrator.

Returns:

Dict[str, Any]: Runtime options dictionary.

Source code in zenml/integrations/airflow/orchestrators/airflow_orchestrator.py
def runtime_options(self) -> Dict[str, Any]:
    """Runtime options for the airflow orchestrator.

    Returns:
        Runtime options dictionary.
    """
    return {DAG_FILEPATH_OPTION_KEY: None}
set_airflow_home(values) classmethod

Sets Airflow home according to orchestrator UUID.

Parameters:

values (Dict[str, Any], required): Dictionary containing all orchestrator attribute values.

Returns:

Dict[str, Any]: Dictionary containing all orchestrator attribute values and the airflow home.

Exceptions:

ValueError: If the orchestrator UUID is not set.

Source code in zenml/integrations/airflow/orchestrators/airflow_orchestrator.py
@root_validator(skip_on_failure=True)
def set_airflow_home(cls, values: Dict[str, Any]) -> Dict[str, Any]:
    """Sets Airflow home according to orchestrator UUID.

    Args:
        values: Dictionary containing all orchestrator attributes values.

    Returns:
        Dictionary containing all orchestrator attributes values and the airflow home.

    Raises:
        ValueError: If the orchestrator UUID is not set.
    """
    if "uuid" not in values:
        raise ValueError("`uuid` needs to exist for AirflowOrchestrator.")
    values["airflow_home"] = os.path.join(
        io_utils.get_global_config_directory(),
        AIRFLOW_ROOT_DIR,
        str(values["uuid"]),
    )
    return values
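
The validator above follows a standard pydantic (v1) pattern: a root validator that fills in a derived field once the other values have been parsed. A minimal sketch of the same idea, with a hypothetical model and base directory:

# Minimal sketch (not ZenML code) of the root-validator pattern used by
# set_airflow_home: derive one field from another after parsing.
import os
from typing import Any, Dict
from uuid import UUID, uuid4

from pydantic import BaseModel, Field, root_validator


class ExampleOrchestrator(BaseModel):
    uuid: UUID = Field(default_factory=uuid4)
    home: str = ""

    @root_validator(skip_on_failure=True)
    def set_home(cls, values: Dict[str, Any]) -> Dict[str, Any]:
        # Derive a per-instance home directory from the component UUID.
        if "uuid" not in values:
            raise ValueError("`uuid` needs to exist to derive the home path.")
        values["home"] = os.path.join("/tmp/example", str(values["uuid"]))
        return values


print(ExampleOrchestrator().home)  # e.g. /tmp/example/<uuid>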

aws special

Integrates multiple AWS Tools as Stack Components.

The AWS integration provides a way for our users to manage their secrets through AWS and a way to use the AWS container registry. Additionally, the Sagemaker integration submodule provides a way to run ZenML steps in Sagemaker.

AWSIntegration (Integration)

Definition of AWS integration for ZenML.

Source code in zenml/integrations/aws/__init__.py
class AWSIntegration(Integration):
    """Definition of AWS integration for ZenML."""

    NAME = AWS
    REQUIREMENTS = ["boto3==1.21.21", "sagemaker==2.82.2"]

    @classmethod
    def flavors(cls) -> List[FlavorWrapper]:
        """Declare the stack component flavors for the AWS integration.

        Returns:
            List of stack component flavors for this integration.
        """
        return [
            FlavorWrapper(
                name=AWS_SECRET_MANAGER_FLAVOR,
                source="zenml.integrations.aws.secrets_managers"
                ".AWSSecretsManager",
                type=StackComponentType.SECRETS_MANAGER,
                integration=cls.NAME,
            ),
            FlavorWrapper(
                name=AWS_CONTAINER_REGISTRY_FLAVOR,
                source="zenml.integrations.aws.container_registries"
                ".AWSContainerRegistry",
                type=StackComponentType.CONTAINER_REGISTRY,
                integration=cls.NAME,
            ),
            FlavorWrapper(
                name=AWS_SAGEMAKER_STEP_OPERATOR_FLAVOR,
                source="zenml.integrations.aws.step_operators"
                ".SagemakerStepOperator",
                type=StackComponentType.STEP_OPERATOR,
                integration=cls.NAME,
            ),
        ]
flavors() classmethod

Declare the stack component flavors for the AWS integration.

Returns:

List[zenml.zen_stores.models.flavor_wrapper.FlavorWrapper]: List of stack component flavors for this integration.

Source code in zenml/integrations/aws/__init__.py
@classmethod
def flavors(cls) -> List[FlavorWrapper]:
    """Declare the stack component flavors for the AWS integration.

    Returns:
        List of stack component flavors for this integration.
    """
    return [
        FlavorWrapper(
            name=AWS_SECRET_MANAGER_FLAVOR,
            source="zenml.integrations.aws.secrets_managers"
            ".AWSSecretsManager",
            type=StackComponentType.SECRETS_MANAGER,
            integration=cls.NAME,
        ),
        FlavorWrapper(
            name=AWS_CONTAINER_REGISTRY_FLAVOR,
            source="zenml.integrations.aws.container_registries"
            ".AWSContainerRegistry",
            type=StackComponentType.CONTAINER_REGISTRY,
            integration=cls.NAME,
        ),
        FlavorWrapper(
            name=AWS_SAGEMAKER_STEP_OPERATOR_FLAVOR,
            source="zenml.integrations.aws.step_operators"
            ".SagemakerStepOperator",
            type=StackComponentType.STEP_OPERATOR,
            integration=cls.NAME,
        ),
    ]

container_registries special

Initialization of AWS Container Registry integration.

aws_container_registry

Implementation of the AWS container registry integration.

AWSContainerRegistry (BaseContainerRegistry) pydantic-model

Class for AWS Container Registry.

Source code in zenml/integrations/aws/container_registries/aws_container_registry.py
class AWSContainerRegistry(BaseContainerRegistry):
    """Class for AWS Container Registry."""

    # Class Configuration
    FLAVOR: ClassVar[str] = AWS_CONTAINER_REGISTRY_FLAVOR

    @validator("uri")
    def validate_aws_uri(cls, uri: str) -> str:
        """Validates that the URI is in the correct format.

        Args:
            uri: URI to validate.

        Returns:
            URI in the correct format.

        Raises:
            ValueError: If the URI contains a slash character.
        """
        if "/" in uri:
            raise ValueError(
                "Property `uri` can not contain a `/`. An example of a valid "
                "URI is: `715803424592.dkr.ecr.us-east-1.amazonaws.com`"
            )

        return uri

    def prepare_image_push(self, image_name: str) -> None:
        """Logs warning message if trying to push an image for which no repository exists.

        Args:
            image_name: Name of the docker image that will be pushed.

        Raises:
            ValueError: If the docker image name is invalid.
        """
        response = boto3.client("ecr").describe_repositories()
        try:
            repo_uris: List[str] = [
                repository["repositoryUri"]
                for repository in response["repositories"]
            ]
        except (KeyError, ClientError) as e:
            # invalid boto response, let's hope for the best and just push
            logger.debug("Error while trying to fetch ECR repositories: %s", e)
            return

        repo_exists = any(image_name.startswith(f"{uri}:") for uri in repo_uris)
        if not repo_exists:
            match = re.search(f"{self.uri}/(.*):.*", image_name)
            if not match:
                raise ValueError(f"Invalid docker image name '{image_name}'.")

            repo_name = match.group(1)
            logger.warning(
                "Amazon ECR requires you to create a repository before you can "
                f"push an image to it. ZenML is trying to push the image "
                f"{image_name} but could only detect the following "
                f"repositories: {repo_uris}. We will try to push anyway, but "
                f"in case it fails you need to create a repository named "
                f"`{repo_name}`."
            )

    @property
    def post_registration_message(self) -> Optional[str]:
        """Optional message printed after the stack component is registered.

        Returns:
            Info message regarding docker repositories in AWS.
        """
        return (
            "Amazon ECR requires you to create a repository before you can "
            "push an image to it. If you want to for example run a pipeline "
            "using our Kubeflow orchestrator, ZenML will automatically build a "
            f"docker image called `{self.uri}/zenml-kubeflow:<PIPELINE_NAME>` "
            f"and try to push it. This will fail unless you create the "
            f"repository `zenml-kubeflow` inside your amazon registry."
        )
post_registration_message: Optional[str] property readonly

Optional message printed after the stack component is registered.

Returns:

Optional[str]: Info message regarding docker repositories in AWS.

prepare_image_push(self, image_name)

Logs warning message if trying to push an image for which no repository exists.

Parameters:

image_name (str, required): Name of the docker image that will be pushed.

Exceptions:

ValueError: If the docker image name is invalid.

Source code in zenml/integrations/aws/container_registries/aws_container_registry.py
def prepare_image_push(self, image_name: str) -> None:
    """Logs warning message if trying to push an image for which no repository exists.

    Args:
        image_name: Name of the docker image that will be pushed.

    Raises:
        ValueError: If the docker image name is invalid.
    """
    response = boto3.client("ecr").describe_repositories()
    try:
        repo_uris: List[str] = [
            repository["repositoryUri"]
            for repository in response["repositories"]
        ]
    except (KeyError, ClientError) as e:
        # invalid boto response, let's hope for the best and just push
        logger.debug("Error while trying to fetch ECR repositories: %s", e)
        return

    repo_exists = any(image_name.startswith(f"{uri}:") for uri in repo_uris)
    if not repo_exists:
        match = re.search(f"{self.uri}/(.*):.*", image_name)
        if not match:
            raise ValueError(f"Invalid docker image name '{image_name}'.")

        repo_name = match.group(1)
        logger.warning(
            "Amazon ECR requires you to create a repository before you can "
            f"push an image to it. ZenML is trying to push the image "
            f"{image_name} but could only detect the following "
            f"repositories: {repo_uris}. We will try to push anyway, but "
            f"in case it fails you need to create a repository named "
            f"`{repo_name}`."
        )
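
Because ECR only accepts pushes to repositories that already exist, the warning above can be avoided by creating the repository up front. A standalone sketch of that, reusing the same repository-name parsing as prepare_image_push(); the registry URI, image name and region are made-up placeholders:

# Hedged sketch (not part of ZenML): parse the repository name out of a
# full image name and create the ECR repository if it is missing.
import re

import boto3

registry_uri = "715803424592.dkr.ecr.us-east-1.amazonaws.com"
image_name = f"{registry_uri}/zenml-kubeflow:my_pipeline"

# Same parsing logic as prepare_image_push(): everything between the
# registry URI and the tag is the repository name.
match = re.search(f"{registry_uri}/(.*):.*", image_name)
assert match is not None
repo_name = match.group(1)  # "zenml-kubeflow"

ecr_client = boto3.client("ecr", region_name="us-east-1")
existing = {
    repo["repositoryName"]
    for repo in ecr_client.describe_repositories()["repositories"]
}
if repo_name not in existing:
    ecr_client.create_repository(repositoryName=repo_name)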
validate_aws_uri(uri) classmethod

Validates that the URI is in the correct format.

Parameters:

uri (str, required): URI to validate.

Returns:

str: URI in the correct format.

Exceptions:

ValueError: If the URI contains a slash character.

Source code in zenml/integrations/aws/container_registries/aws_container_registry.py
@validator("uri")
def validate_aws_uri(cls, uri: str) -> str:
    """Validates that the URI is in the correct format.

    Args:
        uri: URI to validate.

    Returns:
        URI in the correct format.

    Raises:
        ValueError: If the URI contains a slash character.
    """
    if "/" in uri:
        raise ValueError(
            "Property `uri` can not contain a `/`. An example of a valid "
            "URI is: `715803424592.dkr.ecr.us-east-1.amazonaws.com`"
        )

    return uri

secrets_managers special

AWS Secrets Manager.

aws_secrets_manager

Implementation of the AWS Secrets Manager integration.

AWSSecretsManager (BaseSecretsManager) pydantic-model

Class to interact with the AWS secrets manager.

Source code in zenml/integrations/aws/secrets_managers/aws_secrets_manager.py
class AWSSecretsManager(BaseSecretsManager):
    """Class to interact with the AWS secrets manager."""

    region_name: str = DEFAULT_AWS_REGION

    # Class configuration
    FLAVOR: ClassVar[str] = AWS_SECRET_MANAGER_FLAVOR
    CLIENT: ClassVar[Any] = None

    @classmethod
    def _ensure_client_connected(cls, region_name: str) -> None:
        """Ensure that the client is connected to the AWS secrets manager.

        Args:
            region_name: the AWS region name
        """
        if cls.CLIENT is None:
            # Create a Secrets Manager client
            session = boto3.session.Session()
            cls.CLIENT = session.client(
                service_name="secretsmanager", region_name=region_name
            )

    def register_secret(self, secret: BaseSecretSchema) -> None:
        """Registers a new secret.

        Args:
            secret: the secret to register

        Raises:
            SecretExistsError: if the secret already exists
        """
        self._ensure_client_connected(self.region_name)
        secret_value = jsonify_secret_contents(secret)

        if secret.name in self.get_all_secret_keys():
            raise SecretExistsError(
                f"A Secret with the name {secret.name} already exists"
            )

        kwargs = {"Name": secret.name, "SecretString": secret_value}

        self.CLIENT.create_secret(**kwargs)

    def get_secret(self, secret_name: str) -> BaseSecretSchema:
        """Gets a secret.

        Args:
            secret_name: the name of the secret to get

        Returns:
            The secret.

        Raises:
            RuntimeError: if the secret does not exist
        """
        self._ensure_client_connected(self.region_name)
        get_secret_value_response = self.CLIENT.get_secret_value(
            SecretId=secret_name
        )
        if "SecretString" not in get_secret_value_response:
            raise RuntimeError(f"No secrets found within the {secret_name}")
        secret_contents: Dict[str, str] = json.loads(
            get_secret_value_response["SecretString"]
        )

        zenml_schema_name = secret_contents.pop(ZENML_SCHEMA_NAME)
        secret_contents["name"] = secret_name

        secret_schema = SecretSchemaClassRegistry.get_class(
            secret_schema=zenml_schema_name
        )
        return secret_schema(**secret_contents)

    def get_all_secret_keys(self) -> List[str]:
        """Get all secret keys.

        Returns:
            A list of all secret keys
        """
        self._ensure_client_connected(self.region_name)

        # TODO [ENG-720]: Deal with pagination in the aws secret manager when
        #  listing all secrets
        # TODO [ENG-721]: take out this magic maxresults number
        response = self.CLIENT.list_secrets(MaxResults=100)
        return [secret["Name"] for secret in response["SecretList"]]

    def update_secret(self, secret: BaseSecretSchema) -> None:
        """Update an existing secret.

        Args:
            secret: the secret to update
        """
        self._ensure_client_connected(self.region_name)

        secret_value = jsonify_secret_contents(secret)

        kwargs = {"SecretId": secret.name, "SecretString": secret_value}

        self.CLIENT.put_secret_value(**kwargs)

    def delete_secret(self, secret_name: str) -> None:
        """Delete an existing secret.

        Args:
            secret_name: the name of the secret to delete
        """
        self._ensure_client_connected(self.region_name)
        self.CLIENT.delete_secret(
            SecretId=secret_name, ForceDeleteWithoutRecovery=False
        )

    def delete_all_secrets(self) -> None:
        """Delete all existing secrets.

        This method will force delete all your secrets. You will not be able to
        recover them once this method is called.
        """
        self._ensure_client_connected(self.region_name)
        for secret_name in self.get_all_secret_keys():
            self.CLIENT.delete_secret(
                SecretId=secret_name, ForceDeleteWithoutRecovery=True
            )
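
Under the hood, the class above is a thin wrapper around the boto3 Secrets Manager client. A standalone sketch of the same create/read/delete calls, without any ZenML machinery; the secret name, value and region are placeholders:

# Standalone boto3 sketch mirroring the calls used by AWSSecretsManager.
import json

import boto3

session = boto3.session.Session()
client = session.client(service_name="secretsmanager", region_name="us-east-1")

# register_secret(): store the key-value pairs as a JSON string
client.create_secret(
    Name="example_secret",
    SecretString=json.dumps({"username": "admin", "password": "hunter2"}),
)

# get_secret(): read the JSON string back and parse it
response = client.get_secret_value(SecretId="example_secret")
contents = json.loads(response["SecretString"])

# delete_secret(): schedule the secret for deletion (recoverable by default)
client.delete_secret(SecretId="example_secret", ForceDeleteWithoutRecovery=False)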
delete_all_secrets(self)

Delete all existing secrets.

This method will force delete all your secrets. You will not be able to recover them once this method is called.

Source code in zenml/integrations/aws/secrets_managers/aws_secrets_manager.py
def delete_all_secrets(self) -> None:
    """Delete all existing secrets.

    This method will force delete all your secrets. You will not be able to
    recover them once this method is called.
    """
    self._ensure_client_connected(self.region_name)
    for secret_name in self.get_all_secret_keys():
        self.CLIENT.delete_secret(
            SecretId=secret_name, ForceDeleteWithoutRecovery=True
        )
delete_secret(self, secret_name)

Delete an existing secret.

Parameters:

secret_name (str, required): the name of the secret to delete

Source code in zenml/integrations/aws/secrets_managers/aws_secrets_manager.py
def delete_secret(self, secret_name: str) -> None:
    """Delete an existing secret.

    Args:
        secret_name: the name of the secret to delete
    """
    self._ensure_client_connected(self.region_name)
    self.CLIENT.delete_secret(
        SecretId=secret_name, ForceDeleteWithoutRecovery=False
    )
get_all_secret_keys(self)

Get all secret keys.

Returns:

List[str]: A list of all secret keys

Source code in zenml/integrations/aws/secrets_managers/aws_secrets_manager.py
def get_all_secret_keys(self) -> List[str]:
    """Get all secret keys.

    Returns:
        A list of all secret keys
    """
    self._ensure_client_connected(self.region_name)

    # TODO [ENG-720]: Deal with pagination in the aws secret manager when
    #  listing all secrets
    # TODO [ENG-721]: take out this magic maxresults number
    response = self.CLIENT.list_secrets(MaxResults=100)
    return [secret["Name"] for secret in response["SecretList"]]
get_secret(self, secret_name)

Gets a secret.

Parameters:

secret_name (str, required): the name of the secret to get

Returns:

BaseSecretSchema: The secret.

Exceptions:

RuntimeError: if the secret does not exist

Source code in zenml/integrations/aws/secrets_managers/aws_secrets_manager.py
def get_secret(self, secret_name: str) -> BaseSecretSchema:
    """Gets a secret.

    Args:
        secret_name: the name of the secret to get

    Returns:
        The secret.

    Raises:
        RuntimeError: if the secret does not exist
    """
    self._ensure_client_connected(self.region_name)
    get_secret_value_response = self.CLIENT.get_secret_value(
        SecretId=secret_name
    )
    if "SecretString" not in get_secret_value_response:
        raise RuntimeError(f"No secrets found within the {secret_name}")
    secret_contents: Dict[str, str] = json.loads(
        get_secret_value_response["SecretString"]
    )

    zenml_schema_name = secret_contents.pop(ZENML_SCHEMA_NAME)
    secret_contents["name"] = secret_name

    secret_schema = SecretSchemaClassRegistry.get_class(
        secret_schema=zenml_schema_name
    )
    return secret_schema(**secret_contents)
register_secret(self, secret)

Registers a new secret.

Parameters:

secret (BaseSecretSchema, required): the secret to register

Exceptions:

SecretExistsError: if the secret already exists

Source code in zenml/integrations/aws/secrets_managers/aws_secrets_manager.py
def register_secret(self, secret: BaseSecretSchema) -> None:
    """Registers a new secret.

    Args:
        secret: the secret to register

    Raises:
        SecretExistsError: if the secret already exists
    """
    self._ensure_client_connected(self.region_name)
    secret_value = jsonify_secret_contents(secret)

    if secret.name in self.get_all_secret_keys():
        raise SecretExistsError(
            f"A Secret with the name {secret.name} already exists"
        )

    kwargs = {"Name": secret.name, "SecretString": secret_value}

    self.CLIENT.create_secret(**kwargs)
update_secret(self, secret)

Update an existing secret.

Parameters:

secret (BaseSecretSchema, required): the secret to update

Source code in zenml/integrations/aws/secrets_managers/aws_secrets_manager.py
def update_secret(self, secret: BaseSecretSchema) -> None:
    """Update an existing secret.

    Args:
        secret: the secret to update
    """
    self._ensure_client_connected(self.region_name)

    secret_value = jsonify_secret_contents(secret)

    kwargs = {"SecretId": secret.name, "SecretString": secret_value}

    self.CLIENT.put_secret_value(**kwargs)
jsonify_secret_contents(secret)

Adds the secret type to the secret contents.

This persists the schema type in the secrets backend, so that the correct SecretSchema can be retrieved when the secret is queried from the backend.

Parameters:

secret (BaseSecretSchema, required): should be a subclass of the BaseSecretSchema class

Returns:

str: jsonified dictionary containing all key-value pairs and the ZenML schema type

Source code in zenml/integrations/aws/secrets_managers/aws_secrets_manager.py
def jsonify_secret_contents(secret: BaseSecretSchema) -> str:
    """Adds the secret type to the secret contents.

    This persists the schema type in the secrets backend, so that the correct
    SecretSchema can be retrieved when the secret is queried from the backend.

    Args:
        secret: should be a subclass of the BaseSecretSchema class

    Returns:
        jsonified dictionary containing all key-value pairs and the ZenML schema
        type
    """
    secret_contents = secret.content
    secret_contents[ZENML_SCHEMA_NAME] = secret.TYPE
    return json.dumps(secret_contents)
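
The interplay between jsonify_secret_contents() and get_secret() comes down to one extra key that records the schema type. A sketch of that round trip with plain dictionaries; the key name and contents below are illustrative, whereas ZenML uses the ZENML_SCHEMA_NAME constant and a registered secret schema:

# Illustrative round trip of the schema-name trick used above,
# with plain dicts instead of BaseSecretSchema objects.
import json

ZENML_SCHEMA_NAME = "zenml_schema_name"  # stand-in for the ZenML constant

# "jsonify": add the schema type to the contents before storing
contents = {"username": "admin", "password": "hunter2"}
contents[ZENML_SCHEMA_NAME] = "arbitrary"  # the secret schema type
stored = json.dumps(contents)

# "get": parse the stored string and pop the schema type back out
loaded = json.loads(stored)
schema_name = loaded.pop(ZENML_SCHEMA_NAME)
assert schema_name == "arbitrary" and "password" in loaded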

step_operators special

Initialization of the Sagemaker Step Operator.

sagemaker_step_operator

Implementation of the Sagemaker Step Operator.

SagemakerStepOperator (BaseStepOperator) pydantic-model

Step operator to run a step on Sagemaker.

This class defines code that builds an image with the ZenML entrypoint to run using Sagemaker's Estimator.

Attributes:

role (str): The role that has to be assigned to the jobs which are running in Sagemaker.
instance_type (str): The type of the compute instance where jobs will run.
base_image (Optional[str]): The base image to use for building the docker image that will be executed.
bucket (Optional[str]): Name of the S3 bucket to use for storing artifacts from the job run. If not provided, a default bucket will be created based on the following format: "sagemaker-{region}-{aws-account-id}".
experiment_name (Optional[str]): The name for the experiment to which the job will be associated. If not provided, the job runs would be independent.
Source code in zenml/integrations/aws/step_operators/sagemaker_step_operator.py
class SagemakerStepOperator(BaseStepOperator):
    """Step operator to run a step on Sagemaker.

    This class defines code that builds an image with the ZenML entrypoint
    to run using Sagemaker's Estimator.

    Attributes:
        role: The role that has to be assigned to the jobs which are
            running in Sagemaker.
        instance_type: The type of the compute instance where jobs will run.
        base_image: [Optional] The base image to use for building the docker
            image that will be executed.
        bucket: [Optional] Name of the S3 bucket to use for storing artifacts
            from the job run. If not provided, a default bucket will be created
            based on the following format: "sagemaker-{region}-{aws-account-id}".
        experiment_name: [Optional] The name for the experiment to which the job
            will be associated. If not provided, the job runs would be
            independent.
    """

    role: str
    instance_type: str

    base_image: Optional[str] = None
    bucket: Optional[str] = None
    experiment_name: Optional[str] = None

    # Class Configuration
    FLAVOR: ClassVar[str] = AWS_SAGEMAKER_STEP_OPERATOR_FLAVOR

    @property
    def validator(self) -> Optional[StackValidator]:
        """Validates that the stack contains a container registry.

        Returns:
            A validator that checks that the stack contains a container registry.
        """

        def _ensure_local_orchestrator(stack: Stack) -> Tuple[bool, str]:
            return (
                stack.orchestrator.FLAVOR == "local",
                "Local orchestrator is required",
            )

        return StackValidator(
            required_components={StackComponentType.CONTAINER_REGISTRY},
            custom_validation_function=_ensure_local_orchestrator,
        )

    def _build_docker_image(
        self,
        pipeline_name: str,
        requirements: List[str],
        entrypoint_command: List[str],
    ) -> str:
        repo = Repository()
        container_registry = repo.active_stack.container_registry

        if not container_registry:
            raise RuntimeError("Missing container registry")

        registry_uri = container_registry.uri.rstrip("/")
        image_name = f"{registry_uri}/zenml-sagemaker:{pipeline_name}"

        docker_utils.build_docker_image(
            build_context_path=get_source_root_path(),
            image_name=image_name,
            entrypoint=" ".join(entrypoint_command),
            requirements=set(requirements),
            base_image=self.base_image,
        )
        container_registry.push_image(image_name)
        return docker_utils.get_image_digest(image_name) or image_name

    def launch(
        self,
        pipeline_name: str,
        run_name: str,
        requirements: List[str],
        entrypoint_command: List[str],
    ) -> None:
        """Launches a step on Sagemaker.

        Args:
            pipeline_name: Name of the pipeline which the step to be executed
                is part of.
            run_name: Name of the pipeline run which the step to be executed
                is part of.
            entrypoint_command: Command that executes the step.
            requirements: List of pip requirements that must be installed
                inside the step operator environment.
        """
        image_name = self._build_docker_image(
            pipeline_name=pipeline_name,
            requirements=requirements,
            entrypoint_command=entrypoint_command,
        )

        session = sagemaker.Session(default_bucket=self.bucket)
        estimator = sagemaker.estimator.Estimator(
            image_name,
            self.role,
            instance_count=1,
            instance_type=self.instance_type,
            sagemaker_session=session,
        )

        # Sagemaker doesn't allow any underscores in job/experiment/trial names
        sanitized_run_name = run_name.replace("_", "-")

        experiment_config = {}
        if self.experiment_name:
            experiment_config = {
                "ExperimentName": self.experiment_name,
                "TrialName": sanitized_run_name,
            }

        estimator.fit(
            wait=True,
            experiment_config=experiment_config,
            job_name=sanitized_run_name,
        )
validator: Optional[zenml.stack.stack_validator.StackValidator] property readonly

Validates that the stack contains a container registry.

Returns:

Optional[zenml.stack.stack_validator.StackValidator]: A validator that checks that the stack contains a container registry.

launch(self, pipeline_name, run_name, requirements, entrypoint_command)

Launches a step on Sagemaker.

Parameters:

pipeline_name (str, required): Name of the pipeline which the step to be executed is part of.
run_name (str, required): Name of the pipeline run which the step to be executed is part of.
entrypoint_command (List[str], required): Command that executes the step.
requirements (List[str], required): List of pip requirements that must be installed inside the step operator environment.

Source code in zenml/integrations/aws/step_operators/sagemaker_step_operator.py
def launch(
    self,
    pipeline_name: str,
    run_name: str,
    requirements: List[str],
    entrypoint_command: List[str],
) -> None:
    """Launches a step on Sagemaker.

    Args:
        pipeline_name: Name of the pipeline which the step to be executed
            is part of.
        run_name: Name of the pipeline run which the step to be executed
            is part of.
        entrypoint_command: Command that executes the step.
        requirements: List of pip requirements that must be installed
            inside the step operator environment.
    """
    image_name = self._build_docker_image(
        pipeline_name=pipeline_name,
        requirements=requirements,
        entrypoint_command=entrypoint_command,
    )

    session = sagemaker.Session(default_bucket=self.bucket)
    estimator = sagemaker.estimator.Estimator(
        image_name,
        self.role,
        instance_count=1,
        instance_type=self.instance_type,
        sagemaker_session=session,
    )

    # Sagemaker doesn't allow any underscores in job/experiment/trial names
    sanitized_run_name = run_name.replace("_", "-")

    experiment_config = {}
    if self.experiment_name:
        experiment_config = {
            "ExperimentName": self.experiment_name,
            "TrialName": sanitized_run_name,
        }

    estimator.fit(
        wait=True,
        experiment_config=experiment_config,
        job_name=sanitized_run_name,
    )

azure special

Initialization of the ZenML Azure integration.

The Azure integration submodule provides a way to run ZenML pipelines in a cloud environment. Specifically, it allows the use of cloud artifact stores, and an io module to handle file operations on Azure Blob Storage. The Azure Step Operator integration submodule provides a way to run ZenML steps in AzureML.
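
The artifact store in this integration is built on adlfs, the fsspec filesystem for Azure Blob Storage. As a standalone illustration (account name, key and container are placeholders, and the exact credential fields depend on your Azure setup), basic file operations look like this:

# Hedged adlfs sketch, independent of ZenML; account name/key and paths
# are placeholders.
import adlfs

fs = adlfs.AzureBlobFileSystem(
    account_name="mystorageaccount",
    account_key="<storage-account-key>",
)

# List a container and read/write objects, similar to what the
# AzureArtifactStore does for paths like "az://my-container/...".
print(fs.ls("my-container"))
with fs.open("my-container/example.txt", "w") as f:
    f.write("hello from adlfs")
print(fs.cat("my-container/example.txt"))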

AzureIntegration (Integration)

Definition of Azure integration for ZenML.

Source code in zenml/integrations/azure/__init__.py
class AzureIntegration(Integration):
    """Definition of Azure integration for ZenML."""

    NAME = AZURE
    REQUIREMENTS = [
        "adlfs==2021.10.0",
        "azure-keyvault-keys",
        "azure-keyvault-secrets",
        "azure-identity",
        "azureml-core==1.39.0.post1",
    ]

    @classmethod
    def flavors(cls) -> List[FlavorWrapper]:
        """Declares the flavors for the integration.

        Returns:
            List of stack component flavors for this integration.
        """
        return [
            FlavorWrapper(
                name=AZURE_ARTIFACT_STORE_FLAVOR,
                source="zenml.integrations.azure.artifact_stores"
                ".AzureArtifactStore",
                type=StackComponentType.ARTIFACT_STORE,
                integration=cls.NAME,
            ),
            FlavorWrapper(
                name=AZURE_SECRETS_MANAGER_FLAVOR,
                source="zenml.integrations.azure.secrets_managers"
                ".AzureSecretsManager",
                type=StackComponentType.SECRETS_MANAGER,
                integration=cls.NAME,
            ),
            FlavorWrapper(
                name=AZUREML_STEP_OPERATOR_FLAVOR,
                source="zenml.integrations.azure.step_operators"
                ".AzureMLStepOperator",
                type=StackComponentType.STEP_OPERATOR,
                integration=cls.NAME,
            ),
        ]
flavors() classmethod

Declares the flavors for the integration.

Returns:

Type Description
List[zenml.zen_stores.models.flavor_wrapper.FlavorWrapper]

List of stack component flavors for this integration.

Source code in zenml/integrations/azure/__init__.py
@classmethod
def flavors(cls) -> List[FlavorWrapper]:
    """Declares the flavors for the integration.

    Returns:
        List of stack component flavors for this integration.
    """
    return [
        FlavorWrapper(
            name=AZURE_ARTIFACT_STORE_FLAVOR,
            source="zenml.integrations.azure.artifact_stores"
            ".AzureArtifactStore",
            type=StackComponentType.ARTIFACT_STORE,
            integration=cls.NAME,
        ),
        FlavorWrapper(
            name=AZURE_SECRETS_MANAGER_FLAVOR,
            source="zenml.integrations.azure.secrets_managers"
            ".AzureSecretsManager",
            type=StackComponentType.SECRETS_MANAGER,
            integration=cls.NAME,
        ),
        FlavorWrapper(
            name=AZUREML_STEP_OPERATOR_FLAVOR,
            source="zenml.integrations.azure.step_operators"
            ".AzureMLStepOperator",
            type=StackComponentType.STEP_OPERATOR,
            integration=cls.NAME,
        ),
    ]
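For illustration, the flavors declared above can also be listed at runtime; a minimal sketch:

```python
from zenml.integrations.azure import AzureIntegration

# Prints the artifact store, secrets manager and step operator flavors
# together with the source path of each implementation class.
for flavor in AzureIntegration.flavors():
    print(flavor.type, flavor.name, flavor.source)
```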

artifact_stores special

Initialization of the Azure Artifact Store integration.

azure_artifact_store

Implementation of the Azure Artifact Store integration.

AzureArtifactStore (BaseArtifactStore, AuthenticationMixin) pydantic-model

Artifact Store for Microsoft Azure based artifacts.
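A hedged usage sketch (container and file names are placeholders): once an `AzureArtifactStore` rooted at an `az://` or `abfs://` path is part of the active stack, files under those schemes can be read and written through ZenML's `fileio` utilities, which dispatch to the filesystem implemented below.

```python
from zenml.io import fileio

# Placeholder path; assumes an active Azure artifact store and valid Azure
# credentials (e.g. via an authentication secret or environment variables).
path = "az://my-container/some-folder/example.txt"

with fileio.open(path, mode="wb") as f:  # only binary modes are supported
    f.write(b"hello from zenml")

print(fileio.exists(path))
```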

Source code in zenml/integrations/azure/artifact_stores/azure_artifact_store.py
class AzureArtifactStore(BaseArtifactStore, AuthenticationMixin):
    """Artifact Store for Microsoft Azure based artifacts."""

    _filesystem: Optional[adlfs.AzureBlobFileSystem] = None

    # Class Configuration
    FLAVOR: ClassVar[str] = AZURE_ARTIFACT_STORE_FLAVOR
    SUPPORTED_SCHEMES: ClassVar[Set[str]] = {"abfs://", "az://"}

    @property
    def filesystem(self) -> adlfs.AzureBlobFileSystem:
        """The adlfs filesystem to access this artifact store.

        Returns:
            The adlfs filesystem to access this artifact store.
        """
        if not self._filesystem:
            secret = self.get_authentication_secret(
                expected_schema_type=AzureSecretSchema
            )
            credentials = secret.content if secret else {}

            self._filesystem = adlfs.AzureBlobFileSystem(
                **credentials,
                anon=False,
                use_listings_cache=False,
            )
        return self._filesystem

    @classmethod
    def _split_path(cls, path: PathType) -> Tuple[str, str]:
        """Splits a path into the filesystem prefix and remainder.

        Example:
        ```python
        prefix, remainder = AzureArtifactStore._split_path("az://my_container/test.txt")
        print(prefix, remainder)  # "az://" "my_container/test.txt"
        ```

        Args:
            path: The path to split.

        Returns:
            A tuple of the filesystem prefix and the remainder.
        """
        path = convert_to_str(path)
        prefix = ""
        for potential_prefix in cls.SUPPORTED_SCHEMES:
            if path.startswith(potential_prefix):
                prefix = potential_prefix
                path = path[len(potential_prefix) :]
                break

        return prefix, path

    def open(self, path: PathType, mode: str = "r") -> Any:
        """Open a file at the given path.

        Args:
            path: Path of the file to open.
            mode: Mode in which to open the file. Currently, only
                'rb' and 'wb' to read and write binary files are supported.

        Returns:
            A file-like object.
        """
        return self.filesystem.open(path=path, mode=mode)

    def copyfile(
        self, src: PathType, dst: PathType, overwrite: bool = False
    ) -> None:
        """Copy a file.

        Args:
            src: The path to copy from.
            dst: The path to copy to.
            overwrite: If a file already exists at the destination, this
                method will overwrite it if overwrite=`True` and
                raise a FileExistsError otherwise.

        Raises:
            FileExistsError: If a file already exists at the destination
                and overwrite is not set to `True`.
        """
        if not overwrite and self.filesystem.exists(dst):
            raise FileExistsError(
                f"Unable to copy to destination '{convert_to_str(dst)}', "
                f"file already exists. Set `overwrite=True` to copy anyway."
            )

        # TODO [ENG-151]: Check if it works with overwrite=True or if we need to
        #  manually remove it first
        self.filesystem.copy(path1=src, path2=dst)

    def exists(self, path: PathType) -> bool:
        """Check whether a path exists.

        Args:
            path: The path to check.

        Returns:
            True if the path exists, False otherwise.
        """
        return self.filesystem.exists(path=path)  # type: ignore[no-any-return]

    def glob(self, pattern: PathType) -> List[PathType]:
        """Return all paths that match the given glob pattern.

        The glob pattern may include:
        - '*' to match any number of characters
        - '?' to match a single character
        - '[...]' to match one of the characters inside the brackets
        - '**' as the full name of a path component to match to search
            in subdirectories of any depth (e.g. '/some_dir/**/some_file)

        Args:
            pattern: The glob pattern to match, see details above.

        Returns:
            A list of paths that match the given glob pattern.
        """
        prefix, _ = self._split_path(pattern)
        return [
            f"{prefix}{path}" for path in self.filesystem.glob(path=pattern)
        ]

    def isdir(self, path: PathType) -> bool:
        """Check whether a path is a directory.

        Args:
            path: The path to check.

        Returns:
            True if the path is a directory, False otherwise.
        """
        return self.filesystem.isdir(path=path)  # type: ignore[no-any-return]

    def listdir(self, path: PathType) -> List[PathType]:
        """Return a list of files in a directory.

        Args:
            path: The path to list.

        Returns:
            A list of files in the given directory.
        """
        _, path = self._split_path(path)

        def _extract_basename(file_dict: Dict[str, Any]) -> str:
            """Extracts the basename from a dictionary returned by the Azure filesystem.

            Args:
                file_dict: A dictionary returned by the Azure filesystem.

            Returns:
                The basename of the file.
            """
            file_path = cast(str, file_dict["name"])
            base_name = file_path[len(path) :]
            return base_name.lstrip("/")

        return [
            _extract_basename(dict_)
            for dict_ in self.filesystem.listdir(path=path)
        ]

    def makedirs(self, path: PathType) -> None:
        """Create a directory at the given path.

        If needed also create missing parent directories.

        Args:
            path: The path to create.
        """
        self.filesystem.makedirs(path=path, exist_ok=True)

    def mkdir(self, path: PathType) -> None:
        """Create a directory at the given path.

        Args:
            path: The path to create.
        """
        self.filesystem.makedir(path=path)

    def remove(self, path: PathType) -> None:
        """Remove the file at the given path.

        Args:
            path: The path to remove.
        """
        self.filesystem.rm_file(path=path)

    def rename(
        self, src: PathType, dst: PathType, overwrite: bool = False
    ) -> None:
        """Rename source file to destination file.

        Args:
            src: The path of the file to rename.
            dst: The path to rename the source file to.
            overwrite: If a file already exists at the destination, this
                method will overwrite it if overwrite=`True` and
                raise a FileExistsError otherwise.

        Raises:
            FileExistsError: If a file already exists at the destination
                and overwrite is not set to `True`.
        """
        if not overwrite and self.filesystem.exists(dst):
            raise FileExistsError(
                f"Unable to rename file to '{convert_to_str(dst)}', "
                f"file already exists. Set `overwrite=True` to rename anyway."
            )

        # TODO [ENG-152]: Check if it works with overwrite=True or if we need
        #  to manually remove it first
        self.filesystem.rename(path1=src, path2=dst)

    def rmtree(self, path: PathType) -> None:
        """Remove the given directory.

        Args:
            path: The path of the directory to remove.
        """
        self.filesystem.delete(path=path, recursive=True)

    def stat(self, path: PathType) -> Dict[str, Any]:
        """Return stat info for the given path.

        Args:
            path: The path to get stat info for.

        Returns:
            Stat info.
        """
        return self.filesystem.stat(path=path)  # type: ignore[no-any-return]

    def walk(
        self,
        top: PathType,
        topdown: bool = True,
        onerror: Optional[Callable[..., None]] = None,
    ) -> Iterable[Tuple[PathType, List[PathType], List[PathType]]]:
        """Return an iterator that walks the contents of the given directory.

        Args:
            top: Path of directory to walk.
            topdown: Unused argument to conform to interface.
            onerror: Unused argument to conform to interface.

        Yields:
            An Iterable of Tuples, each of which contain the path of the current
            directory path, a list of directories inside the current directory
            and a list of files inside the current directory.
        """
        # TODO [ENG-153]: Additional params
        prefix, _ = self._split_path(top)
        for (
            directory,
            subdirectories,
            files,
        ) in self.filesystem.walk(path=top):
            yield f"{prefix}{directory}", subdirectories, files
filesystem: AzureBlobFileSystem property readonly

The adlfs filesystem to access this artifact store.

Returns:

Type Description
AzureBlobFileSystem

The adlfs filesystem to access this artifact store.

copyfile(self, src, dst, overwrite=False)

Copy a file.

Parameters:

Name Type Description Default
src Union[bytes, str]

The path to copy from.

required
dst Union[bytes, str]

The path to copy to.

required
overwrite bool

If a file already exists at the destination, this method will overwrite it if overwrite=True and raise a FileExistsError otherwise.

False

Exceptions:

Type Description
FileExistsError

If a file already exists at the destination and overwrite is not set to True.

Source code in zenml/integrations/azure/artifact_stores/azure_artifact_store.py
def copyfile(
    self, src: PathType, dst: PathType, overwrite: bool = False
) -> None:
    """Copy a file.

    Args:
        src: The path to copy from.
        dst: The path to copy to.
        overwrite: If a file already exists at the destination, this
            method will overwrite it if overwrite=`True` and
            raise a FileExistsError otherwise.

    Raises:
        FileExistsError: If a file already exists at the destination
            and overwrite is not set to `True`.
    """
    if not overwrite and self.filesystem.exists(dst):
        raise FileExistsError(
            f"Unable to copy to destination '{convert_to_str(dst)}', "
            f"file already exists. Set `overwrite=True` to copy anyway."
        )

    # TODO [ENG-151]: Check if it works with overwrite=True or if we need to
    #  manually remove it first
    self.filesystem.copy(path1=src, path2=dst)
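A brief illustrative sketch of the overwrite behaviour; the paths are placeholders and `artifact_store` stands in for a configured `AzureArtifactStore` instance:

```python
src = "az://my-container/data/raw.csv"
dst = "az://my-container/data/raw_copy.csv"

try:
    # With the default overwrite=False an existing destination is not clobbered.
    artifact_store.copyfile(src, dst)
except FileExistsError:
    artifact_store.copyfile(src, dst, overwrite=True)
```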
exists(self, path)

Check whether a path exists.

Parameters:

Name Type Description Default
path Union[bytes, str]

The path to check.

required

Returns:

Type Description
bool

True if the path exists, False otherwise.

Source code in zenml/integrations/azure/artifact_stores/azure_artifact_store.py
def exists(self, path: PathType) -> bool:
    """Check whether a path exists.

    Args:
        path: The path to check.

    Returns:
        True if the path exists, False otherwise.
    """
    return self.filesystem.exists(path=path)  # type: ignore[no-any-return]
glob(self, pattern)

Return all paths that match the given glob pattern.

The glob pattern may include:
- '*' to match any number of characters
- '?' to match a single character
- '[...]' to match one of the characters inside the brackets
- '**' as the full name of a path component to match to search in subdirectories of any depth (e.g. '/some_dir/**/some_file')

Parameters:

Name Type Description Default
pattern Union[bytes, str]

The glob pattern to match, see details above.

required

Returns:

Type Description
List[Union[bytes, str]]

A list of paths that match the given glob pattern.

Source code in zenml/integrations/azure/artifact_stores/azure_artifact_store.py
def glob(self, pattern: PathType) -> List[PathType]:
    """Return all paths that match the given glob pattern.

    The glob pattern may include:
    - '*' to match any number of characters
    - '?' to match a single character
    - '[...]' to match one of the characters inside the brackets
    - '**' as the full name of a path component to match to search
        in subdirectories of any depth (e.g. '/some_dir/**/some_file)

    Args:
        pattern: The glob pattern to match, see details above.

    Returns:
        A list of paths that match the given glob pattern.
    """
    prefix, _ = self._split_path(pattern)
    return [
        f"{prefix}{path}" for path in self.filesystem.glob(path=pattern)
    ]
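For illustration (a hypothetical container layout, with `artifact_store` again standing in for a configured `AzureArtifactStore`):

```python
# All CSV files at any depth below data/, returned with their az:// prefix.
csv_files = artifact_store.glob("az://my-container/data/**/*.csv")

# Single-character wildcard: matches part_0.csv, part_1.csv, ...
parts = artifact_store.glob("az://my-container/data/part_?.csv")
```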
isdir(self, path)

Check whether a path is a directory.

Parameters:

Name Type Description Default
path Union[bytes, str]

The path to check.

required

Returns:

Type Description
bool

True if the path is a directory, False otherwise.

Source code in zenml/integrations/azure/artifact_stores/azure_artifact_store.py
def isdir(self, path: PathType) -> bool:
    """Check whether a path is a directory.

    Args:
        path: The path to check.

    Returns:
        True if the path is a directory, False otherwise.
    """
    return self.filesystem.isdir(path=path)  # type: ignore[no-any-return]
listdir(self, path)

Return a list of files in a directory.

Parameters:

Name Type Description Default
path Union[bytes, str]

The path to list.

required

Returns:

Type Description
List[Union[bytes, str]]

A list of files in the given directory.

Source code in zenml/integrations/azure/artifact_stores/azure_artifact_store.py
def listdir(self, path: PathType) -> List[PathType]:
    """Return a list of files in a directory.

    Args:
        path: The path to list.

    Returns:
        A list of files in the given directory.
    """
    _, path = self._split_path(path)

    def _extract_basename(file_dict: Dict[str, Any]) -> str:
        """Extracts the basename from a dictionary returned by the Azure filesystem.

        Args:
            file_dict: A dictionary returned by the Azure filesystem.

        Returns:
            The basename of the file.
        """
        file_path = cast(str, file_dict["name"])
        base_name = file_path[len(path) :]
        return base_name.lstrip("/")

    return [
        _extract_basename(dict_)
        for dict_ in self.filesystem.listdir(path=path)
    ]
makedirs(self, path)

Create a directory at the given path.

If needed also create missing parent directories.

Parameters:

Name Type Description Default
path Union[bytes, str]

The path to create.

required
Source code in zenml/integrations/azure/artifact_stores/azure_artifact_store.py
def makedirs(self, path: PathType) -> None:
    """Create a directory at the given path.

    If needed also create missing parent directories.

    Args:
        path: The path to create.
    """
    self.filesystem.makedirs(path=path, exist_ok=True)
mkdir(self, path)

Create a directory at the given path.

Parameters:

Name Type Description Default
path Union[bytes, str]

The path to create.

required
Source code in zenml/integrations/azure/artifact_stores/azure_artifact_store.py
def mkdir(self, path: PathType) -> None:
    """Create a directory at the given path.

    Args:
        path: The path to create.
    """
    self.filesystem.makedir(path=path)
open(self, path, mode='r')

Open a file at the given path.

Parameters:

Name Type Description Default
path Union[bytes, str]

Path of the file to open.

required
mode str

Mode in which to open the file. Currently, only 'rb' and 'wb' to read and write binary files are supported.

'r'

Returns:

Type Description
Any

A file-like object.

Source code in zenml/integrations/azure/artifact_stores/azure_artifact_store.py
def open(self, path: PathType, mode: str = "r") -> Any:
    """Open a file at the given path.

    Args:
        path: Path of the file to open.
        mode: Mode in which to open the file. Currently, only
            'rb' and 'wb' to read and write binary files are supported.

    Returns:
        A file-like object.
    """
    return self.filesystem.open(path=path, mode=mode)
remove(self, path)

Remove the file at the given path.

Parameters:

Name Type Description Default
path Union[bytes, str]

The path to remove.

required
Source code in zenml/integrations/azure/artifact_stores/azure_artifact_store.py
def remove(self, path: PathType) -> None:
    """Remove the file at the given path.

    Args:
        path: The path to remove.
    """
    self.filesystem.rm_file(path=path)
rename(self, src, dst, overwrite=False)

Rename source file to destination file.

Parameters:

Name Type Description Default
src Union[bytes, str]

The path of the file to rename.

required
dst Union[bytes, str]

The path to rename the source file to.

required
overwrite bool

If a file already exists at the destination, this method will overwrite it if overwrite=True and raise a FileExistsError otherwise.

False

Exceptions:

Type Description
FileExistsError

If a file already exists at the destination and overwrite is not set to True.

Source code in zenml/integrations/azure/artifact_stores/azure_artifact_store.py
def rename(
    self, src: PathType, dst: PathType, overwrite: bool = False
) -> None:
    """Rename source file to destination file.

    Args:
        src: The path of the file to rename.
        dst: The path to rename the source file to.
        overwrite: If a file already exists at the destination, this
            method will overwrite it if overwrite=`True` and
            raise a FileExistsError otherwise.

    Raises:
        FileExistsError: If a file already exists at the destination
            and overwrite is not set to `True`.
    """
    if not overwrite and self.filesystem.exists(dst):
        raise FileExistsError(
            f"Unable to rename file to '{convert_to_str(dst)}', "
            f"file already exists. Set `overwrite=True` to rename anyway."
        )

    # TODO [ENG-152]: Check if it works with overwrite=True or if we need
    #  to manually remove it first
    self.filesystem.rename(path1=src, path2=dst)
rmtree(self, path)

Remove the given directory.

Parameters:

Name Type Description Default
path Union[bytes, str]

The path of the directory to remove.

required
Source code in zenml/integrations/azure/artifact_stores/azure_artifact_store.py
def rmtree(self, path: PathType) -> None:
    """Remove the given directory.

    Args:
        path: The path of the directory to remove.
    """
    self.filesystem.delete(path=path, recursive=True)
stat(self, path)

Return stat info for the given path.

Parameters:

Name Type Description Default
path Union[bytes, str]

The path to get stat info for.

required

Returns:

Type Description
Dict[str, Any]

Stat info.

Source code in zenml/integrations/azure/artifact_stores/azure_artifact_store.py
def stat(self, path: PathType) -> Dict[str, Any]:
    """Return stat info for the given path.

    Args:
        path: The path to get stat info for.

    Returns:
        Stat info.
    """
    return self.filesystem.stat(path=path)  # type: ignore[no-any-return]
walk(self, top, topdown=True, onerror=None)

Return an iterator that walks the contents of the given directory.

Parameters:

Name Type Description Default
top Union[bytes, str]

Path of directory to walk.

required
topdown bool

Unused argument to conform to interface.

True
onerror Optional[Callable[..., NoneType]]

Unused argument to conform to interface.

None

Yields:

Type Description
Iterable[Tuple[Union[bytes, str], List[Union[bytes, str]], List[Union[bytes, str]]]]

An Iterable of Tuples, each of which contain the path of the current directory path, a list of directories inside the current directory and a list of files inside the current directory.

Source code in zenml/integrations/azure/artifact_stores/azure_artifact_store.py
def walk(
    self,
    top: PathType,
    topdown: bool = True,
    onerror: Optional[Callable[..., None]] = None,
) -> Iterable[Tuple[PathType, List[PathType], List[PathType]]]:
    """Return an iterator that walks the contents of the given directory.

    Args:
        top: Path of directory to walk.
        topdown: Unused argument to conform to interface.
        onerror: Unused argument to conform to interface.

    Yields:
        An Iterable of Tuples, each of which contain the path of the current
        directory path, a list of directories inside the current directory
        and a list of files inside the current directory.
    """
    # TODO [ENG-153]: Additional params
    prefix, _ = self._split_path(top)
    for (
        directory,
        subdirectories,
        files,
    ) in self.filesystem.walk(path=top):
        yield f"{prefix}{directory}", subdirectories, files
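A short sketch of iterating over a store the way `os.walk` would (placeholder container, `artifact_store` assumed as in the earlier sketches):

```python
for directory, subdirectories, files in artifact_store.walk("az://my-container/data"):
    print(f"{directory}: {len(subdirectories)} subdirs, {len(files)} files")
```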

secrets_managers special

Initialization of the Azure Secrets Manager integration.

azure_secrets_manager

Implementation of the Azure Secrets Manager integration.

AzureSecretsManager (BaseSecretsManager) pydantic-model

Class to interact with the Azure secrets manager.

Attributes:

Name Type Description
key_vault_name str

The name of the Azure Key Vault that is used to store the secrets.
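A minimal configuration sketch (the vault name is a placeholder and the `name` field is assumed to be the usual stack component name; `DefaultAzureCredential` must be able to authenticate, e.g. via environment variables or a prior `az login`):

```python
from zenml.integrations.azure.secrets_managers import AzureSecretsManager

secrets_manager = AzureSecretsManager(
    name="azure_key_vault",         # stack component name (assumed field)
    key_vault_name="my-key-vault",  # the Azure Key Vault backing this manager
)
```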

Source code in zenml/integrations/azure/secrets_managers/azure_secrets_manager.py
class AzureSecretsManager(BaseSecretsManager):
    """Class to interact with the Azure secrets manager.

    Attributes:
        key_vault_name: The name of the Azure Key Vault that is used to
            store the secrets.
    """

    key_vault_name: str

    # Class configuration
    FLAVOR: ClassVar[str] = AZURE_SECRETS_MANAGER_FLAVOR
    CLIENT: ClassVar[Any] = None

    @classmethod
    def _ensure_client_connected(cls, vault_name: str) -> None:
        if cls.CLIENT is None:
            KVUri = f"https://{vault_name}.vault.azure.net"

            credential = DefaultAzureCredential()
            cls.CLIENT = SecretClient(vault_url=KVUri, credential=credential)

    def register_secret(self, secret: BaseSecretSchema) -> None:
        """Registers a new secret.

        Args:
            secret: the secret to register

        Raises:
            SecretExistsError: if the secret already exists
            ValueError: if the secret name contains an underscore.
        """
        self._ensure_client_connected(self.key_vault_name)

        if "_" in secret.name:
            raise ValueError(
                f"The secret name `{secret.name}` contains an underscore. "
                f"This will cause issues with Azure. Please try again."
            )

        if secret.name in self.get_all_secret_keys():
            raise SecretExistsError(
                f"A Secret with the name '{secret.name}' already exists."
            )

        adjusted_content = prepend_group_name_to_keys(secret)

        for k, v in adjusted_content.items():
            # Create the secret, this only creates an empty secret with the
            #  supplied name.
            azure_secret = self.CLIENT.set_secret(k, v)
            self.CLIENT.update_secret_properties(
                azure_secret.name,
                tags={
                    ZENML_GROUP_KEY: secret.name,
                    ZENML_SCHEMA_NAME: secret.TYPE,
                },
            )

            logger.debug("Created secret: %s", azure_secret.name)
            logger.debug("Added value to secret.")

    def get_secret(self, secret_name: str) -> BaseSecretSchema:
        """Get a secret by its name.

        Args:
            secret_name: the name of the secret to get

        Returns:
            The secret.

        Raises:
            RuntimeError: if the secret does not exist
            ValueError: if the secret is named 'name'
        """
        self._ensure_client_connected(self.key_vault_name)

        secret_contents = {}
        zenml_schema_name = ""

        for secret_property in self.CLIENT.list_properties_of_secrets():
            response = self.CLIENT.get_secret(secret_property.name)
            tags = response.properties.tags
            if tags and tags.get(ZENML_GROUP_KEY) == secret_name:
                secret_key = remove_group_name_from_key(
                    combined_key_name=response.name, group_name=secret_name
                )
                if secret_key == "name":
                    raise ValueError("The secret's key cannot be 'name'.")

                secret_contents[secret_key] = response.value

                zenml_schema_name = tags.get(ZENML_SCHEMA_NAME)

        if not secret_contents:
            raise RuntimeError(f"No secrets found for the secret '{secret_name}'.")

        secret_contents["name"] = secret_name

        secret_schema = SecretSchemaClassRegistry.get_class(
            secret_schema=zenml_schema_name
        )
        return secret_schema(**secret_contents)

    def get_all_secret_keys(self) -> List[str]:
        """Get all secret keys.

        Returns:
            A list of all secret keys
        """
        self._ensure_client_connected(self.key_vault_name)

        set_of_secrets = set()

        for secret_property in self.CLIENT.list_properties_of_secrets():
            tags = secret_property.tags
            if tags:
                set_of_secrets.add(tags.get(ZENML_GROUP_KEY))

        return list(set_of_secrets)

    def update_secret(self, secret: BaseSecretSchema) -> None:
        """Update an existing secret by creating new versions of the existing secrets.

        Args:
            secret: the secret to update
        """
        self._ensure_client_connected(self.key_vault_name)

        adjusted_content = prepend_group_name_to_keys(secret)

        for k, v in adjusted_content.items():
            self.CLIENT.set_secret(k, v)
            self.CLIENT.update_secret_properties(
                k,
                tags={
                    ZENML_GROUP_KEY: secret.name,
                    ZENML_SCHEMA_NAME: secret.TYPE,
                },
            )

    def delete_secret(self, secret_name: str) -> None:
        """Delete an existing secret by name.

        In Azure a secret is a single k-v pair. Within ZenML a secret is a
        collection of k-v pairs. As such, deleting a secret will iterate through
        all secrets and delete the ones with the secret_name as label.

        Args:
            secret_name: the name of the secret to delete
        """
        self._ensure_client_connected(self.key_vault_name)

        # Go through all Azure secrets and delete the ones with the secret_name
        #  as label.
        for secret_property in self.CLIENT.list_properties_of_secrets():
            response = self.CLIENT.get_secret(secret_property.name)
            tags = response.properties.tags
            if tags and tags.get(ZENML_GROUP_KEY) == secret_name:
                self.CLIENT.begin_delete_secret(secret_property.name).result()

    def delete_all_secrets(self) -> None:
        """Delete all existing secrets."""
        self._ensure_client_connected(self.key_vault_name)

        # List all secrets.
        for secret_property in self.CLIENT.list_properties_of_secrets():
            response = self.CLIENT.get_secret(secret_property.name)
            tags = response.properties.tags
            if tags and (ZENML_GROUP_KEY in tags or ZENML_SCHEMA_NAME in tags):
                logger.info(
                    "Deleted key-value pair {`%s`, `***`} from secret " "`%s`",
                    secret_property.name,
                    tags.get(ZENML_GROUP_KEY),
                )
                self.CLIENT.begin_delete_secret(secret_property.name).result()
delete_all_secrets(self)

Delete all existing secrets.

Source code in zenml/integrations/azure/secrets_managers/azure_secrets_manager.py
def delete_all_secrets(self) -> None:
    """Delete all existing secrets."""
    self._ensure_client_connected(self.key_vault_name)

    # List all secrets.
    for secret_property in self.CLIENT.list_properties_of_secrets():
        response = self.CLIENT.get_secret(secret_property.name)
        tags = response.properties.tags
        if tags and (ZENML_GROUP_KEY in tags or ZENML_SCHEMA_NAME in tags):
            logger.info(
                "Deleted key-value pair {`%s`, `***`} from secret " "`%s`",
                secret_property.name,
                tags.get(ZENML_GROUP_KEY),
            )
            self.CLIENT.begin_delete_secret(secret_property.name).result()
delete_secret(self, secret_name)

Delete an existing secret by name.

In Azure a secret is a single k-v pair. Within ZenML a secret is a collection of k-v pairs. As such, deleting a secret will iterate through all secrets and delete the ones with the secret_name as label.

Parameters:

Name Type Description Default
secret_name str

the name of the secret to delete

required
Source code in zenml/integrations/azure/secrets_managers/azure_secrets_manager.py
def delete_secret(self, secret_name: str) -> None:
    """Delete an existing secret by name.

    In Azure a secret is a single k-v pair. Within ZenML a secret is a
    collection of k-v pairs. As such, deleting a secret will iterate through
    all secrets and delete the ones with the secret_name as label.

    Args:
        secret_name: the name of the secret to delete
    """
    self._ensure_client_connected(self.key_vault_name)

    # Go through all Azure secrets and delete the ones with the secret_name
    #  as label.
    for secret_property in self.CLIENT.list_properties_of_secrets():
        response = self.CLIENT.get_secret(secret_property.name)
        tags = response.properties.tags
        if tags and tags.get(ZENML_GROUP_KEY) == secret_name:
            self.CLIENT.begin_delete_secret(secret_property.name).result()
get_all_secret_keys(self)

Get all secret keys.

Returns:

Type Description
List[str]

A list of all secret keys

Source code in zenml/integrations/azure/secrets_managers/azure_secrets_manager.py
def get_all_secret_keys(self) -> List[str]:
    """Get all secret keys.

    Returns:
        A list of all secret keys
    """
    self._ensure_client_connected(self.key_vault_name)

    set_of_secrets = set()

    for secret_property in self.CLIENT.list_properties_of_secrets():
        tags = secret_property.tags
        if tags:
            set_of_secrets.add(tags.get(ZENML_GROUP_KEY))

    return list(set_of_secrets)
get_secret(self, secret_name)

Get a secret by its name.

Parameters:

Name Type Description Default
secret_name str

the name of the secret to get

required

Returns:

Type Description
BaseSecretSchema

The secret.

Exceptions:

Type Description
RuntimeError

if the secret does not exist

ValueError

if the secret is named 'name'

Source code in zenml/integrations/azure/secrets_managers/azure_secrets_manager.py
def get_secret(self, secret_name: str) -> BaseSecretSchema:
    """Get a secret by its name.

    Args:
        secret_name: the name of the secret to get

    Returns:
        The secret.

    Raises:
        RuntimeError: if the secret does not exist
        ValueError: if the secret is named 'name'
    """
    self._ensure_client_connected(self.key_vault_name)

    secret_contents = {}
    zenml_schema_name = ""

    for secret_property in self.CLIENT.list_properties_of_secrets():
        response = self.CLIENT.get_secret(secret_property.name)
        tags = response.properties.tags
        if tags and tags.get(ZENML_GROUP_KEY) == secret_name:
            secret_key = remove_group_name_from_key(
                combined_key_name=response.name, group_name=secret_name
            )
            if secret_key == "name":
                raise ValueError("The secret's key cannot be 'name'.")

            secret_contents[secret_key] = response.value

            zenml_schema_name = tags.get(ZENML_SCHEMA_NAME)

    if not secret_contents:
        raise RuntimeError(f"No secrets found for the secret '{secret_name}'.")

    secret_contents["name"] = secret_name

    secret_schema = SecretSchemaClassRegistry.get_class(
        secret_schema=zenml_schema_name
    )
    return secret_schema(**secret_contents)
register_secret(self, secret)

Registers a new secret.

Parameters:

Name Type Description Default
secret BaseSecretSchema

the secret to register

required

Exceptions:

Type Description
SecretExistsError

if the secret already exists

ValueError

if the secret name contains an underscore.

Source code in zenml/integrations/azure/secrets_managers/azure_secrets_manager.py
def register_secret(self, secret: BaseSecretSchema) -> None:
    """Registers a new secret.

    Args:
        secret: the secret to register

    Raises:
        SecretExistsError: if the secret already exists
        ValueError: if the secret name contains an underscore.
    """
    self._ensure_client_connected(self.key_vault_name)

    if "_" in secret.name:
        raise ValueError(
            f"The secret name `{secret.name}` contains an underscore. "
            f"This will cause issues with Azure. Please try again."
        )

    if secret.name in self.get_all_secret_keys():
        raise SecretExistsError(
            f"A Secret with the name '{secret.name}' already exists."
        )

    adjusted_content = prepend_group_name_to_keys(secret)

    for k, v in adjusted_content.items():
        # Create the secret, this only creates an empty secret with the
        #  supplied name.
        azure_secret = self.CLIENT.set_secret(k, v)
        self.CLIENT.update_secret_properties(
            azure_secret.name,
            tags={
                ZENML_GROUP_KEY: secret.name,
                ZENML_SCHEMA_NAME: secret.TYPE,
            },
        )

        logger.debug("Created secret: %s", azure_secret.name)
        logger.debug("Added value to secret.")
update_secret(self, secret)

Update an existing secret by creating new versions of the existing secrets.

Parameters:

Name Type Description Default
secret BaseSecretSchema

the secret to update

required
Source code in zenml/integrations/azure/secrets_managers/azure_secrets_manager.py
def update_secret(self, secret: BaseSecretSchema) -> None:
    """Update an existing secret by creating new versions of the existing secrets.

    Args:
        secret: the secret to update
    """
    self._ensure_client_connected(self.key_vault_name)

    adjusted_content = prepend_group_name_to_keys(secret)

    for k, v in adjusted_content.items():
        self.CLIENT.set_secret(k, v)
        self.CLIENT.update_secret_properties(
            k,
            tags={
                ZENML_GROUP_KEY: secret.name,
                ZENML_SCHEMA_NAME: secret.TYPE,
            },
        )
prepend_group_name_to_keys(secret)

Adds the secret group name to the keys of each secret key-value pair.

This allows using the same key across multiple secrets.

Parameters:

Name Type Description Default
secret BaseSecretSchema

The ZenML Secret schema

required

Returns:

Type Description
Dict[str, str]

A dictionary with the secret keys prepended with the group name

Source code in zenml/integrations/azure/secrets_managers/azure_secrets_manager.py
def prepend_group_name_to_keys(secret: BaseSecretSchema) -> Dict[str, str]:
    """Adds the secret group name to the keys of each secret key-value pair.

    This allows using the same key across multiple
    secrets.

    Args:
        secret: The ZenML Secret schema

    Returns:
        A dictionary with the secret keys prepended with the group name
    """
    return {f"{secret.name}-{k}": v for k, v in secret.content.items()}
remove_group_name_from_key(combined_key_name, group_name)

Removes the secret group name from the secret key.

Parameters:

Name Type Description Default
combined_key_name str

Full name as it is within the Azure secrets manager

required
group_name str

Group name (the ZenML Secret name)

required

Returns:

Type Description
str

The cleaned key

Exceptions:

Type Description
RuntimeError

If the group name is not found in the key name

Source code in zenml/integrations/azure/secrets_managers/azure_secrets_manager.py
def remove_group_name_from_key(combined_key_name: str, group_name: str) -> str:
    """Removes the secret group name from the secret key.

    Args:
        combined_key_name: Full name as it is within the Azure secrets manager
        group_name: Group name (the ZenML Secret name)

    Returns:
        The cleaned key

    Raises:
        RuntimeError: If the group name is not found in the key name
    """
    if combined_key_name.startswith(f"{group_name}-"):
        return combined_key_name[len(f"{group_name}-") :]
    else:
        raise RuntimeError(
            f"Key-name `{combined_key_name}` does not have the "
            f"prefix `{group_name}`. Key could not be "
            f"extracted."
        )
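A small worked example of the key naming convention implemented by these two helpers (secret and key names are placeholders; the helpers are assumed to be importable from the module path shown above):

```python
from zenml.integrations.azure.secrets_managers.azure_secrets_manager import (
    remove_group_name_from_key,
)

# A ZenML secret named "my-secret" with a key "api_key" is stored in Azure
# Key Vault as a single secret named "my-secret-api_key" (see
# prepend_group_name_to_keys above); stripping the group name recovers the key.
print(remove_group_name_from_key("my-secret-api_key", group_name="my-secret"))
# -> "api_key"
```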

step_operators special

Initialization of AzureML Step Operator integration.

azureml_step_operator

Implementation of the ZenML AzureML Step Operator.

AzureMLStepOperator (BaseStepOperator) pydantic-model

Step operator to run a step on AzureML.

This class defines code that can set up an AzureML environment and run the ZenML entrypoint command in it.

Attributes:

Name Type Description
subscription_id str

The Azure account's subscription ID

resource_group str

The resource group to which the AzureML workspace is deployed.

workspace_name str

The name of the AzureML Workspace.

compute_target_name str

The name of the configured ComputeTarget. An instance of it has to be created on the portal if it doesn't exist already.

environment_name Optional[str]

[Optional] The name of the environment if there already exists one.

docker_base_image Optional[str]

[Optional] The custom docker base image that the environment should use.

tenant_id Optional[str]

The Azure Tenant ID.

service_principal_id Optional[str]

The ID for the service principal that is created to allow apps to access secure resources.

service_principal_password Optional[str]

Password for the service principal.
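A minimal configuration sketch using the attributes above (all values are placeholders, and the `name` field is assumed to be the standard stack component name):

```python
from zenml.integrations.azure.step_operators import AzureMLStepOperator

step_operator = AzureMLStepOperator(
    name="azureml",                        # stack component name (assumed)
    subscription_id="<subscription-id>",
    resource_group="<resource-group>",
    workspace_name="<workspace-name>",
    compute_target_name="<compute-target>",
    # Optional: service principal authentication; if omitted, the default
    # AzureML authentication flow is used (see _get_authentication below).
    tenant_id="<tenant-id>",
    service_principal_id="<service-principal-id>",
    service_principal_password="<service-principal-password>",
)
```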

Source code in zenml/integrations/azure/step_operators/azureml_step_operator.py
class AzureMLStepOperator(BaseStepOperator):
    """Step operator to run a step on AzureML.

    This class defines code that can set up an AzureML environment and run the
    ZenML entrypoint command in it.

    Attributes:
        subscription_id: The Azure account's subscription ID
        resource_group: The resource group to which the AzureML workspace
            is deployed.
        workspace_name: The name of the AzureML Workspace.
        compute_target_name: The name of the configured ComputeTarget.
            An instance of it has to be created on the portal if it doesn't
            exist already.
        environment_name: [Optional] The name of the environment if there
            already exists one.
        docker_base_image: [Optional] The custom docker base image that the
            environment should use.
        tenant_id: The Azure Tenant ID.
        service_principal_id: The ID for the service principal that is created
            to allow apps to access secure resources.
        service_principal_password: Password for the service principal.
    """

    subscription_id: str
    resource_group: str
    workspace_name: str
    compute_target_name: str

    # Environment
    environment_name: Optional[str] = None
    docker_base_image: Optional[str] = None

    # Service principal authentication
    # https://docs.microsoft.com/en-us/azure/machine-learning/how-to-setup-authentication#configure-a-service-principal
    tenant_id: Optional[str] = None
    service_principal_id: Optional[str] = None
    service_principal_password: Optional[str] = None

    # Class Configuration
    FLAVOR: ClassVar[str] = AZUREML_STEP_OPERATOR_FLAVOR

    def _get_authentication(self) -> Optional[AbstractAuthentication]:
        """Returns the authentication object for the AzureML environment.

        Returns:
            The authentication object for the AzureML environment.
        """
        if (
            self.tenant_id
            and self.service_principal_id
            and self.service_principal_password
        ):
            return ServicePrincipalAuthentication(
                tenant_id=self.tenant_id,
                service_principal_id=self.service_principal_id,
                service_principal_password=self.service_principal_password,
            )
        return None

    def _prepare_environment(
        self, workspace: Workspace, requirements: List[str], run_name: str
    ) -> Environment:
        """Prepares the environment in which Azure will run all jobs.

        Args:
            workspace: The AzureML Workspace that has configuration
                for a storage account, container registry among other
                things.
            requirements: The list of requirements to be installed
                in the environment.
            run_name: The name of the pipeline run that can be used
                for naming environments and runs.

        Returns:
            The AzureML Environment object.
        """
        if self.environment_name:
            environment = Environment.get(
                workspace=workspace, name=self.environment_name
            )
            if not environment.python.conda_dependencies:
                environment.python.conda_dependencies = (
                    CondaDependencies.create(
                        python_version=ZenMLEnvironment.python_version()
                    )
                )

            for requirement in requirements:
                environment.python.conda_dependencies.add_pip_package(
                    requirement
                )
        else:
            environment = Environment(name=f"zenml-{run_name}")
            environment.python.conda_dependencies = CondaDependencies.create(
                pip_packages=requirements,
                python_version=ZenMLEnvironment.python_version(),
            )

            if self.docker_base_image:
                # replace the default azure base image
                environment.docker.base_image = self.docker_base_image

        environment_variables = {
            "ENV_ZENML_PREVENT_PIPELINE_EXECUTION": "True",
        }
        # set credentials to access azure storage
        for key in [
            "AZURE_STORAGE_ACCOUNT_KEY",
            "AZURE_STORAGE_ACCOUNT_NAME",
            "AZURE_STORAGE_CONNECTION_STRING",
            "AZURE_STORAGE_SAS_TOKEN",
        ]:
            value = os.getenv(key)
            if value:
                environment_variables[key] = value

        environment_variables[
            ENV_ZENML_CONFIG_PATH
        ] = f"./{CONTAINER_ZENML_CONFIG_DIR}"

        environment.environment_variables = environment_variables
        return environment

    def launch(
        self,
        pipeline_name: str,
        run_name: str,
        requirements: List[str],
        entrypoint_command: List[str],
    ) -> None:
        """Launches a step on AzureML.

        Args:
            pipeline_name: Name of the pipeline which the step to be executed
                is part of.
            run_name: Name of the pipeline run which the step to be executed
                is part of.
            entrypoint_command: Command that executes the step.
            requirements: List of pip requirements that must be installed
                inside the step operator environment.
        """
        workspace = Workspace.get(
            subscription_id=self.subscription_id,
            resource_group=self.resource_group,
            name=self.workspace_name,
            auth=self._get_authentication(),
        )

        source_directory = get_source_root_path()
        config_path = os.path.join(source_directory, CONTAINER_ZENML_CONFIG_DIR)
        try:

            # Save a copy of the current global configuration with the
            # active profile contents into the build context, to have
            # the configured stacks accessible from within the Azure ML
            # environment.
            GlobalConfiguration().copy_active_configuration(
                config_path,
                load_config_path=f"./{CONTAINER_ZENML_CONFIG_DIR}",
            )

            environment = self._prepare_environment(
                workspace=workspace,
                requirements=requirements,
                run_name=run_name,
            )
            compute_target = ComputeTarget(
                workspace=workspace, name=self.compute_target_name
            )

            run_config = ScriptRunConfig(
                source_directory=source_directory,
                environment=environment,
                compute_target=compute_target,
                command=entrypoint_command,
            )

            experiment = Experiment(workspace=workspace, name=pipeline_name)
            run = experiment.submit(config=run_config)

        finally:
            # Clean up the temporary build files
            fileio.rmtree(config_path)

        run.display_name = run_name
        run.wait_for_completion(show_output=True)
launch(self, pipeline_name, run_name, requirements, entrypoint_command)

Launches a step on AzureML.

Parameters:

Name Type Description Default
pipeline_name str

Name of the pipeline which the step to be executed is part of.

required
run_name str

Name of the pipeline run which the step to be executed is part of.

required
entrypoint_command List[str]

Command that executes the step.

required
requirements List[str]

List of pip requirements that must be installed inside the step operator environment.

required
Source code in zenml/integrations/azure/step_operators/azureml_step_operator.py
def launch(
    self,
    pipeline_name: str,
    run_name: str,
    requirements: List[str],
    entrypoint_command: List[str],
) -> None:
    """Launches a step on AzureML.

    Args:
        pipeline_name: Name of the pipeline which the step to be executed
            is part of.
        run_name: Name of the pipeline run which the step to be executed
            is part of.
        entrypoint_command: Command that executes the step.
        requirements: List of pip requirements that must be installed
            inside the step operator environment.
    """
    workspace = Workspace.get(
        subscription_id=self.subscription_id,
        resource_group=self.resource_group,
        name=self.workspace_name,
        auth=self._get_authentication(),
    )

    source_directory = get_source_root_path()
    config_path = os.path.join(source_directory, CONTAINER_ZENML_CONFIG_DIR)
    try:

        # Save a copy of the current global configuration with the
        # active profile contents into the build context, to have
        # the configured stacks accessible from within the Azure ML
        # environment.
        GlobalConfiguration().copy_active_configuration(
            config_path,
            load_config_path=f"./{CONTAINER_ZENML_CONFIG_DIR}",
        )

        environment = self._prepare_environment(
            workspace=workspace,
            requirements=requirements,
            run_name=run_name,
        )
        compute_target = ComputeTarget(
            workspace=workspace, name=self.compute_target_name
        )

        run_config = ScriptRunConfig(
            source_directory=source_directory,
            environment=environment,
            compute_target=compute_target,
            command=entrypoint_command,
        )

        experiment = Experiment(workspace=workspace, name=pipeline_name)
        run = experiment.submit(config=run_config)

    finally:
        # Clean up the temporary build files
        fileio.rmtree(config_path)

    run.display_name = run_name
    run.wait_for_completion(show_output=True)

constants

Constants for ZenML integrations.

dash special

Initialization of the Dash integration.

DashIntegration (Integration)

Definition of Dash integration for ZenML.

Source code in zenml/integrations/dash/__init__.py
class DashIntegration(Integration):
    """Definition of Dash integration for ZenML."""

    NAME = DASH
    REQUIREMENTS = [
        "dash>=2.0.0",
        "dash-cytoscape>=0.3.0",
        "dash-bootstrap-components>=1.0.1",
        "jupyter-dash>=0.4.2",
    ]

visualizers special

Initialization of the Pipeline Run Visualizer.

pipeline_run_lineage_visualizer

Implementation of the pipeline run lineage visualizer.

PipelineRunLineageVisualizer (BasePipelineRunVisualizer)

Implementation of a lineage diagram via the dash and dash-cytoscape libraries.
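A hedged usage sketch, assuming the `Repository` client API of this ZenML version for fetching a `PipelineRunView` (the pipeline name is a placeholder):

```python
from zenml.integrations.dash.visualizers.pipeline_run_lineage_visualizer import (
    PipelineRunLineageVisualizer,
)
from zenml.repository import Repository

# Fetch the latest run of a pipeline and render its lineage diagram.
run = Repository().get_pipeline("my_pipeline").runs[-1]
PipelineRunLineageVisualizer().visualize(run)
```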

Source code in zenml/integrations/dash/visualizers/pipeline_run_lineage_visualizer.py
class PipelineRunLineageVisualizer(BasePipelineRunVisualizer):
    """Implementation of a lineage diagram via the dash and dash-cytoscape libraries."""

    ARTIFACT_PREFIX = "artifact_"
    STEP_PREFIX = "step_"
    STATUS_CLASS_MAPPING = {
        ExecutionStatus.CACHED: "green",
        ExecutionStatus.FAILED: "red",
        ExecutionStatus.RUNNING: "yellow",
        ExecutionStatus.COMPLETED: "blue",
    }

    def visualize(
        self,
        object: PipelineRunView,
        magic: bool = False,
        *args: Any,
        **kwargs: Any,
    ) -> dash.Dash:
        """Method to visualize pipeline runs via the Dash library.

        The layout puts every layer of the dag in a column.

        Args:
            object: The pipeline run to visualize.
            magic: If True, the visualization is rendered in a magic mode.
            *args: Additional positional arguments.
            **kwargs: Additional keyword arguments.

        Returns:
            The Dash application.
        """
        external_stylesheets = [
            dbc.themes.BOOTSTRAP,
            dbc.icons.BOOTSTRAP,
        ]
        if magic:
            if Environment.in_notebook():
                # Only import jupyter_dash in this case
                from jupyter_dash import JupyterDash  # noqa

                JupyterDash.infer_jupyter_proxy_config()

                app = JupyterDash(
                    __name__,
                    external_stylesheets=external_stylesheets,
                )
                mode = "inline"
            else:
                cli_utils.warning(
                    "Cannot set magic flag in non-notebook environments."
                )
        else:
            app = dash.Dash(
                __name__,
                external_stylesheets=external_stylesheets,
            )
            mode = None
        nodes, edges, first_step_id = [], [], None
        first_step_id = None
        for step in object.steps:
            step_output_artifacts = list(step.outputs.values())
            execution_id = (
                step_output_artifacts[0].producer_step.id
                if step_output_artifacts
                else step.id
            )
            step_id = self.STEP_PREFIX + str(step.id)
            if first_step_id is None:
                first_step_id = step_id
            nodes.append(
                {
                    "data": {
                        "id": step_id,
                        "execution_id": execution_id,
                        "label": f"{execution_id} / {step.entrypoint_name}",
                        "entrypoint_name": step.entrypoint_name,  # redundant for consistency
                        "name": step.name,  # redundant for consistency
                        "type": "step",
                        "parameters": step.parameters,
                        "inputs": {k: v.uri for k, v in step.inputs.items()},
                        "outputs": {k: v.uri for k, v in step.outputs.items()},
                    },
                    "classes": self.STATUS_CLASS_MAPPING[step.status],
                }
            )

            for artifact_name, artifact in step.outputs.items():
                nodes.append(
                    {
                        "data": {
                            "id": self.ARTIFACT_PREFIX + str(artifact.id),
                            "execution_id": artifact.id,
                            "label": f"{artifact.id} / {artifact_name} ("
                            f"{artifact.data_type})",
                            "type": "artifact",
                            "name": artifact_name,
                            "is_cached": artifact.is_cached,
                            "artifact_type": artifact.type,
                            "artifact_data_type": artifact.data_type,
                            "parent_step_id": artifact.parent_step_id,
                            "producer_step_id": artifact.producer_step.id,
                            "uri": artifact.uri,
                        },
                        "classes": f"rectangle "
                        f"{self.STATUS_CLASS_MAPPING[step.status]}",
                    }
                )
                edges.append(
                    {
                        "data": {
                            "source": self.STEP_PREFIX + str(step.id),
                            "target": self.ARTIFACT_PREFIX + str(artifact.id),
                        },
                        "classes": f"edge-arrow "
                        f"{self.STATUS_CLASS_MAPPING[step.status]}"
                        + (" dashed" if artifact.is_cached else " solid"),
                    }
                )

            for artifact_name, artifact in step.inputs.items():
                edges.append(
                    {
                        "data": {
                            "source": self.ARTIFACT_PREFIX + str(artifact.id),
                            "target": self.STEP_PREFIX + str(step.id),
                        },
                        "classes": "edge-arrow "
                        + (
                            f"{self.STATUS_CLASS_MAPPING[ExecutionStatus.CACHED]} dashed"
                            if artifact.is_cached
                            else f"{self.STATUS_CLASS_MAPPING[step.status]} solid"
                        ),
                    }
                )

        app.layout = dbc.Row(
            [
                dbc.Container(f"Run: {object.name}", class_name="h1"),
                dbc.Row(
                    [
                        dbc.Col(
                            [
                                dbc.Row(
                                    [
                                        html.Span(
                                            [
                                                html.Span(
                                                    [
                                                        html.I(
                                                            className="bi bi-circle-fill me-1"
                                                        ),
                                                        "Step",
                                                    ],
                                                    className="me-2",
                                                ),
                                                html.Span(
                                                    [
                                                        html.I(
                                                            className="bi bi-square-fill me-1"
                                                        ),
                                                        "Artifact",
                                                    ],
                                                    className="me-4",
                                                ),
                                                dbc.Badge(
                                                    "Completed",
                                                    color=COLOR_BLUE,
                                                    className="me-1",
                                                ),
                                                dbc.Badge(
                                                    "Cached",
                                                    color=COLOR_GREEN,
                                                    className="me-1",
                                                ),
                                                dbc.Badge(
                                                    "Running",
                                                    color=COLOR_YELLOW,
                                                    className="me-1",
                                                ),
                                                dbc.Badge(
                                                    "Failed",
                                                    color=COLOR_RED,
                                                    className="me-1",
                                                ),
                                            ]
                                        ),
                                    ]
                                ),
                                dbc.Row(
                                    [
                                        cyto.Cytoscape(
                                            id="cytoscape",
                                            layout={
                                                "name": "breadthfirst",
                                                "roots": f'[id = "{first_step_id}"]',
                                            },
                                            elements=edges + nodes,
                                            stylesheet=STYLESHEET,
                                            style={
                                                "width": "100%",
                                                "height": "800px",
                                            },
                                            zoom=1,
                                        )
                                    ]
                                ),
                                dbc.Row(
                                    [
                                        dbc.Button(
                                            "Reset",
                                            id="bt-reset",
                                            color="primary",
                                            className="me-1",
                                        )
                                    ]
                                ),
                            ]
                        ),
                        dbc.Col(
                            [
                                dcc.Markdown(id="markdown-selected-node-data"),
                            ]
                        ),
                    ]
                ),
            ],
            className="p-5",
        )

        @app.callback(  # type: ignore[misc]
            Output("markdown-selected-node-data", "children"),
            Input("cytoscape", "selectedNodeData"),
        )
        def display_data(data_list: List[Dict[str, Any]]) -> str:
            """Callback for the text area below the graph.

            Args:
                data_list: The selected node data.

            Returns:
                str: The selected node data.
            """
            if data_list is None:
                return "Click on a node in the diagram."

            text = ""
            for data in data_list:
                text += f'## {data["execution_id"]} / {data["name"]}' + "\n\n"
                if data["type"] == "artifact":
                    for item in [
                        "artifact_data_type",
                        "is_cached",
                        "producer_step_id",
                        "parent_step_id",
                        "uri",
                    ]:
                        text += f"**{item}**: {data[item]}" + "\n\n"
                elif data["type"] == "step":
                    text += "### Inputs:" + "\n\n"
                    for k, v in data["inputs"].items():
                        text += f"**{k}**: {v}" + "\n\n"
                    text += "### Outputs:" + "\n\n"
                    for k, v in data["outputs"].items():
                        text += f"**{k}**: {v}" + "\n\n"
                    text += "### Params:" + "\n\n"
                    for k, v in data["parameters"].items():
                        text += f"**{k}**: {v}" + "\n\n"
            return text

        @app.callback(  # type: ignore[misc]
            [Output("cytoscape", "zoom"), Output("cytoscape", "elements")],
            [Input("bt-reset", "n_clicks")],
        )
        def reset_layout(
            n_clicks: int,
        ) -> List[Union[int, List[Dict[str, Collection[str]]]]]:
            """Resets the layout.

            Args:
                n_clicks: The number of clicks on the reset button.

            Returns:
                The zoom and the elements.
            """
            logger.debug("%s clicked in reset button.", n_clicks)
            return [1, edges + nodes]

        if mode is not None:
            app.run_server(mode=mode)
        else:
            app.run_server()
        return app
visualize(self, object, magic=False, *args, **kwargs)

Method to visualize pipeline runs via the Dash library.

The layout puts every layer of the dag in a column.

Parameters:

Name Type Description Default
object PipelineRunView

The pipeline run to visualize.

required
magic bool

If True, the visualization is rendered in a magic mode.

False
*args Any

Additional positional arguments.

()
**kwargs Any

Additional keyword arguments.

{}

Returns:

Type Description
Dash

The Dash application.

Source code in zenml/integrations/dash/visualizers/pipeline_run_lineage_visualizer.py
def visualize(
    self,
    object: PipelineRunView,
    magic: bool = False,
    *args: Any,
    **kwargs: Any,
) -> dash.Dash:
    """Method to visualize pipeline runs via the Dash library.

    The layout puts every layer of the dag in a column.

    Args:
        object: The pipeline run to visualize.
        magic: If True, the visualization is rendered in a magic mode.
        *args: Additional positional arguments.
        **kwargs: Additional keyword arguments.

    Returns:
        The Dash application.
    """
    external_stylesheets = [
        dbc.themes.BOOTSTRAP,
        dbc.icons.BOOTSTRAP,
    ]
    if magic:
        if Environment.in_notebook():
            # Only import jupyter_dash in this case
            from jupyter_dash import JupyterDash  # noqa

            JupyterDash.infer_jupyter_proxy_config()

            app = JupyterDash(
                __name__,
                external_stylesheets=external_stylesheets,
            )
            mode = "inline"
        else:
            cli_utils.warning(
                "Cannot set magic flag in non-notebook environments."
            )
            # Fall back to a regular Dash app so `app` and `mode` are defined.
            app = dash.Dash(__name__, external_stylesheets=external_stylesheets)
            mode = None
    else:
        app = dash.Dash(
            __name__,
            external_stylesheets=external_stylesheets,
        )
        mode = None
    nodes, edges, first_step_id = [], [], None
    for step in object.steps:
        step_output_artifacts = list(step.outputs.values())
        execution_id = (
            step_output_artifacts[0].producer_step.id
            if step_output_artifacts
            else step.id
        )
        step_id = self.STEP_PREFIX + str(step.id)
        if first_step_id is None:
            first_step_id = step_id
        nodes.append(
            {
                "data": {
                    "id": step_id,
                    "execution_id": execution_id,
                    "label": f"{execution_id} / {step.entrypoint_name}",
                    "entrypoint_name": step.entrypoint_name,  # redundant for consistency
                    "name": step.name,  # redundant for consistency
                    "type": "step",
                    "parameters": step.parameters,
                    "inputs": {k: v.uri for k, v in step.inputs.items()},
                    "outputs": {k: v.uri for k, v in step.outputs.items()},
                },
                "classes": self.STATUS_CLASS_MAPPING[step.status],
            }
        )

        for artifact_name, artifact in step.outputs.items():
            nodes.append(
                {
                    "data": {
                        "id": self.ARTIFACT_PREFIX + str(artifact.id),
                        "execution_id": artifact.id,
                        "label": f"{artifact.id} / {artifact_name} ("
                        f"{artifact.data_type})",
                        "type": "artifact",
                        "name": artifact_name,
                        "is_cached": artifact.is_cached,
                        "artifact_type": artifact.type,
                        "artifact_data_type": artifact.data_type,
                        "parent_step_id": artifact.parent_step_id,
                        "producer_step_id": artifact.producer_step.id,
                        "uri": artifact.uri,
                    },
                    "classes": f"rectangle "
                    f"{self.STATUS_CLASS_MAPPING[step.status]}",
                }
            )
            edges.append(
                {
                    "data": {
                        "source": self.STEP_PREFIX + str(step.id),
                        "target": self.ARTIFACT_PREFIX + str(artifact.id),
                    },
                    "classes": f"edge-arrow "
                    f"{self.STATUS_CLASS_MAPPING[step.status]}"
                    + (" dashed" if artifact.is_cached else " solid"),
                }
            )

        for artifact_name, artifact in step.inputs.items():
            edges.append(
                {
                    "data": {
                        "source": self.ARTIFACT_PREFIX + str(artifact.id),
                        "target": self.STEP_PREFIX + str(step.id),
                    },
                    "classes": "edge-arrow "
                    + (
                        f"{self.STATUS_CLASS_MAPPING[ExecutionStatus.CACHED]} dashed"
                        if artifact.is_cached
                        else f"{self.STATUS_CLASS_MAPPING[step.status]} solid"
                    ),
                }
            )

    app.layout = dbc.Row(
        [
            dbc.Container(f"Run: {object.name}", class_name="h1"),
            dbc.Row(
                [
                    dbc.Col(
                        [
                            dbc.Row(
                                [
                                    html.Span(
                                        [
                                            html.Span(
                                                [
                                                    html.I(
                                                        className="bi bi-circle-fill me-1"
                                                    ),
                                                    "Step",
                                                ],
                                                className="me-2",
                                            ),
                                            html.Span(
                                                [
                                                    html.I(
                                                        className="bi bi-square-fill me-1"
                                                    ),
                                                    "Artifact",
                                                ],
                                                className="me-4",
                                            ),
                                            dbc.Badge(
                                                "Completed",
                                                color=COLOR_BLUE,
                                                className="me-1",
                                            ),
                                            dbc.Badge(
                                                "Cached",
                                                color=COLOR_GREEN,
                                                className="me-1",
                                            ),
                                            dbc.Badge(
                                                "Running",
                                                color=COLOR_YELLOW,
                                                className="me-1",
                                            ),
                                            dbc.Badge(
                                                "Failed",
                                                color=COLOR_RED,
                                                className="me-1",
                                            ),
                                        ]
                                    ),
                                ]
                            ),
                            dbc.Row(
                                [
                                    cyto.Cytoscape(
                                        id="cytoscape",
                                        layout={
                                            "name": "breadthfirst",
                                            "roots": f'[id = "{first_step_id}"]',
                                        },
                                        elements=edges + nodes,
                                        stylesheet=STYLESHEET,
                                        style={
                                            "width": "100%",
                                            "height": "800px",
                                        },
                                        zoom=1,
                                    )
                                ]
                            ),
                            dbc.Row(
                                [
                                    dbc.Button(
                                        "Reset",
                                        id="bt-reset",
                                        color="primary",
                                        className="me-1",
                                    )
                                ]
                            ),
                        ]
                    ),
                    dbc.Col(
                        [
                            dcc.Markdown(id="markdown-selected-node-data"),
                        ]
                    ),
                ]
            ),
        ],
        className="p-5",
    )

    @app.callback(  # type: ignore[misc]
        Output("markdown-selected-node-data", "children"),
        Input("cytoscape", "selectedNodeData"),
    )
    def display_data(data_list: List[Dict[str, Any]]) -> str:
        """Callback for the text area below the graph.

        Args:
            data_list: The selected node data.

        Returns:
            str: The selected node data.
        """
        if data_list is None:
            return "Click on a node in the diagram."

        text = ""
        for data in data_list:
            text += f'## {data["execution_id"]} / {data["name"]}' + "\n\n"
            if data["type"] == "artifact":
                for item in [
                    "artifact_data_type",
                    "is_cached",
                    "producer_step_id",
                    "parent_step_id",
                    "uri",
                ]:
                    text += f"**{item}**: {data[item]}" + "\n\n"
            elif data["type"] == "step":
                text += "### Inputs:" + "\n\n"
                for k, v in data["inputs"].items():
                    text += f"**{k}**: {v}" + "\n\n"
                text += "### Outputs:" + "\n\n"
                for k, v in data["outputs"].items():
                    text += f"**{k}**: {v}" + "\n\n"
                text += "### Params:" + "\n\n"
                for k, v in data["parameters"].items():
                    text += f"**{k}**: {v}" + "\n\n"
        return text

    @app.callback(  # type: ignore[misc]
        [Output("cytoscape", "zoom"), Output("cytoscape", "elements")],
        [Input("bt-reset", "n_clicks")],
    )
    def reset_layout(
        n_clicks: int,
    ) -> List[Union[int, List[Dict[str, Collection[str]]]]]:
        """Resets the layout.

        Args:
            n_clicks: The number of clicks on the reset button.

        Returns:
            The zoom and the elements.
        """
        logger.debug("%s clicked in reset button.", n_clicks)
        return [1, edges + nodes]

    if mode is not None:
        app.run_server(mode=mode)
    else:
        app.run_server()
    return app
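
A minimal usage sketch of this visualizer follows. The class name PipelineRunLineageVisualizer is inferred from the module path quoted above, and the Repository post-execution calls and the pipeline name are assumptions for this ZenML release rather than verbatim API guarantees:

# Hedged usage sketch: import paths, Repository calls and the pipeline name
# "my_pipeline" are assumptions, not confirmed by the reference above.
from zenml.integrations.dash.visualizers.pipeline_run_lineage_visualizer import (
    PipelineRunLineageVisualizer,
)
from zenml.repository import Repository

run = Repository().get_pipeline(pipeline_name="my_pipeline").runs[-1]

# Renders the run DAG as a Dash app; pass magic=True inside a Jupyter notebook
# to embed the visualization inline instead of starting a standalone server.
PipelineRunLineageVisualizer().visualize(run)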

evidently special

Initialization of the Evidently integration.

The Evidently integration provides a way to monitor your models in production. It includes support for detecting data drift as well as several kinds of model performance issues.

The results of Evidently calculations can be exported either as an interactive dashboard (visualized as an HTML file or in your Jupyter notebook) or as a JSON file.
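
As a rough illustration of the two export formats, the sketch below reads both outputs of the Evidently profile step documented later in this section ("profile", a dictionary report, and "dashboard", an HTML string). The Repository calls and the pipeline and step names are assumptions:

# Hedged post-execution sketch: the pipeline name "drift_detection_pipeline"
# and step name "drift_detector" are hypothetical.
from zenml.repository import Repository

step_view = (
    Repository()
    .get_pipeline(pipeline_name="drift_detection_pipeline")
    .runs[-1]
    .get_step(name="drift_detector")
)

profile_dict = step_view.outputs["profile"].read()      # report as a Python dict
dashboard_html = step_view.outputs["dashboard"].read()  # report as an HTML string

with open("drift_dashboard.html", "w") as f:
    f.write(dashboard_html)
print(list(profile_dict.keys()))  # the requested profile sections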

EvidentlyIntegration (Integration)

Evidently integration for ZenML.

Source code in zenml/integrations/evidently/__init__.py
class EvidentlyIntegration(Integration):
    """[Evidently](https://github.com/evidentlyai/evidently) integration for ZenML."""

    NAME = EVIDENTLY
    REQUIREMENTS = ["evidently==v0.1.41.dev0"]

steps special

Initialization of the Evidently Standard Steps.

evidently_profile

Implementation of the Evidently Profile Step.

EvidentlyProfileConfig (BaseDriftDetectionConfig) pydantic-model

Config class for Evidently profile steps.

column_mapping: properties of the DataFrame's columns used
profile_section: a string that identifies the profile section to be used. The following are valid options supported by Evidently: "datadrift", "categoricaltargetdrift", "numericaltargetdrift", "classificationmodelperformance", "regressionmodelperformance", "probabilisticmodelperformance"

Source code in zenml/integrations/evidently/steps/evidently_profile.py
class EvidentlyProfileConfig(BaseDriftDetectionConfig):
    """Config class for Evidently profile steps.

    column_mapping: properties of the DataFrame's columns used
    profile_section: a string that identifies the profile section to be used.
        The following are valid options supported by Evidently:
        - "datadrift"
        - "categoricaltargetdrift"
        - "numericaltargetdrift"
        - "classificationmodelperformance"
        - "regressionmodelperformance"
        - "probabilisticmodelperformance"
    """

    def get_profile_sections_and_tabs(
        self,
    ) -> Tuple[List[ProfileSection], List[Tab]]:
        """Get the profile sections and tabs to be used in the dashboard.

        Returns:
            A tuple of two lists of profile sections and tabs.

        Raises:
            ValueError: if the profile_section is not supported.
        """
        try:
            return (
                [
                    profile_mapper[profile]()
                    for profile in self.profile_sections
                ],
                [
                    dashboard_mapper[profile]()
                    for profile in self.profile_sections
                ],
            )
        except KeyError:
            nl = "\n"
            raise ValueError(
                f"Invalid profile section: {self.profile_sections} \n\n"
                f"Valid and supported options are: {nl}- "
                f'{f"{nl}- ".join(list(profile_mapper.keys()))}'
            )

    column_mapping: Optional[ColumnMapping]
    profile_sections: Sequence[str]
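
A minimal configuration sketch, assuming the class is importable from the module quoted above; the section names come from the list in the docstring:

# Hedged sketch: the import path mirrors the source file quoted above.
from zenml.integrations.evidently.steps.evidently_profile import (
    EvidentlyProfileConfig,
)

config = EvidentlyProfileConfig(
    profile_sections=["datadrift"],  # any of the section names listed above
    column_mapping=None,             # or an Evidently ColumnMapping instance
)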
get_profile_sections_and_tabs(self)

Get the profile sections and tabs to be used in the dashboard.

Returns:

Type Description
Tuple[List[evidently.model_profile.sections.base_profile_section.ProfileSection], List[evidently.dashboard.tabs.base_tab.Tab]]

A tuple of two lists of profile sections and tabs.

Exceptions:

Type Description
ValueError

if the profile_section is not supported.

Source code in zenml/integrations/evidently/steps/evidently_profile.py
def get_profile_sections_and_tabs(
    self,
) -> Tuple[List[ProfileSection], List[Tab]]:
    """Get the profile sections and tabs to be used in the dashboard.

    Returns:
        A tuple of two lists of profile sections and tabs.

    Raises:
        ValueError: if the profile_section is not supported.
    """
    try:
        return (
            [
                profile_mapper[profile]()
                for profile in self.profile_sections
            ],
            [
                dashboard_mapper[profile]()
                for profile in self.profile_sections
            ],
        )
    except KeyError:
        nl = "\n"
        raise ValueError(
            f"Invalid profile section: {self.profile_sections} \n\n"
            f"Valid and supported options are: {nl}- "
            f'{f"{nl}- ".join(list(profile_mapper.keys()))}'
        )
EvidentlyProfileStep (BaseDriftDetectionStep)

Step implementation of an Evidently Profile Step.

Source code in zenml/integrations/evidently/steps/evidently_profile.py
class EvidentlyProfileStep(BaseDriftDetectionStep):
    """Step implementation implementing an Evidently Profile Step."""

    OUTPUT_SPEC = {
        "profile": DataAnalysisArtifact,
        "dashboard": DataAnalysisArtifact,
    }

    def entrypoint(  # type: ignore[override]
        self,
        reference_dataset: pd.DataFrame,
        comparison_dataset: pd.DataFrame,
        config: EvidentlyProfileConfig,
    ) -> Output(  # type:ignore[valid-type]
        profile=dict, dashboard=str
    ):
        """Main entrypoint for the Evidently categorical target drift detection step.

        Args:
            reference_dataset: a Pandas DataFrame
            comparison_dataset: a Pandas DataFrame of new data you wish to
                compare against the reference data
            config: the configuration for the step

        Returns:
            profile: dictionary report extracted from an Evidently Profile
              generated for the data drift
            dashboard: HTML report extracted from an Evidently Dashboard
              generated for the data drift
        """
        sections, tabs = config.get_profile_sections_and_tabs()
        data_drift_dashboard = Dashboard(tabs=tabs)
        data_drift_dashboard.calculate(
            reference_dataset,
            comparison_dataset,
            column_mapping=config.column_mapping or None,
        )
        data_drift_profile = Profile(sections=sections)
        data_drift_profile.calculate(
            reference_dataset,
            comparison_dataset,
            column_mapping=config.column_mapping or None,
        )
        return [data_drift_profile.object(), data_drift_dashboard.html()]
CONFIG_CLASS (BaseDriftDetectionConfig) pydantic-model

Config class for Evidently profile steps.

column_mapping: properties of the DataFrame's columns used
profile_section: a string that identifies the profile section to be used. The following are valid options supported by Evidently: "datadrift", "categoricaltargetdrift", "numericaltargetdrift", "classificationmodelperformance", "regressionmodelperformance", "probabilisticmodelperformance"

Source code in zenml/integrations/evidently/steps/evidently_profile.py
class EvidentlyProfileConfig(BaseDriftDetectionConfig):
    """Config class for Evidently profile steps.

    column_mapping: properties of the DataFrame's columns used
    profile_section: a string that identifies the profile section to be used.
        The following are valid options supported by Evidently:
        - "datadrift"
        - "categoricaltargetdrift"
        - "numericaltargetdrift"
        - "classificationmodelperformance"
        - "regressionmodelperformance"
        - "probabilisticmodelperformance"
    """

    def get_profile_sections_and_tabs(
        self,
    ) -> Tuple[List[ProfileSection], List[Tab]]:
        """Get the profile sections and tabs to be used in the dashboard.

        Returns:
            A tuple of two lists of profile sections and tabs.

        Raises:
            ValueError: if the profile_section is not supported.
        """
        try:
            return (
                [
                    profile_mapper[profile]()
                    for profile in self.profile_sections
                ],
                [
                    dashboard_mapper[profile]()
                    for profile in self.profile_sections
                ],
            )
        except KeyError:
            nl = "\n"
            raise ValueError(
                f"Invalid profile section: {self.profile_sections} \n\n"
                f"Valid and supported options are: {nl}- "
                f'{f"{nl}- ".join(list(profile_mapper.keys()))}'
            )

    column_mapping: Optional[ColumnMapping]
    profile_sections: Sequence[str]
get_profile_sections_and_tabs(self)

Get the profile sections and tabs to be used in the dashboard.

Returns:

Type Description
Tuple[List[evidently.model_profile.sections.base_profile_section.ProfileSection], List[evidently.dashboard.tabs.base_tab.Tab]]

A tuple of two lists of profile sections and tabs.

Exceptions:

Type Description
ValueError

if the profile_section is not supported.

Source code in zenml/integrations/evidently/steps/evidently_profile.py
def get_profile_sections_and_tabs(
    self,
) -> Tuple[List[ProfileSection], List[Tab]]:
    """Get the profile sections and tabs to be used in the dashboard.

    Returns:
        A tuple of two lists of profile sections and tabs.

    Raises:
        ValueError: if the profile_section is not supported.
    """
    try:
        return (
            [
                profile_mapper[profile]()
                for profile in self.profile_sections
            ],
            [
                dashboard_mapper[profile]()
                for profile in self.profile_sections
            ],
        )
    except KeyError:
        nl = "\n"
        raise ValueError(
            f"Invalid profile section: {self.profile_sections} \n\n"
            f"Valid and supported options are: {nl}- "
            f'{f"{nl}- ".join(list(profile_mapper.keys()))}'
        )
entrypoint(self, reference_dataset, comparison_dataset, config)

Main entrypoint for the Evidently categorical target drift detection step.

Parameters:

Name Type Description Default
reference_dataset DataFrame

a Pandas DataFrame

required
comparison_dataset DataFrame

a Pandas DataFrame of new data you wish to compare against the reference data

required
config EvidentlyProfileConfig

the configuration for the step

required

Returns:

Type Description
profile

dictionary report extracted from an Evidently Profile generated for the data drift

dashboard

HTML report extracted from an Evidently Dashboard generated for the data drift

Source code in zenml/integrations/evidently/steps/evidently_profile.py
def entrypoint(  # type: ignore[override]
    self,
    reference_dataset: pd.DataFrame,
    comparison_dataset: pd.DataFrame,
    config: EvidentlyProfileConfig,
) -> Output(  # type:ignore[valid-type]
    profile=dict, dashboard=str
):
    """Main entrypoint for the Evidently categorical target drift detection step.

    Args:
        reference_dataset: a Pandas DataFrame
        comparison_dataset: a Pandas DataFrame of new data you wish to
            compare against the reference data
        config: the configuration for the step

    Returns:
        profile: dictionary report extracted from an Evidently Profile
          generated for the data drift
        dashboard: HTML report extracted from an Evidently Dashboard
          generated for the data drift
    """
    sections, tabs = config.get_profile_sections_and_tabs()
    data_drift_dashboard = Dashboard(tabs=tabs)
    data_drift_dashboard.calculate(
        reference_dataset,
        comparison_dataset,
        column_mapping=config.column_mapping or None,
    )
    data_drift_profile = Profile(sections=sections)
    data_drift_profile.calculate(
        reference_dataset,
        comparison_dataset,
        column_mapping=config.column_mapping or None,
    )
    return [data_drift_profile.object(), data_drift_dashboard.html()]
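
To show how the two dataset inputs and the config come together, here is a hedged pipeline sketch. The @pipeline and @step decorators are standard ZenML, but the loader steps, the wiring and the config= keyword are assumptions for this release:

# Hedged wiring sketch: load_reference and load_comparison are hypothetical
# loader steps that each return a pandas DataFrame.
import pandas as pd

from zenml.integrations.evidently.steps.evidently_profile import (
    EvidentlyProfileConfig,
    EvidentlyProfileStep,
)
from zenml.pipelines import pipeline
from zenml.steps import step


@step
def load_reference() -> pd.DataFrame:
    """Toy stand-in for the reference (training) data."""
    return pd.DataFrame({"feature": [1.0, 2.0, 3.0]})


@step
def load_comparison() -> pd.DataFrame:
    """Toy stand-in for the new data to compare against the reference."""
    return pd.DataFrame({"feature": [1.5, 2.5, 3.5]})


@pipeline
def drift_detection_pipeline(loader_reference, loader_comparison, drift_detector):
    reference = loader_reference()
    comparison = loader_comparison()
    drift_detector(reference_dataset=reference, comparison_dataset=comparison)


drift_detection_pipeline(
    loader_reference=load_reference(),
    loader_comparison=load_comparison(),
    drift_detector=EvidentlyProfileStep(
        config=EvidentlyProfileConfig(profile_sections=["datadrift"])
    ),
).run()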

visualizers special

Initialization for Evidently visualizer.

evidently_visualizer

Implementation of the Evidently visualizer.

EvidentlyVisualizer (BaseStepVisualizer)

The implementation of an Evidently Visualizer.

Source code in zenml/integrations/evidently/visualizers/evidently_visualizer.py
class EvidentlyVisualizer(BaseStepVisualizer):
    """The implementation of an Evidently Visualizer."""

    @abstractmethod
    def visualize(self, object: StepView, *args: Any, **kwargs: Any) -> None:
        """Method to visualize components.

        Args:
            object: StepView fetched from run.get_step().
            *args: Additional arguments.
            **kwargs: Additional keyword arguments.
        """
        for artifact_view in object.outputs.values():
            # filter out anything but data analysis artifacts
            if (
                artifact_view.type == DataAnalysisArtifact.__name__
                and artifact_view.data_type == "builtins.str"
            ):
                artifact = artifact_view.read()
                self.generate_facet(artifact)

    def generate_facet(self, html_: str) -> None:
        """Generate a Facet Overview.

        Args:
            html_: HTML represented as a string.
        """
        if Environment.in_notebook() or Environment.in_google_colab():
            from IPython.core.display import HTML, display

            display(HTML(html_))
        else:
            logger.warning(
                "The magic functions are only usable in a Jupyter notebook."
            )
            with tempfile.NamedTemporaryFile(
                mode="w", delete=False, suffix=".html", encoding="utf-8"
            ) as f:
                f.write(html_)
                url = f"file:///{f.name}"
                logger.info("Opening %s in a new browser.." % f.name)
                webbrowser.open(url, new=2)
generate_facet(self, html_)

Generate a Facet Overview.

Parameters:

Name Type Description Default
html_ str

HTML represented as a string.

required
Source code in zenml/integrations/evidently/visualizers/evidently_visualizer.py
def generate_facet(self, html_: str) -> None:
    """Generate a Facet Overview.

    Args:
        html_: HTML represented as a string.
    """
    if Environment.in_notebook() or Environment.in_google_colab():
        from IPython.core.display import HTML, display

        display(HTML(html_))
    else:
        logger.warning(
            "The magic functions are only usable in a Jupyter notebook."
        )
        with tempfile.NamedTemporaryFile(
            mode="w", delete=False, suffix=".html", encoding="utf-8"
        ) as f:
            f.write(html_)
            url = f"file:///{f.name}"
            logger.info("Opening %s in a new browser.." % f.name)
            webbrowser.open(url, new=2)
visualize(self, object, *args, **kwargs)

Method to visualize components.

Parameters:

Name Type Description Default
object StepView

StepView fetched from run.get_step().

required
*args Any

Additional arguments.

()
**kwargs Any

Additional keyword arguments.

{}
Source code in zenml/integrations/evidently/visualizers/evidently_visualizer.py
@abstractmethod
def visualize(self, object: StepView, *args: Any, **kwargs: Any) -> None:
    """Method to visualize components.

    Args:
        object: StepView fetched from run.get_step().
        *args: Additional arguments.
        **kwargs: Additional keyword arguments.
    """
    for artifact_view in object.outputs.values():
        # filter out anything but data analysis artifacts
        if (
            artifact_view.type == DataAnalysisArtifact.__name__
            and artifact_view.data_type == "builtins.str"
        ):
            artifact = artifact_view.read()
            self.generate_facet(artifact)
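
A hedged usage sketch for this visualizer; the pipeline and step names are hypothetical, and the Repository calls are assumptions for this release:

# Hedged sketch: the visualizer renders the step's DataAnalysisArtifact outputs
# of data type builtins.str (the HTML dashboard), as the source above shows.
from zenml.integrations.evidently.visualizers.evidently_visualizer import (
    EvidentlyVisualizer,
)
from zenml.repository import Repository

step_view = (
    Repository()
    .get_pipeline(pipeline_name="drift_detection_pipeline")
    .runs[-1]
    .get_step(name="drift_detector")
)
EvidentlyVisualizer().visualize(step_view)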

facets special

Facets integration for ZenML.

The Facets integration provides a simple way to visualize post-execution objects like PipelineView, PipelineRunView and StepView. These objects can be extended using the BaseVisualization class. This integration requires that facets-overview be installed in your Python environment.

FacetsIntegration (Integration)

Definition of Facet integration for ZenML.

Source code in zenml/integrations/facets/__init__.py
class FacetsIntegration(Integration):
    """Definition of [Facet](https://pair-code.github.io/facets/) integration for ZenML."""

    NAME = FACETS
    REQUIREMENTS = ["facets-overview>=1.0.0", "IPython"]

visualizers special

Initialization of the Facet Visualizer.

facet_statistics_visualizer

Implementation of the Facet Statistics Visualizer.

FacetStatisticsVisualizer (BaseStepVisualizer)

The Facets implementation of a ZenML statistics visualizer.

Source code in zenml/integrations/facets/visualizers/facet_statistics_visualizer.py
class FacetStatisticsVisualizer(BaseStepVisualizer):
    """The base implementation of a ZenML Visualizer."""

    @abstractmethod
    def visualize(
        self, object: StepView, magic: bool = False, *args: Any, **kwargs: Any
    ) -> None:
        """Method to visualize components.

        Args:
            object: StepView fetched from run.get_step().
            magic: Whether to render in a Jupyter notebook or not.
            *args: Additional arguments.
            **kwargs: Additional keyword arguments.
        """
        datasets = []
        for output_name, artifact_view in object.outputs.items():
            df = artifact_view.read()
            if type(df) is not pd.DataFrame:
                logger.warning(
                    "`%s` is not a pd.DataFrame. You can only visualize "
                    "statistics of steps that output pandas DataFrames. "
                    "Skipping this output.." % output_name
                )
            else:
                datasets.append({"name": output_name, "table": df})
        h = self.generate_html(datasets)
        self.generate_facet(h, magic)

    def generate_html(self, datasets: List[Dict[Text, pd.DataFrame]]) -> str:
        """Generates html for facet.

        Args:
            datasets: List of dicts of DataFrames to be visualized as stats.

        Returns:
            HTML template with proto string embedded.
        """
        proto = GenericFeatureStatisticsGenerator().ProtoFromDataFrames(
            datasets
        )
        protostr = base64.b64encode(proto.SerializeToString()).decode("utf-8")

        template = os.path.join(
            os.path.abspath(os.path.dirname(__file__)),
            "stats.html",
        )
        html_template = io_utils.read_file_contents_as_string(template)

        html_ = html_template.replace("protostr", protostr)
        return html_

    def generate_facet(self, html_: str, magic: bool = False) -> None:
        """Generate a Facet Overview.

        Args:
            html_: HTML represented as a string.
            magic: Whether to magically materialize facet in a notebook.

        Raises:
            EnvironmentError: If magic is True and not in a notebook.
        """
        if magic:
            if not (Environment.in_notebook() or Environment.in_google_colab()):
                raise EnvironmentError(
                    "The magic functions are only usable in a Jupyter notebook."
                )
            display(HTML(html_))
        else:
            with tempfile.NamedTemporaryFile(delete=False, suffix=".html") as f:
                io_utils.write_file_contents_as_string(f.name, html_)
                url = f"file:///{f.name}"
                logger.info("Opening %s in a new browser.." % f.name)
                webbrowser.open(url, new=2)
generate_facet(self, html_, magic=False)

Generate a Facet Overview.

Parameters:

Name Type Description Default
html_ str

HTML represented as a string.

required
magic bool

Whether to magically materialize facet in a notebook.

False

Exceptions:

Type Description
EnvironmentError

If magic is True and not in a notebook.

Source code in zenml/integrations/facets/visualizers/facet_statistics_visualizer.py
def generate_facet(self, html_: str, magic: bool = False) -> None:
    """Generate a Facet Overview.

    Args:
        html_: HTML represented as a string.
        magic: Whether to magically materialize facet in a notebook.

    Raises:
        EnvironmentError: If magic is True and not in a notebook.
    """
    if magic:
        if not (Environment.in_notebook() or Environment.in_google_colab()):
            raise EnvironmentError(
                "The magic functions are only usable in a Jupyter notebook."
            )
        display(HTML(html_))
    else:
        with tempfile.NamedTemporaryFile(delete=False, suffix=".html") as f:
            io_utils.write_file_contents_as_string(f.name, html_)
            url = f"file:///{f.name}"
            logger.info("Opening %s in a new browser.." % f.name)
            webbrowser.open(url, new=2)
generate_html(self, datasets)

Generates html for facet.

Parameters:

Name Type Description Default
datasets List[Dict[str, pandas.core.frame.DataFrame]]

List of dicts of DataFrames to be visualized as stats.

required

Returns:

Type Description
str

HTML template with proto string embedded.

Source code in zenml/integrations/facets/visualizers/facet_statistics_visualizer.py
def generate_html(self, datasets: List[Dict[Text, pd.DataFrame]]) -> str:
    """Generates html for facet.

    Args:
        datasets: List of dicts of DataFrames to be visualized as stats.

    Returns:
        HTML template with proto string embedded.
    """
    proto = GenericFeatureStatisticsGenerator().ProtoFromDataFrames(
        datasets
    )
    protostr = base64.b64encode(proto.SerializeToString()).decode("utf-8")

    template = os.path.join(
        os.path.abspath(os.path.dirname(__file__)),
        "stats.html",
    )
    html_template = io_utils.read_file_contents_as_string(template)

    html_ = html_template.replace("protostr", protostr)
    return html_
visualize(self, object, magic=False, *args, **kwargs)

Method to visualize components.

Parameters:

Name Type Description Default
object StepView

StepView fetched from run.get_step().

required
magic bool

Whether to render in a Jupyter notebook or not.

False
*args Any

Additional arguments.

()
**kwargs Any

Additional keyword arguments.

{}
Source code in zenml/integrations/facets/visualizers/facet_statistics_visualizer.py
@abstractmethod
def visualize(
    self, object: StepView, magic: bool = False, *args: Any, **kwargs: Any
) -> None:
    """Method to visualize components.

    Args:
        object: StepView fetched from run.get_step().
        magic: Whether to render in a Jupyter notebook or not.
        *args: Additional arguments.
        **kwargs: Additional keyword arguments.
    """
    datasets = []
    for output_name, artifact_view in object.outputs.items():
        df = artifact_view.read()
        if type(df) is not pd.DataFrame:
            logger.warning(
                "`%s` is not a pd.DataFrame. You can only visualize "
                "statistics of steps that output pandas DataFrames. "
                "Skipping this output.." % output_name
            )
        else:
            datasets.append({"name": output_name, "table": df})
    h = self.generate_html(datasets)
    self.generate_facet(h, magic)
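
A hedged usage sketch; the pipeline and step names are hypothetical, and the Repository calls are assumptions for this release:

# Hedged sketch: only step outputs that are pandas DataFrames are visualized,
# as the visualize() source above shows.
from zenml.integrations.facets.visualizers.facet_statistics_visualizer import (
    FacetStatisticsVisualizer,
)
from zenml.repository import Repository

step_view = (
    Repository()
    .get_pipeline(pipeline_name="training_pipeline")
    .runs[-1]
    .get_step(name="importer")
)
# magic=True embeds the overview in a Jupyter notebook; otherwise an HTML file
# is written to a temporary location and opened in the browser.
FacetStatisticsVisualizer().visualize(step_view, magic=False)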

feast special

Initialization for Feast integration.

The Feast integration offers a way to connect to a Feast Feature Store. ZenML implements a dedicated feature store stack component that you can access from within your ZenML steps, as sketched below.
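
As a rough sketch of what that access can look like, the snippet below pulls the feature store off the active stack via the step context. The StepContext usage, the stack accessor and the feature references are assumptions, not verbatim API guarantees:

# Hedged sketch: assumes a stack with a Feast feature store registered and a
# step context exposing it as context.stack.feature_store.
import pandas as pd

from zenml.steps import StepContext, step


@step
def get_features(context: StepContext, entity_df: pd.DataFrame) -> pd.DataFrame:
    feature_store = context.stack.feature_store  # the FeastFeatureStore component
    return feature_store.get_historical_features(
        entity_df=entity_df,
        features=["driver_hourly_stats:conv_rate"],  # hypothetical feature refs
    )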

FeastIntegration (Integration)

Definition of Feast integration for ZenML.

Source code in zenml/integrations/feast/__init__.py
class FeastIntegration(Integration):
    """Definition of Feast integration for ZenML."""

    NAME = FEAST
    REQUIREMENTS = ["feast[redis]>=0.19.4", "redis-server"]

    @classmethod
    def flavors(cls) -> List[FlavorWrapper]:
        """Declare the stack component flavors for the Feast integration.

        Returns:
            List of stack component flavors for this integration.
        """
        return [
            FlavorWrapper(
                name=FEAST_FEATURE_STORE_FLAVOR,
                source="zenml.integrations.feast.feature_store.FeastFeatureStore",
                type=StackComponentType.FEATURE_STORE,
                integration=cls.NAME,
            )
        ]
flavors() classmethod

Declare the stack component flavors for the Feast integration.

Returns:

Type Description
List[zenml.zen_stores.models.flavor_wrapper.FlavorWrapper]

List of stack component flavors for this integration.

Source code in zenml/integrations/feast/__init__.py
@classmethod
def flavors(cls) -> List[FlavorWrapper]:
    """Declare the stack component flavors for the Feast integration.

    Returns:
        List of stack component flavors for this integration.
    """
    return [
        FlavorWrapper(
            name=FEAST_FEATURE_STORE_FLAVOR,
            source="zenml.integrations.feast.feature_store.FeastFeatureStore",
            type=StackComponentType.FEATURE_STORE,
            integration=cls.NAME,
        )
    ]

feature_stores special

Feast Feature Store integration for ZenML.

Feature stores allow data teams to serve data via an offline store and an online low-latency store, with data kept in sync between the two. They also offer a centralized registry where features (and feature schemas) are stored for use within a team or wider organization. Feature stores are a relatively recent addition to commonly used machine learning stacks. Feast is a leading open-source feature store, first developed by Gojek in collaboration with Google.
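
A hedged sketch of inspecting what the registry knows about, using the component methods documented below (assumes the active stack has a Feast feature store registered):

# Hedged sketch: the active-stack accessor is an assumption for this release.
from zenml.repository import Repository

feature_store = Repository().active_stack.feature_store
print("Feast version:", feature_store.get_feast_version())
print("Project:", feature_store.get_project())
print("Entities:", feature_store.get_entities())
print("Feature views:", feature_store.get_feature_views())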

feast_feature_store

Implementation of the Feast Feature Store for ZenML.

FeastFeatureStore (BaseFeatureStore) pydantic-model

Class to interact with the Feast feature store.

Source code in zenml/integrations/feast/feature_stores/feast_feature_store.py
class FeastFeatureStore(BaseFeatureStore):
    """Class to interact with the Feast feature store."""

    FLAVOR: ClassVar[str] = FEAST_FEATURE_STORE_FLAVOR

    online_host: str = "localhost"
    online_port: int = 6379
    feast_repo: str

    def _validate_connection(self) -> None:
        """Validates the connection to the feature store.

        Raises:
            ConnectionError: If the online component (Redis) is not available.
        """
        client = redis.Redis(host=self.online_host, port=self.online_port)
        try:
            client.ping()
        except redis.exceptions.ConnectionError as e:
            raise redis.exceptions.ConnectionError(
                "Could not connect to feature store's online component. "
                "Please make sure that Redis is running."
            ) from e

    def get_historical_features(
        self,
        entity_df: Union[pd.DataFrame, str],
        features: List[str],
        full_feature_names: bool = False,
    ) -> pd.DataFrame:
        """Returns the historical features for training or batch scoring.

        Args:
            entity_df: The entity DataFrame or entity name.
            features: The features to retrieve.
            full_feature_names: Whether to return the full feature names.

        Raises:
            ConnectionError: If the online component (Redis) is not available.

        Returns:
            The historical features as a Pandas DataFrame.
        """
        fs = FeatureStore(repo_path=self.feast_repo)

        return fs.get_historical_features(
            entity_df=entity_df,
            features=features,
            full_feature_names=full_feature_names,
        ).to_df()

    def get_online_features(
        self,
        entity_rows: List[Dict[str, Any]],
        features: List[str],
        full_feature_names: bool = False,
    ) -> Dict[str, Any]:
        """Returns the latest online feature data.

        Args:
            entity_rows: The entity rows to retrieve.
            features: The features to retrieve.
            full_feature_names: Whether to return the full feature names.

        Raises:
            ConnectionError: If the online component (Redis) is not available.

        Returns:
            The latest online feature data as a dictionary.
        """
        self._validate_connection()
        fs = FeatureStore(repo_path=self.feast_repo)

        return fs.get_online_features(  # type: ignore[no-any-return]
            entity_rows=entity_rows,
            features=features,
            full_feature_names=full_feature_names,
        ).to_dict()

    def get_data_sources(self) -> List[str]:
        """Returns the data sources' names.

        Raises:
            ConnectionError: If the online component (Redis) is not available.

        Returns:
            The data sources' names.
        """
        self._validate_connection()
        fs = FeatureStore(repo_path=self.feast_repo)
        return [ds.name for ds in fs.list_data_sources()]

    def get_entities(self) -> List[str]:
        """Returns the entity names.

        Raises:
            ConnectionError: If the online component (Redis) is not available.

        Returns:
            The entity names.
        """
        self._validate_connection()
        fs = FeatureStore(repo_path=self.feast_repo)
        return [ds.name for ds in fs.list_entities()]

    def get_feature_services(self) -> List[str]:
        """Returns the feature service names.

        Raises:
            ConnectionError: If the online component (Redis) is not available.

        Returns:
            The feature service names.
        """
        self._validate_connection()
        fs = FeatureStore(repo_path=self.feast_repo)
        return [ds.name for ds in fs.list_feature_services()]

    def get_feature_views(self) -> List[str]:
        """Returns the feature view names.

        Raises:
            ConnectionError: If the online component (Redis) is not available.

        Returns:
            The feature view names.
        """
        self._validate_connection()
        fs = FeatureStore(repo_path=self.feast_repo)
        return [ds.name for ds in fs.list_feature_views()]

    def get_project(self) -> str:
        """Returns the project name.

        Raises:
            ConnectionError: If the online component (Redis) is not available.

        Returns:
            The project name.
        """
        fs = FeatureStore(repo_path=self.feast_repo)
        return str(fs.project)

    def get_registry(self) -> Registry:
        """Returns the feature store registry.

        Raises:
            ConnectionError: If the online component (Redis) is not available.

        Returns:
            The registry.
        """
        fs: FeatureStore = FeatureStore(repo_path=self.feast_repo)
        return fs.registry

    def get_feast_version(self) -> str:
        """Returns the version of Feast used.

        Raises:
            ConnectionError: If the online component (Redis) is not available.

        Returns:
            The version of Feast currently being used.
        """
        fs = FeatureStore(repo_path=self.feast_repo)
        return str(fs.version())
get_data_sources(self)

Returns the data sources' names.

Exceptions:

Type Description
ConnectionError

If the online component (Redis) is not available.

Returns:

Type Description
List[str]

The data sources' names.

Source code in zenml/integrations/feast/feature_stores/feast_feature_store.py
def get_data_sources(self) -> List[str]:
    """Returns the data sources' names.

    Raises:
        ConnectionError: If the online component (Redis) is not available.

    Returns:
        The data sources' names.
    """
    self._validate_connection()
    fs = FeatureStore(repo_path=self.feast_repo)
    return [ds.name for ds in fs.list_data_sources()]
get_entities(self)

Returns the entity names.

Exceptions:

Type Description
ConnectionError

If the online component (Redis) is not available.

Returns:

Type Description
List[str]

The entity names.

Source code in zenml/integrations/feast/feature_stores/feast_feature_store.py
def get_entities(self) -> List[str]:
    """Returns the entity names.

    Raises:
        ConnectionError: If the online component (Redis) is not available.

    Returns:
        The entity names.
    """
    self._validate_connection()
    fs = FeatureStore(repo_path=self.feast_repo)
    return [ds.name for ds in fs.list_entities()]
get_feast_version(self)

Returns the version of Feast used.

Exceptions:

Type Description
ConnectionError

If the online component (Redis) is not available.

Returns:

Type Description
str

The version of Feast currently being used.

Source code in zenml/integrations/feast/feature_stores/feast_feature_store.py
def get_feast_version(self) -> str:
    """Returns the version of Feast used.

    Raises:
        ConnectionError: If the online component (Redis) is not available.

    Returns:
        The version of Feast currently being used.
    """
    fs = FeatureStore(repo_path=self.feast_repo)
    return str(fs.version())
get_feature_services(self)

Returns the feature service names.

Exceptions:

Type Description
ConnectionError

If the online component (Redis) is not available.

Returns:

Type Description
List[str]

The feature service names.

Source code in zenml/integrations/feast/feature_stores/feast_feature_store.py
def get_feature_services(self) -> List[str]:
    """Returns the feature service names.

    Raises:
        ConnectionError: If the online component (Redis) is not available.

    Returns:
        The feature service names.
    """
    self._validate_connection()
    fs = FeatureStore(repo_path=self.feast_repo)
    return [ds.name for ds in fs.list_feature_services()]
get_feature_views(self)

Returns the feature view names.

Exceptions:

Type Description
ConnectionError

If the online component (Redis) is not available.

Returns:

Type Description
List[str]

The feature view names.

Source code in zenml/integrations/feast/feature_stores/feast_feature_store.py
def get_feature_views(self) -> List[str]:
    """Returns the feature view names.

    Raises:
        ConnectionError: If the online component (Redis) is not available.

    Returns:
        The feature view names.
    """
    self._validate_connection()
    fs = FeatureStore(repo_path=self.feast_repo)
    return [ds.name for ds in fs.list_feature_views()]
get_historical_features(self, entity_df, features, full_feature_names=False)

Returns the historical features for training or batch scoring.

Parameters:

Name Type Description Default
entity_df Union[pandas.core.frame.DataFrame, str]

The entity DataFrame or entity name.

required
features List[str]

The features to retrieve.

required
full_feature_names bool

Whether to return the full feature names.

False

Exceptions:

Type Description
ConnectionError

If the online component (Redis) is not available.

Returns:

Type Description
DataFrame

The historical features as a Pandas DataFrame.

Source code in zenml/integrations/feast/feature_stores/feast_feature_store.py
def get_historical_features(
    self,
    entity_df: Union[pd.DataFrame, str],
    features: List[str],
    full_feature_names: bool = False,
) -> pd.DataFrame:
    """Returns the historical features for training or batch scoring.

    Args:
        entity_df: The entity DataFrame or entity name.
        features: The features to retrieve.
        full_feature_names: Whether to return the full feature names.

    Raises:
        ConnectionError: If the online component (Redis) is not available.

    Returns:
        The historical features as a Pandas DataFrame.
    """
    fs = FeatureStore(repo_path=self.feast_repo)

    return fs.get_historical_features(
        entity_df=entity_df,
        features=features,
        full_feature_names=full_feature_names,
    ).to_df()
get_online_features(self, entity_rows, features, full_feature_names=False)

Returns the latest online feature data.

Parameters:

Name Type Description Default
entity_rows List[Dict[str, Any]]

The entity rows to retrieve.

required
features List[str]

The features to retrieve.

required
full_feature_names bool

Whether to return the full feature names.

False

Exceptions:

Type Description
ConnectionError

If the online component (Redis) is not available.

Returns:

Type Description
Dict[str, Any]

The latest online feature data as a dictionary.

Source code in zenml/integrations/feast/feature_stores/feast_feature_store.py
def get_online_features(
    self,
    entity_rows: List[Dict[str, Any]],
    features: List[str],
    full_feature_names: bool = False,
) -> Dict[str, Any]:
    """Returns the latest online feature data.

    Args:
        entity_rows: The entity rows to retrieve.
        features: The features to retrieve.
        full_feature_names: Whether to return the full feature names.

    Raises:
        ConnectionError: If the online component (Redis) is not available.

    Returns:
        The latest online feature data as a dictionary.
    """
    self._validate_connection()
    fs = FeatureStore(repo_path=self.feast_repo)

    return fs.get_online_features(  # type: ignore[no-any-return]
        entity_rows=entity_rows,
        features=features,
        full_feature_names=full_feature_names,
    ).to_dict()
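
A hedged call sketch for the online path; the entity key and feature reference are hypothetical, and the active-stack accessor is an assumption:

# Hedged sketch: the component validates the Redis connection before the call.
from zenml.repository import Repository

feature_store = Repository().active_stack.feature_store
online_features = feature_store.get_online_features(
    entity_rows=[{"driver_id": 1001}],           # hypothetical entity key
    features=["driver_hourly_stats:conv_rate"],  # hypothetical feature reference
)
print(online_features)
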
get_project(self)

Returns the project name.

Exceptions:

Type Description
ConnectionError

If the online component (Redis) is not available.

Returns:

Type Description
str

The project name.

Source code in zenml/integrations/feast/feature_stores/feast_feature_store.py
def get_project(self) -> str:
    """Returns the project name.

    Raises:
        ConnectionError: If the online component (Redis) is not available.

    Returns:
        The project name.
    """
    fs = FeatureStore(repo_path=self.feast_repo)
    return str(fs.project)
get_registry(self)

Returns the feature store registry.

Exceptions:

Type Description
ConnectionError

If the online component (Redis) is not available.

Returns:

Type Description
Registry

The registry.

Source code in zenml/integrations/feast/feature_stores/feast_feature_store.py
def get_registry(self) -> Registry:
    """Returns the feature store registry.

    Raises:
        ConnectionError: If the online component (Redis) is not available.

    Returns:
        The registry.
    """
    fs: FeatureStore = FeatureStore(repo_path=self.feast_repo)
    return fs.registry

gcp special

Initialization of the GCP ZenML integration.

The GCP integration submodule provides a way to run ZenML pipelines in a cloud environment. Specifically, it allows the use of cloud artifact stores, metadata stores, and an io module to handle file operations on Google Cloud Storage (GCS).

Additionally, the GCP secrets manager integration submodule provides a way to access the GCP secrets manager from within your ZenML Pipeline runs.

The Vertex AI integration submodule provides a way to run ZenML pipelines in a Vertex AI environment.
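
As a small illustration of the file-operations side, the sketch below reads a text file from the active stack's GCS artifact store; the bucket path is hypothetical and the active-stack accessor is an assumption for this release:

# Hedged sketch: uses the GCPArtifactStore documented further below via the
# active stack; gs:// is the path prefix this artifact store supports.
from zenml.repository import Repository

artifact_store = Repository().active_stack.artifact_store
with artifact_store.open("gs://my-zenml-bucket/some/file.txt", "r") as f:
    print(f.read())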

GcpIntegration (Integration)

Definition of Google Cloud Platform integration for ZenML.

Source code in zenml/integrations/gcp/__init__.py
class GcpIntegration(Integration):
    """Definition of Google Cloud Platform integration for ZenML."""

    NAME = GCP
    REQUIREMENTS = [
        "kfp",
        "gcsfs",
        "google-cloud-secret-manager",
        "google-cloud-aiplatform>=1.11.0",
    ]

    @classmethod
    def flavors(cls) -> List[FlavorWrapper]:
        """Declare the stack component flavors for the GCP integration.

        Returns:
            List of stack component flavors for this integration.
        """
        return [
            FlavorWrapper(
                name=GCP_ARTIFACT_STORE_FLAVOR,
                source="zenml.integrations.gcp.artifact_stores"
                ".GCPArtifactStore",
                type=StackComponentType.ARTIFACT_STORE,
                integration=cls.NAME,
            ),
            FlavorWrapper(
                name=GCP_SECRETS_MANAGER_FLAVOR,
                source="zenml.integrations.gcp.secrets_manager."
                "GCPSecretsManager",
                type=StackComponentType.SECRETS_MANAGER,
                integration=cls.NAME,
            ),
            FlavorWrapper(
                name=GCP_VERTEX_ORCHESTRATOR_FLAVOR,
                source="zenml.integrations.gcp.orchestrators"
                ".VertexOrchestrator",
                type=StackComponentType.ORCHESTRATOR,
                integration=cls.NAME,
            ),
            FlavorWrapper(
                name=GCP_VERTEX_STEP_OPERATOR_FLAVOR,
                source="zenml.integrations.gcp.step_operators"
                ".VertexStepOperator",
                type=StackComponentType.STEP_OPERATOR,
                integration=cls.NAME,
            ),
        ]
flavors() classmethod

Declare the stack component flavors for the GCP integration.

Returns:

Type Description
List[zenml.zen_stores.models.flavor_wrapper.FlavorWrapper]

List of stack component flavors for this integration.

Source code in zenml/integrations/gcp/__init__.py
@classmethod
def flavors(cls) -> List[FlavorWrapper]:
    """Declare the stack component flavors for the GCP integration.

    Returns:
        List of stack component flavors for this integration.
    """
    return [
        FlavorWrapper(
            name=GCP_ARTIFACT_STORE_FLAVOR,
            source="zenml.integrations.gcp.artifact_stores"
            ".GCPArtifactStore",
            type=StackComponentType.ARTIFACT_STORE,
            integration=cls.NAME,
        ),
        FlavorWrapper(
            name=GCP_SECRETS_MANAGER_FLAVOR,
            source="zenml.integrations.gcp.secrets_manager."
            "GCPSecretsManager",
            type=StackComponentType.SECRETS_MANAGER,
            integration=cls.NAME,
        ),
        FlavorWrapper(
            name=GCP_VERTEX_ORCHESTRATOR_FLAVOR,
            source="zenml.integrations.gcp.orchestrators"
            ".VertexOrchestrator",
            type=StackComponentType.ORCHESTRATOR,
            integration=cls.NAME,
        ),
        FlavorWrapper(
            name=GCP_VERTEX_STEP_OPERATOR_FLAVOR,
            source="zenml.integrations.gcp.step_operators"
            ".VertexStepOperator",
            type=StackComponentType.STEP_OPERATOR,
            integration=cls.NAME,
        ),
    ]
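
As a quick orientation, the requirements and flavors declared above can also be inspected programmatically. A minimal sketch, assuming the base Integration class exposes a check_installation() helper that verifies the declared requirements are importable:

from zenml.integrations.gcp import GcpIntegration

print(GcpIntegration.NAME)          # the integration name constant
print(GcpIntegration.REQUIREMENTS)  # pip requirements declared above

# `check_installation()` is assumed here to be the base `Integration` helper
# that checks whether the requirements are installed in this environment.
if GcpIntegration.check_installation():
    for flavor in GcpIntegration.flavors():
        print(flavor.name, flavor.type, flavor.source)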

artifact_stores special

Initialization of the GCP Artifact Store.

gcp_artifact_store

Implementation of the GCP Artifact Store.

GCPArtifactStore (BaseArtifactStore, AuthenticationMixin) pydantic-model

Artifact Store for Google Cloud Storage based artifacts.

Source code in zenml/integrations/gcp/artifact_stores/gcp_artifact_store.py
class GCPArtifactStore(BaseArtifactStore, AuthenticationMixin):
    """Artifact Store for Google Cloud Storage based artifacts."""

    _filesystem: Optional[gcsfs.GCSFileSystem] = None

    # Class Configuration
    FLAVOR: ClassVar[str] = GCP_ARTIFACT_STORE_FLAVOR
    SUPPORTED_SCHEMES: ClassVar[Set[str]] = {GCP_PATH_PREFIX}

    @property
    def filesystem(self) -> gcsfs.GCSFileSystem:
        """The gcsfs filesystem to access this artifact store.

        Returns:
            The gcsfs filesystem to access this artifact store.
        """
        if not self._filesystem:
            secret = self.get_authentication_secret(
                expected_schema_type=GCPSecretSchema
            )
            token = secret.get_credential_dict() if secret else None
            self._filesystem = gcsfs.GCSFileSystem(token=token)

        return self._filesystem

    def open(self, path: PathType, mode: str = "r") -> Any:
        """Open a file at the given path.

        Args:
            path: Path of the file to open.
            mode: Mode in which to open the file. Currently, only
                'rb' and 'wb' to read and write binary files are supported.

        Returns:
            A file-like object that can be used to read or write to the file.
        """
        return self.filesystem.open(path=path, mode=mode)

    def copyfile(
        self, src: PathType, dst: PathType, overwrite: bool = False
    ) -> None:
        """Copy a file.

        Args:
            src: The path to copy from.
            dst: The path to copy to.
            overwrite: If a file already exists at the destination, this
                method will overwrite it if overwrite=`True` and
                raise a FileExistsError otherwise.

        Raises:
            FileExistsError: If a file already exists at the destination
                and overwrite is not set to `True`.
        """
        if not overwrite and self.filesystem.exists(dst):
            raise FileExistsError(
                f"Unable to copy to destination '{convert_to_str(dst)}', "
                f"file already exists. Set `overwrite=True` to copy anyway."
            )
        # TODO [ENG-151]: Check if it works with overwrite=True or if we need to
        #  manually remove it first
        self.filesystem.copy(path1=src, path2=dst)

    def exists(self, path: PathType) -> bool:
        """Check whether a path exists.

        Args:
            path: The path to check.

        Returns:
            True if the path exists, False otherwise.
        """
        return self.filesystem.exists(path=path)  # type: ignore[no-any-return]

    def glob(self, pattern: PathType) -> List[PathType]:
        """Return all paths that match the given glob pattern.

        The glob pattern may include:
        - '*' to match any number of characters
        - '?' to match a single character
        - '[...]' to match one of the characters inside the brackets
        - '**' as the full name of a path component to match to search
          in subdirectories of any depth (e.g. '/some_dir/**/some_file)

        Args:
            pattern: The glob pattern to match, see details above.

        Returns:
            A list of paths that match the given glob pattern.
        """
        return [
            f"{GCP_PATH_PREFIX}{path}"
            for path in self.filesystem.glob(path=pattern)
        ]

    def isdir(self, path: PathType) -> bool:
        """Check whether a path is a directory.

        Args:
            path: The path to check.

        Returns:
            True if the path is a directory, False otherwise.
        """
        return self.filesystem.isdir(path=path)  # type: ignore[no-any-return]

    def listdir(self, path: PathType) -> List[PathType]:
        """Return a list of files in a directory.

        Args:
            path: The path of the directory to list.

        Returns:
            A list of paths of files in the directory.
        """
        path_without_prefix = convert_to_str(path)
        if path_without_prefix.startswith(GCP_PATH_PREFIX):
            path_without_prefix = path_without_prefix[len(GCP_PATH_PREFIX) :]

        def _extract_basename(file_dict: Dict[str, Any]) -> str:
            """Extracts the basename from a file info dict returned by GCP.

            Args:
                file_dict: A file info dict returned by the GCP filesystem.

            Returns:
                The basename of the file.
            """
            file_path = cast(str, file_dict["name"])
            base_name = file_path[len(path_without_prefix) :]
            return base_name.lstrip("/")

        return [
            _extract_basename(dict_)
            for dict_ in self.filesystem.listdir(path=path)
        ]

    def makedirs(self, path: PathType) -> None:
        """Create a directory at the given path.

        If needed also create missing parent directories.

        Args:
            path: The path of the directory to create.
        """
        self.filesystem.makedirs(path=path, exist_ok=True)

    def mkdir(self, path: PathType) -> None:
        """Create a directory at the given path.

        Args:
            path: The path of the directory to create.
        """
        self.filesystem.makedir(path=path)

    def remove(self, path: PathType) -> None:
        """Remove the file at the given path.

        Args:
            path: The path of the file to remove.
        """
        self.filesystem.rm_file(path=path)

    def rename(
        self, src: PathType, dst: PathType, overwrite: bool = False
    ) -> None:
        """Rename source file to destination file.

        Args:
            src: The path of the file to rename.
            dst: The path to rename the source file to.
            overwrite: If a file already exists at the destination, this
                method will overwrite it if overwrite=`True` and
                raise a FileExistsError otherwise.

        Raises:
            FileExistsError: If a file already exists at the destination
                and overwrite is not set to `True`.
        """
        if not overwrite and self.filesystem.exists(dst):
            raise FileExistsError(
                f"Unable to rename file to '{convert_to_str(dst)}', "
                f"file already exists. Set `overwrite=True` to rename anyway."
            )

        # TODO [ENG-152]: Check if it works with overwrite=True or if we need
        #  to manually remove it first
        self.filesystem.rename(path1=src, path2=dst)

    def rmtree(self, path: PathType) -> None:
        """Remove the given directory.

        Args:
            path: The path of the directory to remove.
        """
        self.filesystem.delete(path=path, recursive=True)

    def stat(self, path: PathType) -> Dict[str, Any]:
        """Return stat info for the given path.

        Args:
            path: the path to get stat info for.

        Returns:
            A dictionary with the stat info.
        """
        return self.filesystem.stat(path=path)  # type: ignore[no-any-return]

    def walk(
        self,
        top: PathType,
        topdown: bool = True,
        onerror: Optional[Callable[..., None]] = None,
    ) -> Iterable[Tuple[PathType, List[PathType], List[PathType]]]:
        """Return an iterator that walks the contents of the given directory.

        Args:
            top: Path of directory to walk.
            topdown: Unused argument to conform to interface.
            onerror: Unused argument to conform to interface.

        Yields:
            An Iterable of Tuples, each of which contain the path of the current
            directory path, a list of directories inside the current directory
            and a list of files inside the current directory.
        """
        # TODO [ENG-153]: Additional params
        for (
            directory,
            subdirectories,
            files,
        ) in self.filesystem.walk(path=top):
            yield f"{GCP_PATH_PREFIX}{directory}", subdirectories, files
filesystem: GCSFileSystem property readonly

The gcsfs filesystem to access this artifact store.

Returns:

Type Description
GCSFileSystem

The gcsfs filesystem to access this artifact store.

copyfile(self, src, dst, overwrite=False)

Copy a file.

Parameters:

Name Type Description Default
src Union[bytes, str]

The path to copy from.

required
dst Union[bytes, str]

The path to copy to.

required
overwrite bool

If a file already exists at the destination, this method will overwrite it if overwrite=True and raise a FileExistsError otherwise.

False

Exceptions:

Type Description
FileExistsError

If a file already exists at the destination and overwrite is not set to True.

Source code in zenml/integrations/gcp/artifact_stores/gcp_artifact_store.py
def copyfile(
    self, src: PathType, dst: PathType, overwrite: bool = False
) -> None:
    """Copy a file.

    Args:
        src: The path to copy from.
        dst: The path to copy to.
        overwrite: If a file already exists at the destination, this
            method will overwrite it if overwrite=`True` and
            raise a FileExistsError otherwise.

    Raises:
        FileExistsError: If a file already exists at the destination
            and overwrite is not set to `True`.
    """
    if not overwrite and self.filesystem.exists(dst):
        raise FileExistsError(
            f"Unable to copy to destination '{convert_to_str(dst)}', "
            f"file already exists. Set `overwrite=True` to copy anyway."
        )
    # TODO [ENG-151]: Check if it works with overwrite=True or if we need to
    #  manually remove it first
    self.filesystem.copy(path1=src, path2=dst)
exists(self, path)

Check whether a path exists.

Parameters:

Name Type Description Default
path Union[bytes, str]

The path to check.

required

Returns:

Type Description
bool

True if the path exists, False otherwise.

Source code in zenml/integrations/gcp/artifact_stores/gcp_artifact_store.py
def exists(self, path: PathType) -> bool:
    """Check whether a path exists.

    Args:
        path: The path to check.

    Returns:
        True if the path exists, False otherwise.
    """
    return self.filesystem.exists(path=path)  # type: ignore[no-any-return]
glob(self, pattern)

Return all paths that match the given glob pattern.

The glob pattern may include: '*' to match any number of characters, '?' to match a single character, '[...]' to match one of the characters inside the brackets, and '**' as the full name of a path component to search in subdirectories of any depth (e.g. '/some_dir/**/some_file').

Parameters:

Name Type Description Default
pattern Union[bytes, str]

The glob pattern to match, see details above.

required

Returns:

Type Description
List[Union[bytes, str]]

A list of paths that match the given glob pattern.

Source code in zenml/integrations/gcp/artifact_stores/gcp_artifact_store.py
def glob(self, pattern: PathType) -> List[PathType]:
    """Return all paths that match the given glob pattern.

    The glob pattern may include:
    - '*' to match any number of characters
    - '?' to match a single character
    - '[...]' to match one of the characters inside the brackets
    - '**' as the full name of a path component to match to search
      in subdirectories of any depth (e.g. '/some_dir/**/some_file)

    Args:
        pattern: The glob pattern to match, see details above.

    Returns:
        A list of paths that match the given glob pattern.
    """
    return [
        f"{GCP_PATH_PREFIX}{path}"
        for path in self.filesystem.glob(path=pattern)
    ]
isdir(self, path)

Check whether a path is a directory.

Parameters:

Name Type Description Default
path Union[bytes, str]

The path to check.

required

Returns:

Type Description
bool

True if the path is a directory, False otherwise.

Source code in zenml/integrations/gcp/artifact_stores/gcp_artifact_store.py
def isdir(self, path: PathType) -> bool:
    """Check whether a path is a directory.

    Args:
        path: The path to check.

    Returns:
        True if the path is a directory, False otherwise.
    """
    return self.filesystem.isdir(path=path)  # type: ignore[no-any-return]
listdir(self, path)

Return a list of files in a directory.

Parameters:

Name Type Description Default
path Union[bytes, str]

The path of the directory to list.

required

Returns:

Type Description
List[Union[bytes, str]]

A list of paths of files in the directory.

Source code in zenml/integrations/gcp/artifact_stores/gcp_artifact_store.py
def listdir(self, path: PathType) -> List[PathType]:
    """Return a list of files in a directory.

    Args:
        path: The path of the directory to list.

    Returns:
        A list of paths of files in the directory.
    """
    path_without_prefix = convert_to_str(path)
    if path_without_prefix.startswith(GCP_PATH_PREFIX):
        path_without_prefix = path_without_prefix[len(GCP_PATH_PREFIX) :]

    def _extract_basename(file_dict: Dict[str, Any]) -> str:
        """Extracts the basename from a file info dict returned by GCP.

        Args:
            file_dict: A file info dict returned by the GCP filesystem.

        Returns:
            The basename of the file.
        """
        file_path = cast(str, file_dict["name"])
        base_name = file_path[len(path_without_prefix) :]
        return base_name.lstrip("/")

    return [
        _extract_basename(dict_)
        for dict_ in self.filesystem.listdir(path=path)
    ]
makedirs(self, path)

Create a directory at the given path.

If needed also create missing parent directories.

Parameters:

Name Type Description Default
path Union[bytes, str]

The path of the directory to create.

required
Source code in zenml/integrations/gcp/artifact_stores/gcp_artifact_store.py
def makedirs(self, path: PathType) -> None:
    """Create a directory at the given path.

    If needed also create missing parent directories.

    Args:
        path: The path of the directory to create.
    """
    self.filesystem.makedirs(path=path, exist_ok=True)
mkdir(self, path)

Create a directory at the given path.

Parameters:

Name Type Description Default
path Union[bytes, str]

The path of the directory to create.

required
Source code in zenml/integrations/gcp/artifact_stores/gcp_artifact_store.py
def mkdir(self, path: PathType) -> None:
    """Create a directory at the given path.

    Args:
        path: The path of the directory to create.
    """
    self.filesystem.makedir(path=path)
open(self, path, mode='r')

Open a file at the given path.

Parameters:

Name Type Description Default
path Union[bytes, str]

Path of the file to open.

required
mode str

Mode in which to open the file. Currently, only 'rb' and 'wb' to read and write binary files are supported.

'r'

Returns:

Type Description
Any

A file-like object that can be used to read or write to the file.

Source code in zenml/integrations/gcp/artifact_stores/gcp_artifact_store.py
def open(self, path: PathType, mode: str = "r") -> Any:
    """Open a file at the given path.

    Args:
        path: Path of the file to open.
        mode: Mode in which to open the file. Currently, only
            'rb' and 'wb' to read and write binary files are supported.

    Returns:
        A file-like object that can be used to read or write to the file.
    """
    return self.filesystem.open(path=path, mode=mode)
remove(self, path)

Remove the file at the given path.

Parameters:

Name Type Description Default
path Union[bytes, str]

The path of the file to remove.

required
Source code in zenml/integrations/gcp/artifact_stores/gcp_artifact_store.py
def remove(self, path: PathType) -> None:
    """Remove the file at the given path.

    Args:
        path: The path of the file to remove.
    """
    self.filesystem.rm_file(path=path)
rename(self, src, dst, overwrite=False)

Rename source file to destination file.

Parameters:

Name Type Description Default
src Union[bytes, str]

The path of the file to rename.

required
dst Union[bytes, str]

The path to rename the source file to.

required
overwrite bool

If a file already exists at the destination, this method will overwrite it if overwrite=True and raise a FileExistsError otherwise.

False

Exceptions:

Type Description
FileExistsError

If a file already exists at the destination and overwrite is not set to True.

Source code in zenml/integrations/gcp/artifact_stores/gcp_artifact_store.py
def rename(
    self, src: PathType, dst: PathType, overwrite: bool = False
) -> None:
    """Rename source file to destination file.

    Args:
        src: The path of the file to rename.
        dst: The path to rename the source file to.
        overwrite: If a file already exists at the destination, this
            method will overwrite it if overwrite=`True` and
            raise a FileExistsError otherwise.

    Raises:
        FileExistsError: If a file already exists at the destination
            and overwrite is not set to `True`.
    """
    if not overwrite and self.filesystem.exists(dst):
        raise FileExistsError(
            f"Unable to rename file to '{convert_to_str(dst)}', "
            f"file already exists. Set `overwrite=True` to rename anyway."
        )

    # TODO [ENG-152]: Check if it works with overwrite=True or if we need
    #  to manually remove it first
    self.filesystem.rename(path1=src, path2=dst)
rmtree(self, path)

Remove the given directory.

Parameters:

Name Type Description Default
path Union[bytes, str]

The path of the directory to remove.

required
Source code in zenml/integrations/gcp/artifact_stores/gcp_artifact_store.py
def rmtree(self, path: PathType) -> None:
    """Remove the given directory.

    Args:
        path: The path of the directory to remove.
    """
    self.filesystem.delete(path=path, recursive=True)
stat(self, path)

Return stat info for the given path.

Parameters:

Name Type Description Default
path Union[bytes, str]

the path to get stat info for.

required

Returns:

Type Description
Dict[str, Any]

A dictionary with the stat info.

Source code in zenml/integrations/gcp/artifact_stores/gcp_artifact_store.py
def stat(self, path: PathType) -> Dict[str, Any]:
    """Return stat info for the given path.

    Args:
        path: the path to get stat info for.

    Returns:
        A dictionary with the stat info.
    """
    return self.filesystem.stat(path=path)  # type: ignore[no-any-return]
walk(self, top, topdown=True, onerror=None)

Return an iterator that walks the contents of the given directory.

Parameters:

Name Type Description Default
top Union[bytes, str]

Path of directory to walk.

required
topdown bool

Unused argument to conform to interface.

True
onerror Optional[Callable[..., NoneType]]

Unused argument to conform to interface.

None

Yields:

Type Description
Iterable[Tuple[Union[bytes, str], List[Union[bytes, str]], List[Union[bytes, str]]]]

An Iterable of Tuples, each of which contains the path of the current directory, a list of directories inside the current directory and a list of files inside the current directory.

Source code in zenml/integrations/gcp/artifact_stores/gcp_artifact_store.py
def walk(
    self,
    top: PathType,
    topdown: bool = True,
    onerror: Optional[Callable[..., None]] = None,
) -> Iterable[Tuple[PathType, List[PathType], List[PathType]]]:
    """Return an iterator that walks the contents of the given directory.

    Args:
        top: Path of directory to walk.
        topdown: Unused argument to conform to interface.
        onerror: Unused argument to conform to interface.

    Yields:
        An Iterable of Tuples, each of which contain the path of the current
        directory path, a list of directories inside the current directory
        and a list of files inside the current directory.
    """
    # TODO [ENG-153]: Additional params
    for (
        directory,
        subdirectories,
        files,
    ) in self.filesystem.walk(path=top):
        yield f"{GCP_PATH_PREFIX}{directory}", subdirectories, files

constants

Constants for the VertexAI integration.

google_credentials_mixin

Implementation of the Google credentials mixin.

GoogleCredentialsMixin (BaseModel) pydantic-model

Mixin for Google Cloud Platform credentials.

Attributes:

Name Type Description
service_account_path Optional[str]

path to the service account credentials file to be used for authentication. If not provided, the default credentials will be used.

Source code in zenml/integrations/gcp/google_credentials_mixin.py
class GoogleCredentialsMixin(BaseModel):
    """Mixin for Google Cloud Platform credentials.

    Attributes:
        service_account_path: path to the service account credentials file to be
            used for authentication. If not provided, the default credentials
            will be used.
    """

    service_account_path: Optional[str] = None

    def _get_authentication(self) -> Tuple["Credentials", str]:
        """Get GCP credentials and the project ID associated with the credentials.

        If `service_account_path` is provided, then the credentials will be
        loaded from the file at that path. Otherwise, the default credentials
        will be used.

        Returns:
            A tuple containing the credentials and the project ID associated to
            the credentials.
        """
        if self.service_account_path:
            credentials, project_id = load_credentials_from_file(
                self.service_account_path
            )
        else:
            credentials, project_id = default()
        return credentials, project_id
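
The credential resolution above can be reproduced directly with google-auth. A minimal sketch, where "service_account.json" is a placeholder path:

from google.auth import default, load_credentials_from_file

service_account_path = "service_account.json"  # set to None to use the default credentials

if service_account_path:
    # Load credentials and project ID from a service account key file.
    credentials, project_id = load_credentials_from_file(service_account_path)
else:
    # Fall back to application default credentials.
    credentials, project_id = default()

print(project_id)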

orchestrators special

Initialization for the VertexAI orchestrator.

vertex_entrypoint_configuration

Implementation of the VertexAI entrypoint configuration.

VertexEntrypointConfiguration (StepEntrypointConfiguration)

Entrypoint configuration for running steps on Vertex AI Pipelines.

Source code in zenml/integrations/gcp/orchestrators/vertex_entrypoint_configuration.py
class VertexEntrypointConfiguration(StepEntrypointConfiguration):
    """Entrypoint configuration for running steps on Vertex AI Pipelines."""

    @classmethod
    def get_custom_entrypoint_options(cls) -> Set[str]:
        """Vertex AI Pipelines specific entrypoint options.

        The argument `VERTEX_JOB_ID_OPTION` allows to specify the job id of the
        Vertex AI Pipeline and get it in the execution of the step, via the `get_run_name`
        method.

        Returns:
            The set of custom entrypoint options.
        """
        return {VERTEX_JOB_ID_OPTION}

    @classmethod
    def get_custom_entrypoint_arguments(
        cls, step: "BaseStep", *args: Any, **kwargs: Any
    ) -> List[str]:
        """Sets the value for the `VERTEX_JOB_ID_OPTION` argument.

        Args:
            step: The step to be executed.
            *args: Additional arguments.
            **kwargs: Additional keyword arguments.

        Returns:
            A list of arguments for the entrypoint.
        """
        return [f"--{VERTEX_JOB_ID_OPTION}", kwargs[VERTEX_JOB_ID_OPTION]]

    def get_run_name(self, pipeline_name: str) -> str:
        """Returns the Vertex AI Pipeline job id.

        Args:
            pipeline_name: The name of the pipeline.

        Returns:
            The Vertex AI Pipeline job id.
        """
        job_id: str = self.entrypoint_args[VERTEX_JOB_ID_OPTION]
        return job_id
get_custom_entrypoint_arguments(step, *args, **kwargs) classmethod

Sets the value for the VERTEX_JOB_ID_OPTION argument.

Parameters:

Name Type Description Default
step BaseStep

The step to be executed.

required
*args Any

Additional arguments.

()
**kwargs Any

Additional keyword arguments.

{}

Returns:

Type Description
List[str]

A list of arguments for the entrypoint.

Source code in zenml/integrations/gcp/orchestrators/vertex_entrypoint_configuration.py
@classmethod
def get_custom_entrypoint_arguments(
    cls, step: "BaseStep", *args: Any, **kwargs: Any
) -> List[str]:
    """Sets the value for the `VERTEX_JOB_ID_OPTION` argument.

    Args:
        step: The step to be executed.
        *args: Additional arguments.
        **kwargs: Additional keyword arguments.

    Returns:
        A list of arguments for the entrypoint.
    """
    return [f"--{VERTEX_JOB_ID_OPTION}", kwargs[VERTEX_JOB_ID_OPTION]]
get_custom_entrypoint_options() classmethod

Vertex AI Pipelines specific entrypoint options.

The VERTEX_JOB_ID_OPTION argument makes it possible to specify the job id of the Vertex AI Pipelines run and to retrieve it during step execution via the get_run_name method.

Returns:

Type Description
Set[str]

The set of custom entrypoint options.

Source code in zenml/integrations/gcp/orchestrators/vertex_entrypoint_configuration.py
@classmethod
def get_custom_entrypoint_options(cls) -> Set[str]:
    """Vertex AI Pipelines specific entrypoint options.

    The argument `VERTEX_JOB_ID_OPTION` allows to specify the job id of the
    Vertex AI Pipeline and get it in the execution of the step, via the `get_run_name`
    method.

    Returns:
        The set of custom entrypoint options.
    """
    return {VERTEX_JOB_ID_OPTION}
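
To make the option-to-argument flow concrete: the orchestrator passes the Vertex job id placeholder as a keyword argument, and get_custom_entrypoint_arguments turns it into a CLI option pair. A small illustration, where the literal option name "vertex_job_id" is an assumption for the VERTEX_JOB_ID_OPTION constant:

VERTEX_JOB_ID_OPTION = "vertex_job_id"  # assumed value of the constant

# The orchestrator supplies the job id (or a placeholder) as a keyword argument.
kwargs = {VERTEX_JOB_ID_OPTION: "example-pipeline-20220101120000"}

# `get_custom_entrypoint_arguments` turns it into a CLI option pair.
entrypoint_args = [f"--{VERTEX_JOB_ID_OPTION}", kwargs[VERTEX_JOB_ID_OPTION]]
print(entrypoint_args)  # ['--vertex_job_id', 'example-pipeline-20220101120000']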
get_run_name(self, pipeline_name)

Returns the Vertex AI Pipeline job id.

Parameters:

Name Type Description Default
pipeline_name str

The name of the pipeline.

required

Returns:

Type Description
str

The Vertex AI Pipeline job id.

Source code in zenml/integrations/gcp/orchestrators/vertex_entrypoint_configuration.py
def get_run_name(self, pipeline_name: str) -> str:
    """Returns the Vertex AI Pipeline job id.

    Args:
        pipeline_name: The name of the pipeline.

    Returns:
        The Vertex AI Pipeline job id.
    """
    job_id: str = self.entrypoint_args[VERTEX_JOB_ID_OPTION]
    return job_id
vertex_orchestrator

Implementation of the VertexAI orchestrator.

VertexOrchestrator (BaseOrchestrator, GoogleCredentialsMixin) pydantic-model

Orchestrator responsible for running pipelines on Vertex AI.

Attributes:

Name Type Description
custom_docker_base_image_name Optional[str]

Name of the Docker image that should be used as the base for the image that will be used to execute each of the steps. If no custom base image is given, a basic image of the active ZenML version will be used. Note: This image needs to have ZenML installed, otherwise the pipeline execution will fail. For that reason, you might want to extend the ZenML Docker images found here: https://hub.docker.com/r/zenmldocker/zenml/

project Optional[str]

GCP project name. If None, the project will be inferred from the environment.

location str

Name of GCP region where the pipeline job will be executed. Vertex AI Pipelines is available in the following regions: https://cloud.google.com/vertex-ai/docs/general/locations#feature-availability

pipeline_root Optional[str]

a Cloud Storage URI that will be used by the Vertex AI Pipelines. If not provided but the artifact store in the stack used to execute the pipeline is a zenml.integrations.gcp.artifact_stores.GCPArtifactStore, then a subdirectory of the artifact store will be used.

encryption_spec_key_name Optional[str]

The Cloud KMS resource identifier of the customer managed encryption key used to protect the job. Has the form projects/<PRJCT>/locations/<REGION>/keyRings/<KR>/cryptoKeys/<KEY>. The key needs to be in the same region as where the compute resource is created.

workload_service_account Optional[str]

the service account for workload run-as account. Users submitting jobs must have act-as permission on this run-as account. If not provided, the default service account will be used.

network Optional[str]

the full name of the Compute Engine Network to which the job should be peered, for example projects/12345/global/networks/myVPC. If not provided, the job will not be peered with any network.

synchronous bool

If True, running a pipeline using this orchestrator will block until all steps have finished running on the Vertex AI Pipelines service.

Source code in zenml/integrations/gcp/orchestrators/vertex_orchestrator.py
class VertexOrchestrator(BaseOrchestrator, GoogleCredentialsMixin):
    """Orchestrator responsible for running pipelines on Vertex AI.

    Attributes:
        custom_docker_base_image_name: Name of the Docker image that should be
            used as the base for the image that will be used to execute each of
            the steps. If no custom base image is given, a basic image of the
            active ZenML version will be used. **Note**: This image needs to
            have ZenML installed, otherwise the pipeline execution will fail.
            For that reason, you might want to extend the ZenML Docker images found
            here: https://hub.docker.com/r/zenmldocker/zenml/
        project: GCP project name. If `None`, the project will be inferred from
            the environment.
        location: Name of GCP region where the pipeline job will be executed.
            Vertex AI Pipelines is available in the following regions:
            https://cloud.google.com/vertex-ai/docs/general/locations#feature
            -availability
        pipeline_root: a Cloud Storage URI that will be used by the Vertex AI
        Pipelines.
            If not provided but the artifact store in the stack used to execute
            the pipeline is a
            `zenml.integrations.gcp.artifact_stores.GCPArtifactStore`,
            then a subdirectory of the artifact store will be used.
        encryption_spec_key_name: The Cloud KMS resource identifier of the
        customer
            managed encryption key used to protect the job. Has the form:
            `projects/<PRJCT>/locations/<REGION>/keyRings/<KR>/cryptoKeys/<KEY>`
            . The key needs to be in the same region as where the compute
            resource is created.
        workload_service_account: the service account for workload run-as
            account. Users submitting jobs must have act-as permission on this
            run-as account.
            If not provided, the default service account will be used.
        network: the full name of the Compute Engine Network to which the job
        should
            be peered. For example, `projects/12345/global/networks/myVPC`
            If not provided, the job will not be peered with any network.
        synchronous: If `True`, running a pipeline using this orchestrator will
            block until all steps finished running on Vertex AI Pipelines
            service.
    """

    custom_docker_base_image_name: Optional[str] = None
    project: Optional[str] = None
    location: str
    pipeline_root: Optional[str] = None
    labels: Dict[str, str] = {}
    encryption_spec_key_name: Optional[str] = None
    workload_service_account: Optional[str] = None
    network: Optional[str] = None
    synchronous: bool = False

    _pipeline_root: str

    FLAVOR: ClassVar[str] = GCP_VERTEX_ORCHESTRATOR_FLAVOR

    @property
    def validator(self) -> Optional[StackValidator]:
        """Validates that the stack contains a container registry.

        Also validates that the artifact store and metadata store used are not
        local.

        Returns:
            A StackValidator instance.
        """

        def _validate_stack_requirements(stack: "Stack") -> Tuple[bool, str]:
            """Validates that all the stack components are not local.

            Args:
                stack: The stack to validate.

            Returns:
                A tuple of (is_valid, error_message).
            """
            # Validate that the container registry is not local.
            container_registry = stack.container_registry
            if container_registry and container_registry.is_local:
                return False, (
                    f"The Vertex orchestrator does not support local "
                    f"container registries. You should replace the component '"
                    f"{container_registry.name}' "
                    f"{container_registry.TYPE.value} to a remote one."
                )

            # Validate that the rest of the components are not local.
            for stack_comp in stack.components.values():
                local_path = stack_comp.local_path
                if not local_path:
                    continue
                return False, (
                    f"The '{stack_comp.name}' {stack_comp.TYPE.value} is a "
                    f"local stack component. The Vertex AI Pipelines "
                    f"orchestrator requires that all the components in the "
                    f"stack used to execute the pipeline have to be not local, "
                    f"because there is no way for Vertex to connect to your "
                    f"local machine. You should use a flavor of "
                    f"{stack_comp.TYPE.value} other than '"
                    f"{stack_comp.FLAVOR}'."
                )

            # If the `pipeline_root` has not been defined in the orchestrator
            # configuration, and the artifact store is not a GCP artifact store,
            # then raise an error.
            if (
                not self.pipeline_root
                and stack.artifact_store.FLAVOR != GCP_ARTIFACT_STORE_FLAVOR
            ):
                return False, (
                    f"The attribute `pipeline_root` has not been set and it "
                    f"cannot be generated using the path of the artifact store "
                    f"because it is not a "
                    f"`zenml.integrations.gcp.artifact_store.GCPArtifactStore`."
                    f" To solve this issue, set the `pipeline_root` attribute "
                    f"manually executing the following command: "
                    f"`zenml orchestrator update {stack.orchestrator.name} "
                    f'--pipeline_root="<Cloud Storage URI>"`.'
                )

            return True, ""

        return StackValidator(
            required_components={StackComponentType.CONTAINER_REGISTRY},
            custom_validation_function=_validate_stack_requirements,
        )

    def get_docker_image_name(self, pipeline_name: str) -> str:
        """Returns the full docker image name including registry and tag.

        Args:
            pipeline_name: The name of the pipeline.

        Returns:
            The full docker image name including registry and tag.
        """
        base_image_name = f"zenml-vertex:{pipeline_name}"
        container_registry = Repository().active_stack.container_registry

        if container_registry:
            registry_uri = container_registry.uri.rstrip("/")
            return f"{registry_uri}/{base_image_name}"

        return base_image_name

    @property
    def root_directory(self) -> str:
        """Returns path to the root directory for files for this orchestrator.

        Returns:
            The path to the root directory for all files concerning this
            orchestrator.
        """
        return os.path.join(
            get_global_config_directory(), "vertex", str(self.uuid)
        )

    @property
    def pipeline_directory(self) -> str:
        """Returns path to directory where kubeflow pipelines files are stored.

        Returns:
            Path to the pipeline directory.
        """
        return os.path.join(self.root_directory, "pipelines")

    def prepare_pipeline_deployment(
        self,
        pipeline: "BasePipeline",
        stack: "Stack",
        runtime_configuration: "RuntimeConfiguration",
    ) -> None:
        """Build a Docker image for the current environment.

        This uploads it to a container registry if configured.

        Args:
            pipeline: The pipeline to be deployed.
            stack: The stack that will be used to deploy the pipeline.
            runtime_configuration: The runtime configuration for the pipeline.

        Raises:
            RuntimeError: If the container registry is missing.
        """
        from zenml.utils import docker_utils

        repo = Repository()
        container_registry = repo.active_stack.container_registry

        if not container_registry:
            raise RuntimeError("Missing container registry")

        image_name = self.get_docker_image_name(pipeline.name)

        requirements = {*stack.requirements(), *pipeline.requirements}

        logger.debug(
            "Vertex AI Pipelines service docker container requirements %s",
            requirements,
        )

        docker_utils.build_docker_image(
            build_context_path=get_source_root_path(),
            image_name=image_name,
            dockerignore_path=pipeline.dockerignore_file,
            requirements=requirements,
            base_image=self.custom_docker_base_image_name,
        )
        container_registry.push_image(image_name)

    def prepare_or_run_pipeline(
        self,
        sorted_steps: List["BaseStep"],
        pipeline: "BasePipeline",
        pb2_pipeline: "Pb2Pipeline",
        stack: "Stack",
        runtime_configuration: "RuntimeConfiguration",
    ) -> Any:
        """Creates a KFP JSON pipeline.

        # noqa: DAR402

        This is an intermediary representation of the pipeline which is then
        deployed to Vertex AI Pipelines service.

        How it works:
        -------------
        Before this method is called the `prepare_pipeline_deployment()` method
        builds a Docker image that contains the code for the pipeline, all steps
        the context around these files.

        Based on this Docker image a callable is created which builds
        container_ops for each step (`_construct_kfp_pipeline`). The function
        `kfp.components.load_component_from_text` is used to create the
        `ContainerOp`, because using the `dsl.ContainerOp` class directly is
        deprecated when using the Kubeflow SDK v2. The step entrypoint command
        with the entrypoint arguments is the command that will be executed by
        the container created using the previously created Docker image.

        This callable is then compiled into a JSON file that is used as the
        intermediary representation of the Kubeflow pipeline.

        This file then is submitted to the Vertex AI Pipelines service for
        execution.

        Args:
            sorted_steps: List of sorted steps.
            pipeline: Zenml Pipeline instance.
            pb2_pipeline: Protobuf Pipeline instance.
            stack: The stack the pipeline was run on.
            runtime_configuration: The Runtime configuration of the current run.

        Raises:
            ValueError: If the attribute `pipeline_root` is not set and it
                can be not generated using the path of the artifact store in the
                stack because it is not a
                `zenml.integrations.gcp.artifact_store.GCPArtifactStore`.
        """
        # If the `pipeline_root` has not been defined in the orchestrator
        # configuration,
        # try to create it from the artifact store if it is a
        # `GCPArtifactStore`.
        if not self.pipeline_root:
            artifact_store = stack.artifact_store
            self._pipeline_root = f"{artifact_store.path.rstrip('/')}/vertex_pipeline_root/{pipeline.name}/{runtime_configuration.run_name}"
            logger.info(
                "The attribute `pipeline_root` has not been set in the "
                "orchestrator configuration. One has been generated "
                "automatically based on the path of the `GCPArtifactStore` "
                "artifact store in the stack used to execute the pipeline. "
                "The generated `pipeline_root` is `%s`.",
                self._pipeline_root,
            )
        else:
            self._pipeline_root = self.pipeline_root

        # Build the Docker image that will be used to run the steps of the
        # pipeline.
        image_name = self.get_docker_image_name(pipeline.name)
        image_name = get_image_digest(image_name) or image_name

        def _construct_kfp_pipeline() -> None:
            """Create a `ContainerOp` for each step.

            This should contain the name of the Docker image and configures the
            entrypoint of the Docker image to run the step.

            Additionally, this gives each `ContainerOp` information about its
            direct downstream steps.

            If this callable is passed to the `compile()` method of
            `KFPV2Compiler` all `dsl.ContainerOp` instances will be
            automatically added to a singular `dsl.Pipeline` instance.
            """
            step_name_to_container_op: Dict[str, dsl.ContainerOp] = {}

            for step in sorted_steps:
                # The command will be needed to eventually call the python step
                # within the docker container
                command = VertexEntrypointConfiguration.get_entrypoint_command()

                # The arguments are passed to configure the entrypoint of the
                # docker container when the step is called.
                arguments = VertexEntrypointConfiguration.get_entrypoint_arguments(
                    step=step,
                    pb2_pipeline=pb2_pipeline,
                    **{VERTEX_JOB_ID_OPTION: dslv2.PIPELINE_JOB_ID_PLACEHOLDER},
                )

                # Create the `ContainerOp` for the step. Using the
                # `dsl.ContainerOp`
                # class directly is deprecated when using the Kubeflow SDK v2.
                container_op = kfp.components.load_component_from_text(
                    f"""
                    name: {step.name}
                    implementation:
                        container:
                            image: {image_name}
                            command: {command + arguments}"""
                )()

                # Set upstream tasks as a dependency of the current step
                upstream_step_names = self.get_upstream_step_names(
                    step=step, pb2_pipeline=pb2_pipeline
                )
                for upstream_step_name in upstream_step_names:
                    upstream_container_op = step_name_to_container_op[
                        upstream_step_name
                    ]
                    container_op.after(upstream_container_op)

                step_name_to_container_op[step.name] = container_op

        # Save the generated pipeline to a file.
        assert runtime_configuration.run_name
        fileio.makedirs(self.pipeline_directory)
        pipeline_file_path = os.path.join(
            self.pipeline_directory,
            f"{runtime_configuration.run_name}.json",
        )

        # Compile the pipeline using the Kubeflow SDK V2 compiler that allows
        # to generate a JSON representation of the pipeline that can be later
        # upload to Vertex AI Pipelines service.
        logger.debug(
            "Compiling pipeline using Kubeflow SDK V2 compiler and saving it "
            "to `%s`",
            pipeline_file_path,
        )
        KFPV2Compiler().compile(
            pipeline_func=_construct_kfp_pipeline,
            package_path=pipeline_file_path,
            pipeline_name=_clean_pipeline_name(pipeline.name),
        )

        # Using the Google Cloud AIPlatform client, upload and execute the
        # pipeline
        # on the Vertex AI Pipelines service.
        self._upload_and_run_pipeline(
            pipeline_name=pipeline.name,
            pipeline_file_path=pipeline_file_path,
            runtime_configuration=runtime_configuration,
            enable_cache=pipeline.enable_cache,
        )

    def _upload_and_run_pipeline(
        self,
        pipeline_name: str,
        pipeline_file_path: str,
        runtime_configuration: "RuntimeConfiguration",
        enable_cache: bool,
    ) -> None:
        """Uploads and run the pipeline on the Vertex AI Pipelines service.

        Args:
            pipeline_name: Name of the pipeline.
            pipeline_file_path: Path of the JSON file containing the compiled
                Kubeflow pipeline (compiled with Kubeflow SDK v2).
            runtime_configuration: Runtime configuration of the pipeline run.
            enable_cache: Whether caching is enabled for this pipeline run.
        """
        # We have to replace the hyphens in the pipeline name with underscores
        # and lower case the string, because the Vertex AI Pipelines service
        # requires this format.
        assert runtime_configuration.run_name
        job_id = _clean_pipeline_name(runtime_configuration.run_name)

        # Warn the user that the scheduling is not available using the Vertex
        # Orchestrator
        if runtime_configuration.schedule:
            logger.warning(
                "Pipeline scheduling configuration was provided, but Vertex "
                "AI Pipelines "
                "do not have capabilities for scheduling yet."
            )

        # Get the credentials that would be used to create the Vertex AI
        # Pipelines
        # job.
        credentials, project_id = self._get_authentication()
        if self.project and self.project != project_id:
            logger.warning(
                "Authenticated with project `%s`, but this orchestrator is "
                "configured to use the project `%s`.",
                project_id,
                self.project,
            )

        # If the project was set in the configuration, use it. Otherwise, use
        # the project that was used to authenticate.
        project_id = self.project if self.project else project_id

        # Instantiate the Vertex AI Pipelines job
        run = aiplatform.PipelineJob(
            display_name=pipeline_name,
            template_path=pipeline_file_path,
            job_id=job_id,
            pipeline_root=self._pipeline_root,
            parameter_values=None,
            enable_caching=enable_cache,
            encryption_spec_key_name=self.encryption_spec_key_name,
            labels=self.labels,
            credentials=credentials,
            project=self.project,
            location=self.location,
        )

        logger.info(
            "Submitting pipeline job with job_id `%s` to Vertex AI Pipelines "
            "service.",
            job_id,
        )

        # Submit the job to Vertex AI Pipelines service.
        try:
            if self.workload_service_account:
                logger.info(
                    "The Vertex AI Pipelines job workload will be executed "
                    "using `%s` "
                    "service account.",
                    self.workload_service_account,
                )

            if self.network:
                logger.info(
                    "The Vertex AI Pipelines job will be peered with `%s` "
                    "network.",
                    self.network,
                )

            run.submit(
                service_account=self.workload_service_account,
                network=self.network,
            )
            logger.info(
                "View the Vertex AI Pipelines job at %s", run._dashboard_uri()
            )

            if self.synchronous:
                logger.info(
                    "Waiting for the Vertex AI Pipelines job to finish..."
                )
                run.wait()

        except google_exceptions.ClientError as e:
            logger.warning(
                "Failed to create the Vertex AI Pipelines job: %s", e
            )

        except RuntimeError as e:
            logger.error(
                "The Vertex AI Pipelines job execution has failed: %s", e
            )
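
A configuration sketch tying the attributes above together. All values are placeholders, and passing name= to the constructor is an assumption about the stack component base model; in practice the orchestrator is usually registered through the CLI:

from zenml.integrations.gcp.orchestrators import VertexOrchestrator

orchestrator = VertexOrchestrator(
    name="vertex",
    project="my-gcp-project",              # inferred from credentials if omitted
    location="europe-west4",               # region where the pipeline job runs
    pipeline_root="gs://my-zenml-artifacts/vertex_pipeline_root",
    workload_service_account="zenml-vertex@my-gcp-project.iam.gserviceaccount.com",
    synchronous=True,                       # block until the Vertex job finishes
)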
pipeline_directory: str property readonly

Returns path to directory where kubeflow pipelines files are stored.

Returns:

Type Description
str

Path to the pipeline directory.

root_directory: str property readonly

Returns path to the root directory for files for this orchestrator.

Returns:

Type Description
str

The path to the root directory for all files concerning this orchestrator.

validator: Optional[zenml.stack.stack_validator.StackValidator] property readonly

Validates that the stack contains a container registry.

Also validates that the artifact store and metadata store used are not local.

Returns:

Type Description
Optional[zenml.stack.stack_validator.StackValidator]

A StackValidator instance.
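
The same validation pattern can be written standalone. A sketch, assuming zenml.stack exports Stack and StackValidator as used elsewhere in the codebase:

from typing import Tuple

from zenml.enums import StackComponentType
from zenml.stack import Stack, StackValidator

def _no_local_components(stack: "Stack") -> Tuple[bool, str]:
    """Reject stacks that contain components with a local path."""
    for component in stack.components.values():
        if component.local_path:
            return False, f"'{component.name}' is a local stack component."
    return True, ""

validator = StackValidator(
    required_components={StackComponentType.CONTAINER_REGISTRY},
    custom_validation_function=_no_local_components,
)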

get_docker_image_name(self, pipeline_name)

Returns the full docker image name including registry and tag.

Parameters:

Name Type Description Default
pipeline_name str

The name of the pipeline.

required

Returns:

Type Description
str

The full docker image name including registry and tag.

Source code in zenml/integrations/gcp/orchestrators/vertex_orchestrator.py
def get_docker_image_name(self, pipeline_name: str) -> str:
    """Returns the full docker image name including registry and tag.

    Args:
        pipeline_name: The name of the pipeline.

    Returns:
        The full docker image name including registry and tag.
    """
    base_image_name = f"zenml-vertex:{pipeline_name}"
    container_registry = Repository().active_stack.container_registry

    if container_registry:
        registry_uri = container_registry.uri.rstrip("/")
        return f"{registry_uri}/{base_image_name}"

    return base_image_name
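
A small illustration of the resulting image name, using a hypothetical registry URI:

registry_uri = "eu.gcr.io/my-gcp-project"   # placeholder container registry URI
pipeline_name = "training_pipeline"

image_name = f"{registry_uri.rstrip('/')}/zenml-vertex:{pipeline_name}"
print(image_name)  # eu.gcr.io/my-gcp-project/zenml-vertex:training_pipeline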
prepare_or_run_pipeline(self, sorted_steps, pipeline, pb2_pipeline, stack, runtime_configuration)

Creates a KFP JSON pipeline.

This is an intermediary representation of the pipeline which is then deployed to Vertex AI Pipelines service.

How it works:

Before this method is called, the prepare_pipeline_deployment() method builds a Docker image that contains the code for the pipeline, all steps, and the context around these files.

Based on this Docker image a callable is created which builds container_ops for each step (_construct_kfp_pipeline). The function kfp.components.load_component_from_text is used to create the ContainerOp, because using the dsl.ContainerOp class directly is deprecated when using the Kubeflow SDK v2. The step entrypoint command with the entrypoint arguments is the command that will be executed by the container created using the previously created Docker image.

This callable is then compiled into a JSON file that is used as the intermediary representation of the Kubeflow pipeline.

This file then is submitted to the Vertex AI Pipelines service for execution.

Parameters:

Name Type Description Default
sorted_steps List[BaseStep]

List of sorted steps.

required
pipeline BasePipeline

Zenml Pipeline instance.

required
pb2_pipeline Pb2Pipeline

Protobuf Pipeline instance.

required
stack Stack

The stack the pipeline was run on.

required
runtime_configuration RuntimeConfiguration

The Runtime configuration of the current run.

required

Exceptions:

Type Description
ValueError

If the attribute pipeline_root is not set and it cannot be generated using the path of the artifact store in the stack because it is not a zenml.integrations.gcp.artifact_store.GCPArtifactStore.

Source code in zenml/integrations/gcp/orchestrators/vertex_orchestrator.py
def prepare_or_run_pipeline(
    self,
    sorted_steps: List["BaseStep"],
    pipeline: "BasePipeline",
    pb2_pipeline: "Pb2Pipeline",
    stack: "Stack",
    runtime_configuration: "RuntimeConfiguration",
) -> Any:
    """Creates a KFP JSON pipeline.

    # noqa: DAR402

    This is an intermediary representation of the pipeline which is then
    deployed to Vertex AI Pipelines service.

    How it works:
    -------------
    Before this method is called the `prepare_pipeline_deployment()` method
    builds a Docker image that contains the code for the pipeline, all steps
    the context around these files.

    Based on this Docker image a callable is created which builds
    container_ops for each step (`_construct_kfp_pipeline`). The function
    `kfp.components.load_component_from_text` is used to create the
    `ContainerOp`, because using the `dsl.ContainerOp` class directly is
    deprecated when using the Kubeflow SDK v2. The step entrypoint command
    with the entrypoint arguments is the command that will be executed by
    the container created using the previously created Docker image.

    This callable is then compiled into a JSON file that is used as the
    intermediary representation of the Kubeflow pipeline.

    This file then is submitted to the Vertex AI Pipelines service for
    execution.

    Args:
        sorted_steps: List of sorted steps.
        pipeline: Zenml Pipeline instance.
        pb2_pipeline: Protobuf Pipeline instance.
        stack: The stack the pipeline was run on.
        runtime_configuration: The Runtime configuration of the current run.

    Raises:
        ValueError: If the attribute `pipeline_root` is not set and it
            can be not generated using the path of the artifact store in the
            stack because it is not a
            `zenml.integrations.gcp.artifact_store.GCPArtifactStore`.
    """
    # If the `pipeline_root` has not been defined in the orchestrator
    # configuration,
    # try to create it from the artifact store if it is a
    # `GCPArtifactStore`.
    if not self.pipeline_root:
        artifact_store = stack.artifact_store
        self._pipeline_root = f"{artifact_store.path.rstrip('/')}/vertex_pipeline_root/{pipeline.name}/{runtime_configuration.run_name}"
        logger.info(
            "The attribute `pipeline_root` has not been set in the "
            "orchestrator configuration. One has been generated "
            "automatically based on the path of the `GCPArtifactStore` "
            "artifact store in the stack used to execute the pipeline. "
            "The generated `pipeline_root` is `%s`.",
            self._pipeline_root,
        )
    else:
        self._pipeline_root = self.pipeline_root

    # Build the Docker image that will be used to run the steps of the
    # pipeline.
    image_name = self.get_docker_image_name(pipeline.name)
    image_name = get_image_digest(image_name) or image_name

    def _construct_kfp_pipeline() -> None:
        """Create a `ContainerOp` for each step.

        This should contain the name of the Docker image and configures the
        entrypoint of the Docker image to run the step.

        Additionally, this gives each `ContainerOp` information about its
        direct downstream steps.

        If this callable is passed to the `compile()` method of
        `KFPV2Compiler` all `dsl.ContainerOp` instances will be
        automatically added to a singular `dsl.Pipeline` instance.
        """
        step_name_to_container_op: Dict[str, dsl.ContainerOp] = {}

        for step in sorted_steps:
            # The command will be needed to eventually call the python step
            # within the docker container
            command = VertexEntrypointConfiguration.get_entrypoint_command()

            # The arguments are passed to configure the entrypoint of the
            # docker container when the step is called.
            arguments = VertexEntrypointConfiguration.get_entrypoint_arguments(
                step=step,
                pb2_pipeline=pb2_pipeline,
                **{VERTEX_JOB_ID_OPTION: dslv2.PIPELINE_JOB_ID_PLACEHOLDER},
            )

            # Create the `ContainerOp` for the step. Using the
            # `dsl.ContainerOp` class directly is deprecated when using the
            # Kubeflow SDK v2.
            container_op = kfp.components.load_component_from_text(
                f"""
                name: {step.name}
                implementation:
                    container:
                        image: {image_name}
                        command: {command + arguments}"""
            )()

            # Set upstream tasks as a dependency of the current step
            upstream_step_names = self.get_upstream_step_names(
                step=step, pb2_pipeline=pb2_pipeline
            )
            for upstream_step_name in upstream_step_names:
                upstream_container_op = step_name_to_container_op[
                    upstream_step_name
                ]
                container_op.after(upstream_container_op)

            step_name_to_container_op[step.name] = container_op

    # Save the generated pipeline to a file.
    assert runtime_configuration.run_name
    fileio.makedirs(self.pipeline_directory)
    pipeline_file_path = os.path.join(
        self.pipeline_directory,
        f"{runtime_configuration.run_name}.json",
    )

    # Compile the pipeline using the Kubeflow SDK V2 compiler, which generates
    # a JSON representation of the pipeline that can later be uploaded to the
    # Vertex AI Pipelines service.
    logger.debug(
        "Compiling pipeline using Kubeflow SDK V2 compiler and saving it "
        "to `%s`",
        pipeline_file_path,
    )
    KFPV2Compiler().compile(
        pipeline_func=_construct_kfp_pipeline,
        package_path=pipeline_file_path,
        pipeline_name=_clean_pipeline_name(pipeline.name),
    )

    # Using the Google Cloud AIPlatform client, upload and execute the
    # pipeline on the Vertex AI Pipelines service.
    self._upload_and_run_pipeline(
        pipeline_name=pipeline.name,
        pipeline_file_path=pipeline_file_path,
        runtime_configuration=runtime_configuration,
        enable_cache=pipeline.enable_cache,
    )
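
The component-from-text and V2 compilation steps described above can be sketched in isolation. This is only an illustrative sketch of the underlying Kubeflow SDK pattern, not the orchestrator's exact code: the image name, entrypoint command, and step names below are placeholders.

import kfp
from kfp.v2.compiler import Compiler as KFPV2Compiler

# Placeholder image and entrypoint; in the orchestrator these come from the
# Docker image built by `prepare_pipeline_deployment()` and from the step
# entrypoint configuration.
IMAGE = "gcr.io/my-project/zenml-pipeline:latest"


def _example_pipeline() -> None:
    """Build one container task per (placeholder) step."""
    # `load_component_from_text` returns a factory; calling it yields the
    # task, which is why `dsl.ContainerOp` is never instantiated directly.
    step_a = kfp.components.load_component_from_text(
        f"""
        name: step_a
        implementation:
            container:
                image: {IMAGE}
                command: [python, -m, my_entrypoint, --step, step_a]"""
    )()

    step_b = kfp.components.load_component_from_text(
        f"""
        name: step_b
        implementation:
            container:
                image: {IMAGE}
                command: [python, -m, my_entrypoint, --step, step_b]"""
    )()

    # Encode the DAG: step_b only starts after step_a has finished.
    step_b.after(step_a)


# Compile to the JSON intermediary representation accepted by Vertex AI.
KFPV2Compiler().compile(
    pipeline_func=_example_pipeline,
    package_path="example_pipeline.json",
    pipeline_name="example-pipeline",
)

The orchestrator then submits the generated JSON file in `_upload_and_run_pipeline` (not shown in this section). A standalone submission with the `google-cloud-aiplatform` client might look like the following; the project, region, and bucket are placeholders as well.

from google.cloud import aiplatform

# Initialize the client for the target project and region.
aiplatform.init(project="my-project", location="europe-west4")

# Create a pipeline job from the compiled JSON and submit it.
job = aiplatform.PipelineJob(
    display_name="example-pipeline",
    template_path="example_pipeline.json",
    pipeline_root="gs://my-bucket/vertex_pipeline_root/example-pipeline",
    enable_caching=False,
)
job.submit()
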
prepare_pipeline_deployment(self, pipeline, stack, runtime_configuration)

Build a Docker image for the current environment.

This uploads it to a container registry if configured.

Parameters:

pipeline (BasePipeline): The pipeline to be deployed. Required.
stack (Stack): The stack that will be used to deploy the pipeline. Required.
runtime_configuration (RuntimeConfiguration): The runtime configuration for the pipeline. Required.

Exceptions:

RuntimeError: If the container registry is missing.

Source code in zenml/integrations/gcp/orchestrators/vertex_orchestrator.py
def prepare_pipeline_deployment(
    self,
    pipeline: "BasePipeline",
    stack: "Stack",
    runtime_configuration: "RuntimeConfiguration",
) -> None:
    """Build a Docker image for the current environment.

    This uploads it to a container registry if configured.

    Args:
        pipeline: The pipeline to be deployed.
        stack: The stack that will be used to deploy the pipeline.
        runtime_configuration: The runtime configuration for the pipeline.

    Raises:
        RuntimeError: If the container registry is missing.
    """
    from zenml.utils import docker_utils

    repo = Repository()
    container_registry = repo.active_stack.container_registry

    if not container_registry:
        raise RuntimeError("Missing container registry")

    image_name = self.get_docker_image_name(pipeline.name)

    requirements = {*stack.requirements(), *pipeline.requirements}

    logger.debug(
        "Vertex AI Pipelines service docker container requirements %s",
        requirements,
    )

    docker_utils.build_docker_image(
        build_context_path=get_source_root_path(),
        image_name=image_name,
        dockerignore_path=pipeline.dockerignore_file,
        requirements=requirements,
        base_image=self.custom_docker_base_image_name,
    )
    container_registry.push_image(image_name)
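
Since deployment aborts with a `RuntimeError` when no container registry is configured, it can be useful to perform the same check up front. Below is a minimal sketch mirroring the logic above; the `zenml.repository.Repository` import path is assumed here and may differ between ZenML versions.

from zenml.repository import Repository  # assumed import path

# Mirror the check done in `prepare_pipeline_deployment()`: the active stack
# must contain a container registry, otherwise the Docker image built for
# Vertex AI cannot be pushed anywhere.
repo = Repository()
if not repo.active_stack.container_registry:
    raise RuntimeError(
        "The active stack has no container registry; add one to the stack "
        "before running pipelines on Vertex AI."
    )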

secrets_manager special

ZenML integration for GCP Secrets Manager.

The GCP Secrets Manager integration allows your pipeline to access the GCP Secret Manager directly and use the secrets stored there at runtime.
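
A minimal usage sketch is shown below. The import path and the `name` argument are assumptions based on how other ZenML stack components are configured; the methods used are documented further down on this page.

from zenml.integrations.gcp.secrets_manager import GCPSecretsManager  # assumed path

# Point the secrets manager at the GCP project that hosts Secret Manager.
secrets_manager = GCPSecretsManager(
    name="gcp_secrets_manager",  # assumed stack-component name field
    project_id="my-gcp-project",
)

# List the ZenML secret groups stored in GCP Secret Manager ...
print(secrets_manager.get_all_secret_keys())

# ... fetch one of them as a secret schema instance ...
secret = secrets_manager.get_secret("my_secret")

# ... and delete it again (removing every key-value pair in the group).
secrets_manager.delete_secret("my_secret")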

gcp_secrets_manager

Implementation of the GCP Secrets Manager.

GCPSecretsManager (BaseSecretsManager) pydantic-model

Class to interact with the GCP secrets manager.

Attributes:

project_id (str): The project_id of the GCP project that contains the Secret Manager; this is necessary to access the correct project.

Source code in zenml/integrations/gcp/secrets_manager/gcp_secrets_manager.py
class GCPSecretsManager(BaseSecretsManager):
    """Class to interact with the GCP secrets manager.

    Attributes:
        project_id:  This is necessary to access the correct GCP project.
                     The project_id of your GCP project space that contains
                     the Secret Manager.
    """

    project_id: str

    # Class configuration
    FLAVOR: ClassVar[str] = GCP_SECRETS_MANAGER_FLAVOR
    CLIENT: ClassVar[Any] = None

    @classmethod
    def _ensure_client_connected(cls) -> None:
        if cls.CLIENT is None:
            cls.CLIENT = secretmanager.SecretManagerServiceClient()

    @property
    def parent_name(self) -> str:
        """Construct the GCP parent path to the secret manager.

        Returns:
            The parent path to the secret manager
        """
        return f"projects/{self.project_id}"

    def register_secret(self, secret: BaseSecretSchema) -> None:
        """Registers a new secret.

        Args:
            secret: the secret to register

        Raises:
            SecretExistsError: if the secret already exists
        """
        self._ensure_client_connected()

        if secret.name in self.get_all_secret_keys():
            raise SecretExistsError(
                f"A Secret with the name {secret.name} already exists."
            )

        adjusted_content = prepend_group_name_to_keys(secret)
        for k, v in adjusted_content.items():
            # Create the secret, this only creates an empty secret with the
            #  supplied name.
            gcp_secret = self.CLIENT.create_secret(
                request={
                    "parent": self.parent_name,
                    "secret_id": k,
                    "secret": {
                        "replication": {"automatic": {}},
                        "labels": [
                            (ZENML_GROUP_KEY, secret.name),
                            (ZENML_SCHEMA_NAME, secret.TYPE),
                        ],
                    },
                }
            )

            logger.debug("Created empty secret: %s", gcp_secret.name)

            self.CLIENT.add_secret_version(
                request={
                    "parent": gcp_secret.name,
                    "payload": {"data": str(v).encode()},
                }
            )

            logger.debug("Added value to secret.")

    def get_secret(self, secret_name: str) -> BaseSecretSchema:
        """Get a secret by its name.

        Args:
            secret_name: the name of the secret to get

        Returns:
            The secret.

        Raises:
            RuntimeError: if the secret does not exist
        """
        self._ensure_client_connected()

        secret_contents = {}
        zenml_schema_name = ""

        # List all secrets.
        for secret in self.CLIENT.list_secrets(
            request={"parent": self.parent_name}
        ):
            if (
                ZENML_GROUP_KEY in secret.labels
                and secret_name == secret.labels[ZENML_GROUP_KEY]
            ):

                secret_version_name = secret.name + "/versions/latest"

                response = self.CLIENT.access_secret_version(
                    request={"name": secret_version_name}
                )

                secret_value = response.payload.data.decode("UTF-8")

                secret_key = remove_group_name_from_key(
                    secret.name.split("/")[-1], secret_name
                )

                secret_contents[secret_key] = secret_value

                zenml_schema_name = secret.labels[ZENML_SCHEMA_NAME]

        if not secret_contents:
            raise RuntimeError(f"No secrets found within the {secret_name}")

        secret_contents["name"] = secret_name

        secret_schema = SecretSchemaClassRegistry.get_class(
            secret_schema=zenml_schema_name
        )
        return secret_schema(**secret_contents)

    def get_all_secret_keys(self) -> List[str]:
        """Get all secret keys.

        Returns:
            A list of all secret keys
        """
        self._ensure_client_connected()

        set_of_secrets = set()

        # List all secrets.
        for secret in self.CLIENT.list_secrets(
            request={"parent": self.parent_name}
        ):
            if ZENML_GROUP_KEY in secret.labels:
                group_key = secret.labels[ZENML_GROUP_KEY]
                set_of_secrets.add(group_key)

        return list(set_of_secrets)

    def update_secret(self, secret: BaseSecretSchema) -> None:
        """Update an existing secret by creating new versions of the existing secrets.

        Args:
            secret: the secret to update
        """
        self._ensure_client_connected()

        adjusted_content = prepend_group_name_to_keys(secret)

        for k, v in adjusted_content.items():
            # Add a new secret version to the existing secret with the
            # supplied name, containing the updated value.
            version_parent = self.CLIENT.secret_path(self.project_id, k)
            payload = {"data": str(v).encode()}

            self.CLIENT.add_secret_version(
                request={"parent": version_parent, "payload": payload}
            )

    def delete_secret(self, secret_name: str) -> None:
        """Delete an existing secret by name.

        In GCP a secret is a single k-v
        pair. Within ZenML a secret is a collection of k-v pairs. As such,
        deleting a secret will iterate through all secrets and delete the ones
        with the secret_name as label.

        Args:
            secret_name: the name of the secret to delete
        """
        self._ensure_client_connected()

        # Go through all gcp secrets and delete the ones with the secret_name
        #  as label.
        for secret in self.CLIENT.list_secrets(
            request={"parent": self.parent_name}
        ):
            if (
                ZENML_GROUP_KEY in secret.labels
                and secret_name == secret.labels[ZENML_GROUP_KEY]
            ):
                self.CLIENT.delete_secret(request={"name": secret.name})

    def delete_all_secrets(self) -> None:
        """Delete all existing secrets."""
        self._ensure_client_connected()

        # List al