Integrations
zenml.integrations
special
ZenML integrations module.
The ZenML integrations module contains sub-modules for each integration that we support. This includes orchestrators like Apache Airflow, visualization tools like the facets library, as well as deep learning libraries like PyTorch.
airflow
special
Airflow integration for ZenML.
The Airflow integration sub-module powers an alternative to the local orchestrator. You can enable it by registering the Airflow orchestrator with the CLI tool and then bootstrapping it with the `zenml orchestrator up` command.
AirflowIntegration (Integration)
Definition of Airflow Integration for ZenML.
Source code in zenml/integrations/airflow/__init__.py
class AirflowIntegration(Integration):
"""Definition of Airflow Integration for ZenML."""
NAME = AIRFLOW
REQUIREMENTS = ["apache-airflow==2.2.0"]
@classmethod
def flavors(cls) -> List[FlavorWrapper]:
"""Declare the stack component flavors for the Airflow integration.
Returns:
List of stack component flavors for this integration.
"""
return [
FlavorWrapper(
name=AIRFLOW_ORCHESTRATOR_FLAVOR,
source="zenml.integrations.airflow.orchestrators.AirflowOrchestrator",
type=StackComponentType.ORCHESTRATOR,
integration=cls.NAME,
)
]
flavors()
classmethod
Declare the stack component flavors for the Airflow integration.
Returns:
Type | Description |
---|---|
List[zenml.zen_stores.models.flavor_wrapper.FlavorWrapper] | List of stack component flavors for this integration. |
Source code in zenml/integrations/airflow/__init__.py
@classmethod
def flavors(cls) -> List[FlavorWrapper]:
"""Declare the stack component flavors for the Airflow integration.
Returns:
List of stack component flavors for this integration.
"""
return [
FlavorWrapper(
name=AIRFLOW_ORCHESTRATOR_FLAVOR,
source="zenml.integrations.airflow.orchestrators.AirflowOrchestrator",
type=StackComponentType.ORCHESTRATOR,
integration=cls.NAME,
)
]
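For orientation, here is a minimal, hedged sketch of how the declared flavors could be inspected programmatically, assuming the class is importable from zenml.integrations.airflow as in the source above:

```python
from zenml.integrations.airflow import AirflowIntegration

# Print the name, component type and source path of each flavor the
# integration declares (a single Airflow orchestrator flavor in this case).
for flavor in AirflowIntegration.flavors():
    print(flavor.name, flavor.type, flavor.source)
```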
orchestrators
special
The Airflow integration enables the use of Airflow as a pipeline orchestrator.
airflow_orchestrator
Implementation of Airflow orchestrator integration.
AirflowOrchestrator (BaseOrchestrator)
pydantic-model
Orchestrator responsible for running pipelines using Airflow.
Source code in zenml/integrations/airflow/orchestrators/airflow_orchestrator.py
class AirflowOrchestrator(BaseOrchestrator):
"""Orchestrator responsible for running pipelines using Airflow."""
airflow_home: str = ""
# Class Configuration
FLAVOR: ClassVar[str] = AIRFLOW_ORCHESTRATOR_FLAVOR
def __init__(self, **values: Any):
"""Sets environment variables to configure airflow.
Args:
**values: Values to set in the orchestrator.
"""
super().__init__(**values)
self._set_env()
@staticmethod
def _translate_schedule(
schedule: Optional[Schedule] = None,
) -> Dict[str, Any]:
"""Convert ZenML schedule into Airflow schedule.
The Airflow schedule uses slightly different naming and needs some
default entries for execution without a schedule.
Args:
schedule: Containing the interval, start and end date and
a boolean flag that defines if past runs should be caught up
on
Returns:
Airflow configuration dict.
"""
if schedule:
if schedule.cron_expression:
return {
"schedule_interval": schedule.cron_expression,
}
else:
return {
"schedule_interval": schedule.interval_second,
"start_date": schedule.start_time,
"end_date": schedule.end_time,
"catchup": schedule.catchup,
}
return {
"schedule_interval": "@once",
# set a start time in the past and disable catchup so airflow runs the dag immediately
"start_date": datetime.datetime.now() - datetime.timedelta(7),
"catchup": False,
}
def prepare_or_run_pipeline(
self,
sorted_steps: List[BaseStep],
pipeline: "BasePipeline",
pb2_pipeline: Pb2Pipeline,
stack: "Stack",
runtime_configuration: "RuntimeConfiguration",
) -> Any:
"""Creates an Airflow DAG as the intermediate representation for the pipeline.
This DAG will be loaded by airflow in the target environment
and used for orchestration of the pipeline.
How it works:
-------------
A new airflow_dag is instantiated with the pipeline name and among
other things the run schedule.
For each step of the pipeline a callable is created. This callable
uses the run_step() method to execute the step. The parameters of
this callable are pre-filled and an airflow step_operator is created
within the dag. The dependencies to upstream steps are then
configured.
Finally, the dag is fully complete and can be returned.
Args:
sorted_steps: List of steps in the pipeline.
pipeline: The pipeline to be executed.
pb2_pipeline: The pipeline as a protobuf message.
stack: The stack on which the pipeline will be deployed.
runtime_configuration: The runtime configuration.
Returns:
The Airflow DAG.
"""
import airflow
from airflow.operators import python as airflow_python
# Instantiate and configure airflow Dag with name and schedule
airflow_dag = airflow.DAG(
dag_id=pipeline.name,
is_paused_upon_creation=False,
**self._translate_schedule(runtime_configuration.schedule),
)
# Dictionary mapping step names to airflow_operators. This will be needed
# to configure airflow operator dependencies
step_name_to_airflow_operator = {}
for step in sorted_steps:
# Create callable that will be used by airflow to execute the step
# within the orchestrated environment
def _step_callable(step_instance: "BaseStep", **kwargs):
# Extract run name for the kwargs that will be passed to the
# callable
run_name = kwargs["ti"].get_dagrun().run_id
self.run_step(
step=step_instance,
run_name=run_name,
pb2_pipeline=pb2_pipeline,
)
# Create airflow python operator that contains the step callable
airflow_operator = airflow_python.PythonOperator(
dag=airflow_dag,
task_id=step.name,
provide_context=True,
python_callable=functools.partial(
_step_callable, step_instance=step
),
)
# Configure the current airflow operator to run after all upstream
# operators finished executing
step_name_to_airflow_operator[step.name] = airflow_operator
upstream_step_names = self.get_upstream_step_names(
step=step, pb2_pipeline=pb2_pipeline
)
for upstream_step_name in upstream_step_names:
airflow_operator.set_upstream(
step_name_to_airflow_operator[upstream_step_name]
)
# Return the finished airflow dag
return airflow_dag
@root_validator(skip_on_failure=True)
def set_airflow_home(cls, values: Dict[str, Any]) -> Dict[str, Any]:
"""Sets Airflow home according to orchestrator UUID.
Args:
values: Dictionary containing all orchestrator attributes values.
Returns:
Dictionary containing all orchestrator attributes values and the airflow home.
Raises:
ValueError: If the orchestrator UUID is not set.
"""
if "uuid" not in values:
raise ValueError("`uuid` needs to exist for AirflowOrchestrator.")
values["airflow_home"] = os.path.join(
io_utils.get_global_config_directory(),
AIRFLOW_ROOT_DIR,
str(values["uuid"]),
)
return values
@property
def dags_directory(self) -> str:
"""Returns path to the airflow dags directory.
Returns:
Path to the airflow dags directory.
"""
return os.path.join(self.airflow_home, "dags")
@property
def pid_file(self) -> str:
"""Returns path to the daemon PID file.
Returns:
Path to the daemon PID file.
"""
return os.path.join(self.airflow_home, "airflow_daemon.pid")
@property
def log_file(self) -> str:
"""Returns path to the airflow log file.
Returns:
str: Path to the airflow log file.
"""
return os.path.join(self.airflow_home, "airflow_orchestrator.log")
@property
def password_file(self) -> str:
"""Returns path to the webserver password file.
Returns:
Path to the webserver password file.
"""
return os.path.join(self.airflow_home, "standalone_admin_password.txt")
def _set_env(self) -> None:
"""Sets environment variables to configure airflow."""
os.environ["AIRFLOW_HOME"] = self.airflow_home
os.environ["AIRFLOW__CORE__DAGS_FOLDER"] = self.dags_directory
os.environ["AIRFLOW__CORE__DAG_DISCOVERY_SAFE_MODE"] = "false"
os.environ["AIRFLOW__CORE__LOAD_EXAMPLES"] = "false"
# check the DAG folder every 10 seconds for new files
os.environ["AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL"] = "10"
def _copy_to_dag_directory_if_necessary(self, dag_filepath: str) -> None:
"""Copies DAG module to the Airflow DAGs directory if not already present.
Args:
dag_filepath: Path to the file in which the DAG is defined.
"""
dags_directory = io_utils.resolve_relative_path(self.dags_directory)
if dags_directory == os.path.dirname(dag_filepath):
logger.debug("File is already in airflow DAGs directory.")
else:
logger.debug(
"Copying dag file '%s' to DAGs directory.", dag_filepath
)
destination_path = os.path.join(
dags_directory, os.path.basename(dag_filepath)
)
if fileio.exists(destination_path):
logger.info(
"File '%s' already exists, overwriting with new DAG file",
destination_path,
)
fileio.copy(dag_filepath, destination_path, overwrite=True)
def _log_webserver_credentials(self) -> None:
"""Logs URL and credentials to log in to the airflow webserver.
Raises:
FileNotFoundError: If the password file does not exist.
"""
if fileio.exists(self.password_file):
with open(self.password_file) as file:
password = file.read().strip()
else:
raise FileNotFoundError(
f"Can't find password file '{self.password_file}'"
)
logger.info(
"To inspect your DAGs, login to http://0.0.0.0:8080 "
"with username: admin password: %s",
password,
)
def runtime_options(self) -> Dict[str, Any]:
"""Runtime options for the airflow orchestrator.
Returns:
Runtime options dictionary.
"""
return {DAG_FILEPATH_OPTION_KEY: None}
def prepare_pipeline_deployment(
self,
pipeline: "BasePipeline",
stack: "Stack",
runtime_configuration: "RuntimeConfiguration",
) -> None:
"""Checks Airflow is running and copies DAG file to the Airflow DAGs directory.
Args:
pipeline: Pipeline to be deployed.
stack: Stack to be deployed.
runtime_configuration: Runtime configuration for the pipeline.
Raises:
RuntimeError: If Airflow is not running or no DAG filepath runtime
option is provided.
"""
if not self.is_running:
raise RuntimeError(
"Airflow orchestrator is currently not running. Run `zenml "
"stack up` to provision resources for the active stack."
)
try:
dag_filepath = runtime_configuration[DAG_FILEPATH_OPTION_KEY]
except KeyError:
raise RuntimeError(
f"No DAG filepath found in runtime configuration. Make sure "
f"to add the filepath to your airflow DAG file as a runtime "
f"option (key: '{DAG_FILEPATH_OPTION_KEY}')."
)
self._copy_to_dag_directory_if_necessary(dag_filepath=dag_filepath)
@property
def is_running(self) -> bool:
"""Returns whether the airflow daemon is currently running.
Returns:
True if the daemon is running, False otherwise.
Raises:
RuntimeError: If port 8080 is occupied.
"""
from airflow.cli.commands.standalone_command import StandaloneCommand
from airflow.jobs.triggerer_job import TriggererJob
daemon_running = daemon.check_if_daemon_is_running(self.pid_file)
command = StandaloneCommand()
webserver_port_open = command.port_open(8080)
if not daemon_running:
if webserver_port_open:
raise RuntimeError(
"The airflow daemon does not seem to be running but "
"local port 8080 is occupied. Make sure the port is "
"available and try again."
)
# exit early so we don't check non-existing airflow databases
return False
# we can't use StandaloneCommand().is_ready() here as the
# Airflow SequentialExecutor apparently does not send a heartbeat
# while running a task which would result in this returning `False`
# even if Airflow is running.
airflow_running = webserver_port_open and command.job_running(
TriggererJob
)
return airflow_running
@property
def is_provisioned(self) -> bool:
"""Returns whether the airflow daemon is currently running.
Returns:
True if the airflow daemon is running, False otherwise.
"""
return self.is_running
def provision(self) -> None:
"""Ensures that Airflow is running."""
if self.is_running:
logger.info("Airflow is already running.")
self._log_webserver_credentials()
return
if not fileio.exists(self.dags_directory):
io_utils.create_dir_recursive_if_not_exists(self.dags_directory)
from airflow.cli.commands.standalone_command import StandaloneCommand
try:
command = StandaloneCommand()
# Run the daemon with a working directory inside the current
# zenml repo so the same repo will be used to run the DAGs
daemon.run_as_daemon(
command.run,
pid_file=self.pid_file,
log_file=self.log_file,
working_directory=get_source_root_path(),
)
while not self.is_running:
# Wait until the daemon started all the relevant airflow
# processes
time.sleep(0.1)
self._log_webserver_credentials()
except Exception as e:
logger.error(e)
logger.error(
"An error occurred while starting the Airflow daemon. If you "
"want to start it manually, use the commands described in the "
"official Airflow quickstart guide for running Airflow locally."
)
self.deprovision()
def deprovision(self) -> None:
"""Stops the airflow daemon if necessary and tears down resources."""
if self.is_running:
daemon.stop_daemon(self.pid_file)
fileio.rmtree(self.airflow_home)
logger.info("Airflow spun down.")
dags_directory: str
property
readonly
Returns path to the airflow dags directory.
Returns:
Type | Description |
---|---|
str | Path to the airflow dags directory. |
is_provisioned: bool
property
readonly
Returns whether the airflow daemon is currently running.
Returns:
Type | Description |
---|---|
bool | True if the airflow daemon is running, False otherwise. |
is_running: bool
property
readonly
Returns whether the airflow daemon is currently running.
Returns:
Type | Description |
---|---|
bool | True if the daemon is running, False otherwise. |
Exceptions:
Type | Description |
---|---|
RuntimeError | If port 8080 is occupied. |
log_file: str
property
readonly
Returns path to the airflow log file.
Returns:
Type | Description |
---|---|
str | Path to the airflow log file. |
password_file: str
property
readonly
Returns path to the webserver password file.
Returns:
Type | Description |
---|---|
str | Path to the webserver password file. |
pid_file: str
property
readonly
Returns path to the daemon PID file.
Returns:
Type | Description |
---|---|
str | Path to the daemon PID file. |
__init__(self, **values)
special
Sets environment variables to configure airflow.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
**values | Any | Values to set in the orchestrator. | {} |
Source code in zenml/integrations/airflow/orchestrators/airflow_orchestrator.py
def __init__(self, **values: Any):
"""Sets environment variables to configure airflow.
Args:
**values: Values to set in the orchestrator.
"""
super().__init__(**values)
self._set_env()
deprovision(self)
Stops the airflow daemon if necessary and tears down resources.
Source code in zenml/integrations/airflow/orchestrators/airflow_orchestrator.py
def deprovision(self) -> None:
"""Stops the airflow daemon if necessary and tears down resources."""
if self.is_running:
daemon.stop_daemon(self.pid_file)
fileio.rmtree(self.airflow_home)
logger.info("Airflow spun down.")
prepare_or_run_pipeline(self, sorted_steps, pipeline, pb2_pipeline, stack, runtime_configuration)
Creates an Airflow DAG as the intermediate representation for the pipeline.
This DAG will be loaded by airflow in the target environment and used for orchestration of the pipeline.
How it works:
A new airflow_dag is instantiated with the pipeline name and, among other things, the run schedule.
For each step of the pipeline a callable is created. This callable uses the run_step() method to execute the step. The parameters of this callable are pre-filled and an Airflow operator wrapping it is created within the DAG. The dependencies on upstream steps are then configured.
Finally, the completed DAG is returned.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
sorted_steps | List[zenml.steps.base_step.BaseStep] | List of steps in the pipeline. | required |
pipeline | BasePipeline | The pipeline to be executed. | required |
pb2_pipeline | Pipeline | The pipeline as a protobuf message. | required |
stack | Stack | The stack on which the pipeline will be deployed. | required |
runtime_configuration | RuntimeConfiguration | The runtime configuration. | required |
Returns:
Type | Description |
---|---|
Any | The Airflow DAG. |
Source code in zenml/integrations/airflow/orchestrators/airflow_orchestrator.py
def prepare_or_run_pipeline(
self,
sorted_steps: List[BaseStep],
pipeline: "BasePipeline",
pb2_pipeline: Pb2Pipeline,
stack: "Stack",
runtime_configuration: "RuntimeConfiguration",
) -> Any:
"""Creates an Airflow DAG as the intermediate representation for the pipeline.
This DAG will be loaded by airflow in the target environment
and used for orchestration of the pipeline.
How it works:
-------------
A new airflow_dag is instantiated with the pipeline name and among
other things the run schedule.
For each step of the pipeline a callable is created. This callable
uses the run_step() method to execute the step. The parameters of
this callable are pre-filled and an airflow step_operator is created
within the dag. The dependencies to upstream steps are then
configured.
Finally, the dag is fully complete and can be returned.
Args:
sorted_steps: List of steps in the pipeline.
pipeline: The pipeline to be executed.
pb2_pipeline: The pipeline as a protobuf message.
stack: The stack on which the pipeline will be deployed.
runtime_configuration: The runtime configuration.
Returns:
The Airflow DAG.
"""
import airflow
from airflow.operators import python as airflow_python
# Instantiate and configure airflow Dag with name and schedule
airflow_dag = airflow.DAG(
dag_id=pipeline.name,
is_paused_upon_creation=False,
**self._translate_schedule(runtime_configuration.schedule),
)
# Dictionary mapping step names to airflow_operators. This will be needed
# to configure airflow operator dependencies
step_name_to_airflow_operator = {}
for step in sorted_steps:
# Create callable that will be used by airflow to execute the step
# within the orchestrated environment
def _step_callable(step_instance: "BaseStep", **kwargs):
# Extract run name for the kwargs that will be passed to the
# callable
run_name = kwargs["ti"].get_dagrun().run_id
self.run_step(
step=step_instance,
run_name=run_name,
pb2_pipeline=pb2_pipeline,
)
# Create airflow python operator that contains the step callable
airflow_operator = airflow_python.PythonOperator(
dag=airflow_dag,
task_id=step.name,
provide_context=True,
python_callable=functools.partial(
_step_callable, step_instance=step
),
)
# Configure the current airflow operator to run after all upstream
# operators finished executing
step_name_to_airflow_operator[step.name] = airflow_operator
upstream_step_names = self.get_upstream_step_names(
step=step, pb2_pipeline=pb2_pipeline
)
for upstream_step_name in upstream_step_names:
airflow_operator.set_upstream(
step_name_to_airflow_operator[upstream_step_name]
)
# Return the finished airflow dag
return airflow_dag
prepare_pipeline_deployment(self, pipeline, stack, runtime_configuration)
Checks that Airflow is running and copies the DAG file to the Airflow DAGs directory.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
pipeline | BasePipeline | Pipeline to be deployed. | required |
stack | Stack | Stack to be deployed. | required |
runtime_configuration | RuntimeConfiguration | Runtime configuration for the pipeline. | required |
Exceptions:
Type | Description |
---|---|
RuntimeError | If Airflow is not running or no DAG filepath runtime option is provided. |
Source code in zenml/integrations/airflow/orchestrators/airflow_orchestrator.py
def prepare_pipeline_deployment(
self,
pipeline: "BasePipeline",
stack: "Stack",
runtime_configuration: "RuntimeConfiguration",
) -> None:
"""Checks Airflow is running and copies DAG file to the Airflow DAGs directory.
Args:
pipeline: Pipeline to be deployed.
stack: Stack to be deployed.
runtime_configuration: Runtime configuration for the pipeline.
Raises:
RuntimeError: If Airflow is not running or no DAG filepath runtime
option is provided.
"""
if not self.is_running:
raise RuntimeError(
"Airflow orchestrator is currently not running. Run `zenml "
"stack up` to provision resources for the active stack."
)
try:
dag_filepath = runtime_configuration[DAG_FILEPATH_OPTION_KEY]
except KeyError:
raise RuntimeError(
f"No DAG filepath found in runtime configuration. Make sure "
f"to add the filepath to your airflow DAG file as a runtime "
f"option (key: '{DAG_FILEPATH_OPTION_KEY}')."
)
self._copy_to_dag_directory_if_necessary(dag_filepath=dag_filepath)
provision(self)
Ensures that Airflow is running.
Source code in zenml/integrations/airflow/orchestrators/airflow_orchestrator.py
def provision(self) -> None:
"""Ensures that Airflow is running."""
if self.is_running:
logger.info("Airflow is already running.")
self._log_webserver_credentials()
return
if not fileio.exists(self.dags_directory):
io_utils.create_dir_recursive_if_not_exists(self.dags_directory)
from airflow.cli.commands.standalone_command import StandaloneCommand
try:
command = StandaloneCommand()
# Run the daemon with a working directory inside the current
# zenml repo so the same repo will be used to run the DAGs
daemon.run_as_daemon(
command.run,
pid_file=self.pid_file,
log_file=self.log_file,
working_directory=get_source_root_path(),
)
while not self.is_running:
# Wait until the daemon started all the relevant airflow
# processes
time.sleep(0.1)
self._log_webserver_credentials()
except Exception as e:
logger.error(e)
logger.error(
"An error occurred while starting the Airflow daemon. If you "
"want to start it manually, use the commands described in the "
"official Airflow quickstart guide for running Airflow locally."
)
self.deprovision()
runtime_options(self)
Runtime options for the airflow orchestrator.
Returns:
Type | Description |
---|---|
Dict[str, Any] | Runtime options dictionary. |
Source code in zenml/integrations/airflow/orchestrators/airflow_orchestrator.py
def runtime_options(self) -> Dict[str, Any]:
"""Runtime options for the airflow orchestrator.
Returns:
Runtime options dictionary.
"""
return {DAG_FILEPATH_OPTION_KEY: None}
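The snippet below is a hedged sketch of how the DAG filepath runtime option could be supplied from within the DAG definition file itself. The pipeline name, the assumption that the constant is importable from the orchestrator module, and the assumption that runtime options can be passed as keyword arguments to run() are not verified here:

```python
# Hypothetical DAG file that ends up in the orchestrator's DAGs directory.
from zenml.integrations.airflow.orchestrators.airflow_orchestrator import (
    DAG_FILEPATH_OPTION_KEY,  # assumption: the constant lives in this module
)

from my_project.pipelines import my_pipeline  # hypothetical pipeline

pipeline_instance = my_pipeline()

# Pass this file's own path as the DAG filepath runtime option so that
# prepare_pipeline_deployment() can copy it into the Airflow DAGs directory.
DAG = pipeline_instance.run(**{DAG_FILEPATH_OPTION_KEY: __file__})
```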
set_airflow_home(values)
classmethod
Sets Airflow home according to orchestrator UUID.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
values | Dict[str, Any] | Dictionary containing all orchestrator attributes values. | required |
Returns:
Type | Description |
---|---|
Dict[str, Any] | Dictionary containing all orchestrator attributes values and the airflow home. |
Exceptions:
Type | Description |
---|---|
ValueError | If the orchestrator UUID is not set. |
Source code in zenml/integrations/airflow/orchestrators/airflow_orchestrator.py
@root_validator(skip_on_failure=True)
def set_airflow_home(cls, values: Dict[str, Any]) -> Dict[str, Any]:
"""Sets Airflow home according to orchestrator UUID.
Args:
values: Dictionary containing all orchestrator attributes values.
Returns:
Dictionary containing all orchestrator attributes values and the airflow home.
Raises:
ValueError: If the orchestrator UUID is not set.
"""
if "uuid" not in values:
raise ValueError("`uuid` needs to exist for AirflowOrchestrator.")
values["airflow_home"] = os.path.join(
io_utils.get_global_config_directory(),
AIRFLOW_ROOT_DIR,
str(values["uuid"]),
)
return values
aws
special
Integrates multiple AWS Tools as Stack Components.
The AWS integration provides a way for our users to manage their secrets through AWS and a way to use the AWS container registry. Additionally, the Sagemaker integration submodule provides a way to run ZenML steps in Sagemaker.
AWSIntegration (Integration)
Definition of AWS integration for ZenML.
Source code in zenml/integrations/aws/__init__.py
class AWSIntegration(Integration):
"""Definition of AWS integration for ZenML."""
NAME = AWS
REQUIREMENTS = ["boto3==1.21.21", "sagemaker==2.82.2"]
@classmethod
def flavors(cls) -> List[FlavorWrapper]:
"""Declare the stack component flavors for the AWS integration.
Returns:
List of stack component flavors for this integration.
"""
return [
FlavorWrapper(
name=AWS_SECRET_MANAGER_FLAVOR,
source="zenml.integrations.aws.secrets_managers"
".AWSSecretsManager",
type=StackComponentType.SECRETS_MANAGER,
integration=cls.NAME,
),
FlavorWrapper(
name=AWS_CONTAINER_REGISTRY_FLAVOR,
source="zenml.integrations.aws.container_registries"
".AWSContainerRegistry",
type=StackComponentType.CONTAINER_REGISTRY,
integration=cls.NAME,
),
FlavorWrapper(
name=AWS_SAGEMAKER_STEP_OPERATOR_FLAVOR,
source="zenml.integrations.aws.step_operators"
".SagemakerStepOperator",
type=StackComponentType.STEP_OPERATOR,
integration=cls.NAME,
),
]
flavors()
classmethod
Declare the stack component flavors for the AWS integration.
Returns:
Type | Description |
---|---|
List[zenml.zen_stores.models.flavor_wrapper.FlavorWrapper] | List of stack component flavors for this integration. |
Source code in zenml/integrations/aws/__init__.py
@classmethod
def flavors(cls) -> List[FlavorWrapper]:
"""Declare the stack component flavors for the AWS integration.
Returns:
List of stack component flavors for this integration.
"""
return [
FlavorWrapper(
name=AWS_SECRET_MANAGER_FLAVOR,
source="zenml.integrations.aws.secrets_managers"
".AWSSecretsManager",
type=StackComponentType.SECRETS_MANAGER,
integration=cls.NAME,
),
FlavorWrapper(
name=AWS_CONTAINER_REGISTRY_FLAVOR,
source="zenml.integrations.aws.container_registries"
".AWSContainerRegistry",
type=StackComponentType.CONTAINER_REGISTRY,
integration=cls.NAME,
),
FlavorWrapper(
name=AWS_SAGEMAKER_STEP_OPERATOR_FLAVOR,
source="zenml.integrations.aws.step_operators"
".SagemakerStepOperator",
type=StackComponentType.STEP_OPERATOR,
integration=cls.NAME,
),
]
container_registries
special
Initialization of AWS Container Registry integration.
aws_container_registry
Implementation of the AWS container registry integration.
AWSContainerRegistry (BaseContainerRegistry)
pydantic-model
Class for AWS Container Registry.
Source code in zenml/integrations/aws/container_registries/aws_container_registry.py
class AWSContainerRegistry(BaseContainerRegistry):
"""Class for AWS Container Registry."""
# Class Configuration
FLAVOR: ClassVar[str] = AWS_CONTAINER_REGISTRY_FLAVOR
@validator("uri")
def validate_aws_uri(cls, uri: str) -> str:
"""Validates that the URI is in the correct format.
Args:
uri: URI to validate.
Returns:
URI in the correct format.
Raises:
ValueError: If the URI contains a slash character.
"""
if "/" in uri:
raise ValueError(
"Property `uri` can not contain a `/`. An example of a valid "
"URI is: `715803424592.dkr.ecr.us-east-1.amazonaws.com`"
)
return uri
def prepare_image_push(self, image_name: str) -> None:
"""Logs warning message if trying to push an image for which no repository exists.
Args:
image_name: Name of the docker image that will be pushed.
Raises:
ValueError: If the docker image name is invalid.
"""
response = boto3.client("ecr").describe_repositories()
try:
repo_uris: List[str] = [
repository["repositoryUri"]
for repository in response["repositories"]
]
except (KeyError, ClientError) as e:
# invalid boto response, let's hope for the best and just push
logger.debug("Error while trying to fetch ECR repositories: %s", e)
return
repo_exists = any(image_name.startswith(f"{uri}:") for uri in repo_uris)
if not repo_exists:
match = re.search(f"{self.uri}/(.*):.*", image_name)
if not match:
raise ValueError(f"Invalid docker image name '{image_name}'.")
repo_name = match.group(1)
logger.warning(
"Amazon ECR requires you to create a repository before you can "
f"push an image to it. ZenML is trying to push the image "
f"{image_name} but could only detect the following "
f"repositories: {repo_uris}. We will try to push anyway, but "
f"in case it fails you need to create a repository named "
f"`{repo_name}`."
)
@property
def post_registration_message(self) -> Optional[str]:
"""Optional message printed after the stack component is registered.
Returns:
Info message regarding docker repositories in AWS.
"""
return (
"Amazon ECR requires you to create a repository before you can "
"push an image to it. If you want to for example run a pipeline "
"using our Kubeflow orchestrator, ZenML will automatically build a "
f"docker image called `{self.uri}/zenml-kubeflow:<PIPELINE_NAME>` "
f"and try to push it. This will fail unless you create the "
f"repository `zenml-kubeflow` inside your amazon registry."
)
post_registration_message: Optional[str]
property
readonly
Optional message printed after the stack component is registered.
Returns:
Type | Description |
---|---|
Optional[str] | Info message regarding docker repositories in AWS. |
prepare_image_push(self, image_name)
Logs warning message if trying to push an image for which no repository exists.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
image_name | str | Name of the docker image that will be pushed. | required |
Exceptions:
Type | Description |
---|---|
ValueError | If the docker image name is invalid. |
Source code in zenml/integrations/aws/container_registries/aws_container_registry.py
def prepare_image_push(self, image_name: str) -> None:
"""Logs warning message if trying to push an image for which no repository exists.
Args:
image_name: Name of the docker image that will be pushed.
Raises:
ValueError: If the docker image name is invalid.
"""
response = boto3.client("ecr").describe_repositories()
try:
repo_uris: List[str] = [
repository["repositoryUri"]
for repository in response["repositories"]
]
except (KeyError, ClientError) as e:
# invalid boto response, let's hope for the best and just push
logger.debug("Error while trying to fetch ECR repositories: %s", e)
return
repo_exists = any(image_name.startswith(f"{uri}:") for uri in repo_uris)
if not repo_exists:
match = re.search(f"{self.uri}/(.*):.*", image_name)
if not match:
raise ValueError(f"Invalid docker image name '{image_name}'.")
repo_name = match.group(1)
logger.warning(
"Amazon ECR requires you to create a repository before you can "
f"push an image to it. ZenML is trying to push the image "
f"{image_name} but could only detect the following "
f"repositories: {repo_uris}. We will try to push anyway, but "
f"in case it fails you need to create a repository named "
f"`{repo_name}`."
)
validate_aws_uri(uri)
classmethod
Validates that the URI is in the correct format.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
uri | str | URI to validate. | required |
Returns:
Type | Description |
---|---|
str | URI in the correct format. |
Exceptions:
Type | Description |
---|---|
ValueError | If the URI contains a slash character. |
Source code in zenml/integrations/aws/container_registries/aws_container_registry.py
@validator("uri")
def validate_aws_uri(cls, uri: str) -> str:
"""Validates that the URI is in the correct format.
Args:
uri: URI to validate.
Returns:
URI in the correct format.
Raises:
ValueError: If the URI contains a slash character.
"""
if "/" in uri:
raise ValueError(
"Property `uri` can not contain a `/`. An example of a valid "
"URI is: `715803424592.dkr.ecr.us-east-1.amazonaws.com`"
)
return uri
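Since pydantic validators are plain classmethods, the URI check above can also be exercised directly for illustration; this is a hedged sketch, not how ZenML invokes it in practice:

```python
from zenml.integrations.aws.container_registries import AWSContainerRegistry

# A registry URI without a path component passes validation unchanged.
uri = AWSContainerRegistry.validate_aws_uri(
    "715803424592.dkr.ecr.us-east-1.amazonaws.com"
)
print(uri)

# Appending a repository path makes the validator raise a ValueError,
# because the `uri` property must not contain a `/`.
AWSContainerRegistry.validate_aws_uri(
    "715803424592.dkr.ecr.us-east-1.amazonaws.com/my-repo"
)
```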
secrets_managers
special
AWS Secrets Manager.
aws_secrets_manager
Implementation of the AWS Secrets Manager integration.
AWSSecretsManager (BaseSecretsManager)
pydantic-model
Class to interact with the AWS secrets manager.
Source code in zenml/integrations/aws/secrets_managers/aws_secrets_manager.py
class AWSSecretsManager(BaseSecretsManager):
"""Class to interact with the AWS secrets manager."""
region_name: str = DEFAULT_AWS_REGION
# Class configuration
FLAVOR: ClassVar[str] = AWS_SECRET_MANAGER_FLAVOR
CLIENT: ClassVar[Any] = None
@classmethod
def _ensure_client_connected(cls, region_name: str) -> None:
"""Ensure that the client is connected to the AWS secrets manager.
Args:
region_name: the AWS region name
"""
if cls.CLIENT is None:
# Create a Secrets Manager client
session = boto3.session.Session()
cls.CLIENT = session.client(
service_name="secretsmanager", region_name=region_name
)
def register_secret(self, secret: BaseSecretSchema) -> None:
"""Registers a new secret.
Args:
secret: the secret to register
Raises:
SecretExistsError: if the secret already exists
"""
self._ensure_client_connected(self.region_name)
secret_value = jsonify_secret_contents(secret)
if secret.name in self.get_all_secret_keys():
raise SecretExistsError(
f"A Secret with the name {secret.name} already exists"
)
kwargs = {"Name": secret.name, "SecretString": secret_value}
self.CLIENT.create_secret(**kwargs)
def get_secret(self, secret_name: str) -> BaseSecretSchema:
"""Gets a secret.
Args:
secret_name: the name of the secret to get
Returns:
The secret.
Raises:
RuntimeError: if the secret does not exist
"""
self._ensure_client_connected(self.region_name)
get_secret_value_response = self.CLIENT.get_secret_value(
SecretId=secret_name
)
if "SecretString" not in get_secret_value_response:
raise RuntimeError(f"No secrets found within the {secret_name}")
secret_contents: Dict[str, str] = json.loads(
get_secret_value_response["SecretString"]
)
zenml_schema_name = secret_contents.pop(ZENML_SCHEMA_NAME)
secret_contents["name"] = secret_name
secret_schema = SecretSchemaClassRegistry.get_class(
secret_schema=zenml_schema_name
)
return secret_schema(**secret_contents)
def get_all_secret_keys(self) -> List[str]:
"""Get all secret keys.
Returns:
A list of all secret keys
"""
self._ensure_client_connected(self.region_name)
# TODO [ENG-720]: Deal with pagination in the aws secret manager when
# listing all secrets
# TODO [ENG-721]: take out this magic maxresults number
response = self.CLIENT.list_secrets(MaxResults=100)
return [secret["Name"] for secret in response["SecretList"]]
def update_secret(self, secret: BaseSecretSchema) -> None:
"""Update an existing secret.
Args:
secret: the secret to update
"""
self._ensure_client_connected(self.region_name)
secret_value = jsonify_secret_contents(secret)
kwargs = {"SecretId": secret.name, "SecretString": secret_value}
self.CLIENT.put_secret_value(**kwargs)
def delete_secret(self, secret_name: str) -> None:
"""Delete an existing secret.
Args:
secret_name: the name of the secret to delete
"""
self._ensure_client_connected(self.region_name)
self.CLIENT.delete_secret(
SecretId=secret_name, ForceDeleteWithoutRecovery=False
)
def delete_all_secrets(self) -> None:
"""Delete all existing secrets.
This method will force delete all your secrets. You will not be able to
recover them once this method is called.
"""
self._ensure_client_connected(self.region_name)
for secret_name in self.get_all_secret_keys():
self.CLIENT.delete_secret(
SecretId=secret_name, ForceDeleteWithoutRecovery=True
)
delete_all_secrets(self)
Delete all existing secrets.
This method will force delete all your secrets. You will not be able to recover them once this method is called.
Source code in zenml/integrations/aws/secrets_managers/aws_secrets_manager.py
def delete_all_secrets(self) -> None:
"""Delete all existing secrets.
This method will force delete all your secrets. You will not be able to
recover them once this method is called.
"""
self._ensure_client_connected(self.region_name)
for secret_name in self.get_all_secret_keys():
self.CLIENT.delete_secret(
SecretId=secret_name, ForceDeleteWithoutRecovery=True
)
delete_secret(self, secret_name)
Delete an existing secret.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
secret_name | str | the name of the secret to delete | required |
Source code in zenml/integrations/aws/secrets_managers/aws_secrets_manager.py
def delete_secret(self, secret_name: str) -> None:
"""Delete an existing secret.
Args:
secret_name: the name of the secret to delete
"""
self._ensure_client_connected(self.region_name)
self.CLIENT.delete_secret(
SecretId=secret_name, ForceDeleteWithoutRecovery=False
)
get_all_secret_keys(self)
Get all secret keys.
Returns:
Type | Description |
---|---|
List[str] | A list of all secret keys |
Source code in zenml/integrations/aws/secrets_managers/aws_secrets_manager.py
def get_all_secret_keys(self) -> List[str]:
"""Get all secret keys.
Returns:
A list of all secret keys
"""
self._ensure_client_connected(self.region_name)
# TODO [ENG-720]: Deal with pagination in the aws secret manager when
# listing all secrets
# TODO [ENG-721]: take out this magic maxresults number
response = self.CLIENT.list_secrets(MaxResults=100)
return [secret["Name"] for secret in response["SecretList"]]
get_secret(self, secret_name)
Gets a secret.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
secret_name | str | the name of the secret to get | required |
Returns:
Type | Description |
---|---|
BaseSecretSchema | The secret. |
Exceptions:
Type | Description |
---|---|
RuntimeError | if the secret does not exist |
Source code in zenml/integrations/aws/secrets_managers/aws_secrets_manager.py
def get_secret(self, secret_name: str) -> BaseSecretSchema:
"""Gets a secret.
Args:
secret_name: the name of the secret to get
Returns:
The secret.
Raises:
RuntimeError: if the secret does not exist
"""
self._ensure_client_connected(self.region_name)
get_secret_value_response = self.CLIENT.get_secret_value(
SecretId=secret_name
)
if "SecretString" not in get_secret_value_response:
raise RuntimeError(f"No secrets found within the {secret_name}")
secret_contents: Dict[str, str] = json.loads(
get_secret_value_response["SecretString"]
)
zenml_schema_name = secret_contents.pop(ZENML_SCHEMA_NAME)
secret_contents["name"] = secret_name
secret_schema = SecretSchemaClassRegistry.get_class(
secret_schema=zenml_schema_name
)
return secret_schema(**secret_contents)
register_secret(self, secret)
Registers a new secret.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
secret | BaseSecretSchema | the secret to register | required |
Exceptions:
Type | Description |
---|---|
SecretExistsError | if the secret already exists |
Source code in zenml/integrations/aws/secrets_managers/aws_secrets_manager.py
def register_secret(self, secret: BaseSecretSchema) -> None:
"""Registers a new secret.
Args:
secret: the secret to register
Raises:
SecretExistsError: if the secret already exists
"""
self._ensure_client_connected(self.region_name)
secret_value = jsonify_secret_contents(secret)
if secret.name in self.get_all_secret_keys():
raise SecretExistsError(
f"A Secret with the name {secret.name} already exists"
)
kwargs = {"Name": secret.name, "SecretString": secret_value}
self.CLIENT.create_secret(**kwargs)
update_secret(self, secret)
Update an existing secret.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
secret | BaseSecretSchema | the secret to update | required |
Source code in zenml/integrations/aws/secrets_managers/aws_secrets_manager.py
def update_secret(self, secret: BaseSecretSchema) -> None:
"""Update an existing secret.
Args:
secret: the secret to update
"""
self._ensure_client_connected(self.region_name)
secret_value = jsonify_secret_contents(secret)
kwargs = {"SecretId": secret.name, "SecretString": secret_value}
self.CLIENT.put_secret_value(**kwargs)
jsonify_secret_contents(secret)
Adds the secret type to the secret contents.
This persists the schema type in the secrets backend, so that the correct SecretSchema can be retrieved when the secret is queried from the backend.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
secret | BaseSecretSchema | should be a subclass of the BaseSecretSchema class | required |
Returns:
Type | Description |
---|---|
str | jsonified dictionary containing all key-value pairs and the ZenML schema type |
Source code in zenml/integrations/aws/secrets_managers/aws_secrets_manager.py
def jsonify_secret_contents(secret: BaseSecretSchema) -> str:
"""Adds the secret type to the secret contents.
This persists the schema type in the secrets backend, so that the correct
SecretSchema can be retrieved when the secret is queried from the backend.
Args:
secret: should be a subclass of the BaseSecretSchema class
Returns:
jsonified dictionary containing all key-value pairs and the ZenML schema
type
"""
secret_contents = secret.content
secret_contents[ZENML_SCHEMA_NAME] = secret.TYPE
return json.dumps(secret_contents)
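As an illustration of the persisted format, the following minimal sketch builds the same kind of payload by hand; the key "zenml_schema_name" (the assumed value of ZENML_SCHEMA_NAME) and the schema type "arbitrary" are assumptions:

```python
import json

# Contents of a hypothetical secret with two key-value pairs.
secret_contents = {"username": "admin", "password": "very-secret"}

# jsonify_secret_contents() adds the schema marker before serializing, so the
# correct SecretSchema subclass can be reconstructed when the secret is read back.
secret_contents["zenml_schema_name"] = "arbitrary"  # assumed key and schema type
print(json.dumps(secret_contents))
# {"username": "admin", "password": "very-secret", "zenml_schema_name": "arbitrary"}
```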
step_operators
special
Initialization of the Sagemaker Step Operator.
sagemaker_step_operator
Implementation of the Sagemaker Step Operator.
SagemakerStepOperator (BaseStepOperator)
pydantic-model
Step operator to run a step on Sagemaker.
This class defines code that builds an image with the ZenML entrypoint to run using Sagemaker's Estimator.
Attributes:
Name | Type | Description |
---|---|---|
role | str | The role that has to be assigned to the jobs which are running in Sagemaker. |
instance_type | str | The type of the compute instance where jobs will run. |
base_image | Optional[str] | [Optional] The base image to use for building the docker image that will be executed. |
bucket | Optional[str] | [Optional] Name of the S3 bucket to use for storing artifacts from the job run. If not provided, a default bucket will be created based on the following format: "sagemaker-{region}-{aws-account-id}". |
experiment_name | Optional[str] | [Optional] The name for the experiment to which the job will be associated. If not provided, the job runs would be independent. |
Source code in zenml/integrations/aws/step_operators/sagemaker_step_operator.py
class SagemakerStepOperator(BaseStepOperator):
"""Step operator to run a step on Sagemaker.
This class defines code that builds an image with the ZenML entrypoint
to run using Sagemaker's Estimator.
Attributes:
role: The role that has to be assigned to the jobs which are
running in Sagemaker.
instance_type: The type of the compute instance where jobs will run.
base_image: [Optional] The base image to use for building the docker
image that will be executed.
bucket: [Optional] Name of the S3 bucket to use for storing artifacts
from the job run. If not provided, a default bucket will be created
based on the following format: "sagemaker-{region}-{aws-account-id}".
experiment_name: [Optional] The name for the experiment to which the job
will be associated. If not provided, the job runs would be
independent.
"""
role: str
instance_type: str
base_image: Optional[str] = None
bucket: Optional[str] = None
experiment_name: Optional[str] = None
# Class Configuration
FLAVOR: ClassVar[str] = AWS_SAGEMAKER_STEP_OPERATOR_FLAVOR
@property
def validator(self) -> Optional[StackValidator]:
"""Validates that the stack contains a container registry.
Returns:
A validator that checks that the stack contains a container registry.
"""
def _ensure_local_orchestrator(stack: Stack) -> Tuple[bool, str]:
return (
stack.orchestrator.FLAVOR == "local",
"Local orchestrator is required",
)
return StackValidator(
required_components={StackComponentType.CONTAINER_REGISTRY},
custom_validation_function=_ensure_local_orchestrator,
)
def _build_docker_image(
self,
pipeline_name: str,
requirements: List[str],
entrypoint_command: List[str],
) -> str:
repo = Repository()
container_registry = repo.active_stack.container_registry
if not container_registry:
raise RuntimeError("Missing container registry")
registry_uri = container_registry.uri.rstrip("/")
image_name = f"{registry_uri}/zenml-sagemaker:{pipeline_name}"
docker_utils.build_docker_image(
build_context_path=get_source_root_path(),
image_name=image_name,
entrypoint=" ".join(entrypoint_command),
requirements=set(requirements),
base_image=self.base_image,
)
container_registry.push_image(image_name)
return docker_utils.get_image_digest(image_name) or image_name
def launch(
self,
pipeline_name: str,
run_name: str,
requirements: List[str],
entrypoint_command: List[str],
) -> None:
"""Launches a step on Sagemaker.
Args:
pipeline_name: Name of the pipeline which the step to be executed
is part of.
run_name: Name of the pipeline run which the step to be executed
is part of.
entrypoint_command: Command that executes the step.
requirements: List of pip requirements that must be installed
inside the step operator environment.
"""
image_name = self._build_docker_image(
pipeline_name=pipeline_name,
requirements=requirements,
entrypoint_command=entrypoint_command,
)
session = sagemaker.Session(default_bucket=self.bucket)
estimator = sagemaker.estimator.Estimator(
image_name,
self.role,
instance_count=1,
instance_type=self.instance_type,
sagemaker_session=session,
)
# Sagemaker doesn't allow any underscores in job/experiment/trial names
sanitized_run_name = run_name.replace("_", "-")
experiment_config = {}
if self.experiment_name:
experiment_config = {
"ExperimentName": self.experiment_name,
"TrialName": sanitized_run_name,
}
estimator.fit(
wait=True,
experiment_config=experiment_config,
job_name=sanitized_run_name,
)
validator: Optional[zenml.stack.stack_validator.StackValidator]
property
readonly
Validates that the stack contains a container registry.
Returns:
Type | Description |
---|---|
Optional[zenml.stack.stack_validator.StackValidator] | A validator that checks that the stack contains a container registry. |
launch(self, pipeline_name, run_name, requirements, entrypoint_command)
Launches a step on Sagemaker.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
pipeline_name | str | Name of the pipeline which the step to be executed is part of. | required |
run_name | str | Name of the pipeline run which the step to be executed is part of. | required |
entrypoint_command | List[str] | Command that executes the step. | required |
requirements | List[str] | List of pip requirements that must be installed inside the step operator environment. | required |
Source code in zenml/integrations/aws/step_operators/sagemaker_step_operator.py
def launch(
self,
pipeline_name: str,
run_name: str,
requirements: List[str],
entrypoint_command: List[str],
) -> None:
"""Launches a step on Sagemaker.
Args:
pipeline_name: Name of the pipeline which the step to be executed
is part of.
run_name: Name of the pipeline run which the step to be executed
is part of.
entrypoint_command: Command that executes the step.
requirements: List of pip requirements that must be installed
inside the step operator environment.
"""
image_name = self._build_docker_image(
pipeline_name=pipeline_name,
requirements=requirements,
entrypoint_command=entrypoint_command,
)
session = sagemaker.Session(default_bucket=self.bucket)
estimator = sagemaker.estimator.Estimator(
image_name,
self.role,
instance_count=1,
instance_type=self.instance_type,
sagemaker_session=session,
)
# Sagemaker doesn't allow any underscores in job/experiment/trial names
sanitized_run_name = run_name.replace("_", "-")
experiment_config = {}
if self.experiment_name:
experiment_config = {
"ExperimentName": self.experiment_name,
"TrialName": sanitized_run_name,
}
estimator.fit(
wait=True,
experiment_config=experiment_config,
job_name=sanitized_run_name,
)
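For completeness, here is a hedged sketch of configuring the step operator in Python; in practice the component is usually registered through the ZenML CLI, and the `name` field as well as all values shown are assumptions:

```python
from zenml.integrations.aws.step_operators import SagemakerStepOperator

# Hypothetical configuration values; the role, instance type and bucket must
# exist in your AWS account for a real run.
step_operator = SagemakerStepOperator(
    name="sagemaker",  # assumption: stack components accept a `name` field
    role="arn:aws:iam::123456789012:role/zenml-sagemaker",
    instance_type="ml.m5.large",
    bucket="my-zenml-artifacts",
    experiment_name="zenml-experiments",
)
```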
azure
special
Initialization of the ZenML Azure integration.
The Azure integration submodule provides a way to run ZenML pipelines in a cloud environment. Specifically, it allows the use of cloud artifact stores and provides an io module to handle file operations on Azure Blob Storage.
The Azure Step Operator integration submodule provides a way to run ZenML steps in AzureML.
AzureIntegration (Integration)
Definition of Azure integration for ZenML.
Source code in zenml/integrations/azure/__init__.py
class AzureIntegration(Integration):
"""Definition of Azure integration for ZenML."""
NAME = AZURE
REQUIREMENTS = [
"adlfs==2021.10.0",
"azure-keyvault-keys",
"azure-keyvault-secrets",
"azure-identity",
"azureml-core==1.39.0.post1",
]
@classmethod
def flavors(cls) -> List[FlavorWrapper]:
"""Declares the flavors for the integration.
Returns:
List of stack component flavors for this integration.
"""
return [
FlavorWrapper(
name=AZURE_ARTIFACT_STORE_FLAVOR,
source="zenml.integrations.azure.artifact_stores"
".AzureArtifactStore",
type=StackComponentType.ARTIFACT_STORE,
integration=cls.NAME,
),
FlavorWrapper(
name=AZURE_SECRETS_MANAGER_FLAVOR,
source="zenml.integrations.azure.secrets_managers"
".AzureSecretsManager",
type=StackComponentType.SECRETS_MANAGER,
integration=cls.NAME,
),
FlavorWrapper(
name=AZUREML_STEP_OPERATOR_FLAVOR,
source="zenml.integrations.azure.step_operators"
".AzureMLStepOperator",
type=StackComponentType.STEP_OPERATOR,
integration=cls.NAME,
),
]
flavors()
classmethod
Declares the flavors for the integration.
Returns:
Type | Description |
---|---|
List[zenml.zen_stores.models.flavor_wrapper.FlavorWrapper] | List of stack component flavors for this integration. |
Source code in zenml/integrations/azure/__init__.py
@classmethod
def flavors(cls) -> List[FlavorWrapper]:
"""Declares the flavors for the integration.
Returns:
List of stack component flavors for this integration.
"""
return [
FlavorWrapper(
name=AZURE_ARTIFACT_STORE_FLAVOR,
source="zenml.integrations.azure.artifact_stores"
".AzureArtifactStore",
type=StackComponentType.ARTIFACT_STORE,
integration=cls.NAME,
),
FlavorWrapper(
name=AZURE_SECRETS_MANAGER_FLAVOR,
source="zenml.integrations.azure.secrets_managers"
".AzureSecretsManager",
type=StackComponentType.SECRETS_MANAGER,
integration=cls.NAME,
),
FlavorWrapper(
name=AZUREML_STEP_OPERATOR_FLAVOR,
source="zenml.integrations.azure.step_operators"
".AzureMLStepOperator",
type=StackComponentType.STEP_OPERATOR,
integration=cls.NAME,
),
]
artifact_stores
special
Initialization of the Azure Artifact Store integration.
azure_artifact_store
Implementation of the Azure Artifact Store integration.
AzureArtifactStore (BaseArtifactStore, AuthenticationMixin)
pydantic-model
Artifact Store for Microsoft Azure based artifacts.
Source code in zenml/integrations/azure/artifact_stores/azure_artifact_store.py
class AzureArtifactStore(BaseArtifactStore, AuthenticationMixin):
"""Artifact Store for Microsoft Azure based artifacts."""
_filesystem: Optional[adlfs.AzureBlobFileSystem] = None
# Class Configuration
FLAVOR: ClassVar[str] = AZURE_ARTIFACT_STORE_FLAVOR
SUPPORTED_SCHEMES: ClassVar[Set[str]] = {"abfs://", "az://"}
@property
def filesystem(self) -> adlfs.AzureBlobFileSystem:
"""The adlfs filesystem to access this artifact store.
Returns:
The adlfs filesystem to access this artifact store.
"""
if not self._filesystem:
secret = self.get_authentication_secret(
expected_schema_type=AzureSecretSchema
)
credentials = secret.content if secret else {}
self._filesystem = adlfs.AzureBlobFileSystem(
**credentials,
anon=False,
use_listings_cache=False,
)
return self._filesystem
@classmethod
def _split_path(cls, path: PathType) -> Tuple[str, str]:
"""Splits a path into the filesystem prefix and remainder.
Example:
```python
prefix, remainder = ZenAzure._split_path("az://my_container/test.txt")
print(prefix, remainder) # "az://" "my_container/test.txt"
```
Args:
path: The path to split.
Returns:
A tuple of the filesystem prefix and the remainder.
"""
path = convert_to_str(path)
prefix = ""
for potential_prefix in cls.SUPPORTED_SCHEMES:
if path.startswith(potential_prefix):
prefix = potential_prefix
path = path[len(potential_prefix) :]
break
return prefix, path
def open(self, path: PathType, mode: str = "r") -> Any:
"""Open a file at the given path.
Args:
path: Path of the file to open.
mode: Mode in which to open the file. Currently, only
'rb' and 'wb' to read and write binary files are supported.
Returns:
A file-like object.
"""
return self.filesystem.open(path=path, mode=mode)
def copyfile(
self, src: PathType, dst: PathType, overwrite: bool = False
) -> None:
"""Copy a file.
Args:
src: The path to copy from.
dst: The path to copy to.
overwrite: If a file already exists at the destination, this
method will overwrite it if overwrite=`True` and
raise a FileExistsError otherwise.
Raises:
FileExistsError: If a file already exists at the destination
and overwrite is not set to `True`.
"""
if not overwrite and self.filesystem.exists(dst):
raise FileExistsError(
f"Unable to copy to destination '{convert_to_str(dst)}', "
f"file already exists. Set `overwrite=True` to copy anyway."
)
# TODO [ENG-151]: Check if it works with overwrite=True or if we need to
# manually remove it first
self.filesystem.copy(path1=src, path2=dst)
def exists(self, path: PathType) -> bool:
"""Check whether a path exists.
Args:
path: The path to check.
Returns:
True if the path exists, False otherwise.
"""
return self.filesystem.exists(path=path) # type: ignore[no-any-return]
def glob(self, pattern: PathType) -> List[PathType]:
"""Return all paths that match the given glob pattern.
The glob pattern may include:
- '*' to match any number of characters
- '?' to match a single character
- '[...]' to match one of the characters inside the brackets
- '**' as the full name of a path component to match to search
in subdirectories of any depth (e.g. '/some_dir/**/some_file)
Args:
pattern: The glob pattern to match, see details above.
Returns:
A list of paths that match the given glob pattern.
"""
prefix, _ = self._split_path(pattern)
return [
f"{prefix}{path}" for path in self.filesystem.glob(path=pattern)
]
def isdir(self, path: PathType) -> bool:
"""Check whether a path is a directory.
Args:
path: The path to check.
Returns:
True if the path is a directory, False otherwise.
"""
return self.filesystem.isdir(path=path) # type: ignore[no-any-return]
def listdir(self, path: PathType) -> List[PathType]:
"""Return a list of files in a directory.
Args:
path: The path to list.
Returns:
A list of files in the given directory.
"""
_, path = self._split_path(path)
def _extract_basename(file_dict: Dict[str, Any]) -> str:
"""Extracts the basename from a dictionary returned by the Azure filesystem.
Args:
file_dict: A dictionary returned by the Azure filesystem.
Returns:
The basename of the file.
"""
file_path = cast(str, file_dict["name"])
base_name = file_path[len(path) :]
return base_name.lstrip("/")
return [
_extract_basename(dict_)
for dict_ in self.filesystem.listdir(path=path)
]
def makedirs(self, path: PathType) -> None:
"""Create a directory at the given path.
If needed also create missing parent directories.
Args:
path: The path to create.
"""
self.filesystem.makedirs(path=path, exist_ok=True)
def mkdir(self, path: PathType) -> None:
"""Create a directory at the given path.
Args:
path: The path to create.
"""
self.filesystem.makedir(path=path)
def remove(self, path: PathType) -> None:
"""Remove the file at the given path.
Args:
path: The path to remove.
"""
self.filesystem.rm_file(path=path)
def rename(
self, src: PathType, dst: PathType, overwrite: bool = False
) -> None:
"""Rename source file to destination file.
Args:
src: The path of the file to rename.
dst: The path to rename the source file to.
overwrite: If a file already exists at the destination, this
method will overwrite it if overwrite=`True` and
raise a FileExistsError otherwise.
Raises:
FileExistsError: If a file already exists at the destination
and overwrite is not set to `True`.
"""
if not overwrite and self.filesystem.exists(dst):
raise FileExistsError(
f"Unable to rename file to '{convert_to_str(dst)}', "
f"file already exists. Set `overwrite=True` to rename anyway."
)
# TODO [ENG-152]: Check if it works with overwrite=True or if we need
# to manually remove it first
self.filesystem.rename(path1=src, path2=dst)
def rmtree(self, path: PathType) -> None:
"""Remove the given directory.
Args:
path: The path of the directory to remove.
"""
self.filesystem.delete(path=path, recursive=True)
def stat(self, path: PathType) -> Dict[str, Any]:
"""Return stat info for the given path.
Args:
path: The path to get stat info for.
Returns:
Stat info.
"""
return self.filesystem.stat(path=path) # type: ignore[no-any-return]
def walk(
self,
top: PathType,
topdown: bool = True,
onerror: Optional[Callable[..., None]] = None,
) -> Iterable[Tuple[PathType, List[PathType], List[PathType]]]:
"""Return an iterator that walks the contents of the given directory.
Args:
top: Path of directory to walk.
topdown: Unused argument to conform to interface.
onerror: Unused argument to conform to interface.
Yields:
An Iterable of Tuples, each of which contain the path of the current
directory path, a list of directories inside the current directory
and a list of files inside the current directory.
"""
# TODO [ENG-153]: Additional params
prefix, _ = self._split_path(top)
for (
directory,
subdirectories,
files,
) in self.filesystem.walk(path=top):
yield f"{prefix}{directory}", subdirectories, files
filesystem: AzureBlobFileSystem
property
readonly
The adlfs filesystem to access this artifact store.
Returns:
Type | Description |
---|---|
AzureBlobFileSystem | The adlfs filesystem to access this artifact store. |
copyfile(self, src, dst, overwrite=False)
Copy a file.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
src | Union[bytes, str] | The path to copy from. | required |
dst | Union[bytes, str] | The path to copy to. | required |
overwrite | bool | If a file already exists at the destination, this method will overwrite it if overwrite=`True` and raise a FileExistsError otherwise. | False |
Exceptions:
Type | Description |
---|---|
FileExistsError | If a file already exists at the destination and overwrite is not set to `True`. |
Source code in zenml/integrations/azure/artifact_stores/azure_artifact_store.py
def copyfile(
self, src: PathType, dst: PathType, overwrite: bool = False
) -> None:
"""Copy a file.
Args:
src: The path to copy from.
dst: The path to copy to.
overwrite: If a file already exists at the destination, this
method will overwrite it if overwrite=`True` and
raise a FileExistsError otherwise.
Raises:
FileExistsError: If a file already exists at the destination
and overwrite is not set to `True`.
"""
if not overwrite and self.filesystem.exists(dst):
raise FileExistsError(
f"Unable to copy to destination '{convert_to_str(dst)}', "
f"file already exists. Set `overwrite=True` to copy anyway."
)
# TODO [ENG-151]: Check if it works with overwrite=True or if we need to
# manually remove it first
self.filesystem.copy(path1=src, path2=dst)
exists(self, path)
Check whether a path exists.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path | Union[bytes, str] | The path to check. | required |
Returns:
Type | Description |
---|---|
bool | True if the path exists, False otherwise. |
Source code in zenml/integrations/azure/artifact_stores/azure_artifact_store.py
def exists(self, path: PathType) -> bool:
"""Check whether a path exists.
Args:
path: The path to check.
Returns:
True if the path exists, False otherwise.
"""
return self.filesystem.exists(path=path) # type: ignore[no-any-return]
glob(self, pattern)
Return all paths that match the given glob pattern.
The glob pattern may include:
- '*' to match any number of characters
- '?' to match a single character
- '[...]' to match one of the characters inside the brackets
- '**' as the full name of a path component to match to search in subdirectories of any depth (e.g. '/some_dir/**/some_file')
Parameters:
Name | Type | Description | Default |
---|---|---|---|
pattern | Union[bytes, str] | The glob pattern to match, see details above. | required |
Returns:
Type | Description |
---|---|
List[Union[bytes, str]] | A list of paths that match the given glob pattern. |
Source code in zenml/integrations/azure/artifact_stores/azure_artifact_store.py
def glob(self, pattern: PathType) -> List[PathType]:
"""Return all paths that match the given glob pattern.
The glob pattern may include:
- '*' to match any number of characters
- '?' to match a single character
- '[...]' to match one of the characters inside the brackets
- '**' as the full name of a path component to match to search
in subdirectories of any depth (e.g. '/some_dir/**/some_file)
Args:
pattern: The glob pattern to match, see details above.
Returns:
A list of paths that match the given glob pattern.
"""
prefix, _ = self._split_path(pattern)
return [
f"{prefix}{path}" for path in self.filesystem.glob(path=pattern)
]
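To make the pattern syntax concrete, a hypothetical call against an Azure-backed store could look like the one-line sketch below. The container and blob names are made up, and `store` is assumed to be an already configured AzureArtifactStore instance:

store.glob("az://my-container/pipelines/**/output.txt")  # hypothetical path; returns the matching az:// paths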
isdir(self, path)
Check whether a path is a directory.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path | Union[bytes, str] | The path to check. | required |
Returns:
Type | Description |
---|---|
bool | True if the path is a directory, False otherwise. |
Source code in zenml/integrations/azure/artifact_stores/azure_artifact_store.py
def isdir(self, path: PathType) -> bool:
"""Check whether a path is a directory.
Args:
path: The path to check.
Returns:
True if the path is a directory, False otherwise.
"""
return self.filesystem.isdir(path=path) # type: ignore[no-any-return]
listdir(self, path)
Return a list of files in a directory.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path | Union[bytes, str] | The path to list. | required |
Returns:
Type | Description |
---|---|
List[Union[bytes, str]] | A list of files in the given directory. |
Source code in zenml/integrations/azure/artifact_stores/azure_artifact_store.py
def listdir(self, path: PathType) -> List[PathType]:
"""Return a list of files in a directory.
Args:
path: The path to list.
Returns:
A list of files in the given directory.
"""
_, path = self._split_path(path)
def _extract_basename(file_dict: Dict[str, Any]) -> str:
"""Extracts the basename from a dictionary returned by the Azure filesystem.
Args:
file_dict: A dictionary returned by the Azure filesystem.
Returns:
The basename of the file.
"""
file_path = cast(str, file_dict["name"])
base_name = file_path[len(path) :]
return base_name.lstrip("/")
return [
_extract_basename(dict_)
for dict_ in self.filesystem.listdir(path=path)
]
makedirs(self, path)
Create a directory at the given path.
If needed also create missing parent directories.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path | Union[bytes, str] | The path to create. | required |
Source code in zenml/integrations/azure/artifact_stores/azure_artifact_store.py
def makedirs(self, path: PathType) -> None:
"""Create a directory at the given path.
If needed also create missing parent directories.
Args:
path: The path to create.
"""
self.filesystem.makedirs(path=path, exist_ok=True)
mkdir(self, path)
Create a directory at the given path.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path | Union[bytes, str] | The path to create. | required |
Source code in zenml/integrations/azure/artifact_stores/azure_artifact_store.py
def mkdir(self, path: PathType) -> None:
"""Create a directory at the given path.
Args:
path: The path to create.
"""
self.filesystem.makedir(path=path)
open(self, path, mode='r')
Open a file at the given path.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path | Union[bytes, str] | Path of the file to open. | required |
mode | str | Mode in which to open the file. Currently, only 'rb' and 'wb' to read and write binary files are supported. | 'r' |
Returns:
Type | Description |
---|---|
Any | A file-like object. |
Source code in zenml/integrations/azure/artifact_stores/azure_artifact_store.py
def open(self, path: PathType, mode: str = "r") -> Any:
"""Open a file at the given path.
Args:
path: Path of the file to open.
mode: Mode in which to open the file. Currently, only
'rb' and 'wb' to read and write binary files are supported.
Returns:
A file-like object.
"""
return self.filesystem.open(path=path, mode=mode)
remove(self, path)
Remove the file at the given path.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path | Union[bytes, str] | The path to remove. | required |
Source code in zenml/integrations/azure/artifact_stores/azure_artifact_store.py
def remove(self, path: PathType) -> None:
"""Remove the file at the given path.
Args:
path: The path to remove.
"""
self.filesystem.rm_file(path=path)
rename(self, src, dst, overwrite=False)
Rename source file to destination file.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
src | Union[bytes, str] | The path of the file to rename. | required |
dst | Union[bytes, str] | The path to rename the source file to. | required |
overwrite | bool | If a file already exists at the destination, this method will overwrite it if overwrite=True and raise a FileExistsError otherwise. | False |
Exceptions:
Type | Description |
---|---|
FileExistsError | If a file already exists at the destination and overwrite is not set to True. |
Source code in zenml/integrations/azure/artifact_stores/azure_artifact_store.py
def rename(
self, src: PathType, dst: PathType, overwrite: bool = False
) -> None:
"""Rename source file to destination file.
Args:
src: The path of the file to rename.
dst: The path to rename the source file to.
overwrite: If a file already exists at the destination, this
method will overwrite it if overwrite=`True` and
raise a FileExistsError otherwise.
Raises:
FileExistsError: If a file already exists at the destination
and overwrite is not set to `True`.
"""
if not overwrite and self.filesystem.exists(dst):
raise FileExistsError(
f"Unable to rename file to '{convert_to_str(dst)}', "
f"file already exists. Set `overwrite=True` to rename anyway."
)
# TODO [ENG-152]: Check if it works with overwrite=True or if we need
# to manually remove it first
self.filesystem.rename(path1=src, path2=dst)
rmtree(self, path)
Remove the given directory.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path | Union[bytes, str] | The path of the directory to remove. | required |
Source code in zenml/integrations/azure/artifact_stores/azure_artifact_store.py
def rmtree(self, path: PathType) -> None:
"""Remove the given directory.
Args:
path: The path of the directory to remove.
"""
self.filesystem.delete(path=path, recursive=True)
stat(self, path)
Return stat info for the given path.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path | Union[bytes, str] | The path to get stat info for. | required |
Returns:
Type | Description |
---|---|
Dict[str, Any] | Stat info. |
Source code in zenml/integrations/azure/artifact_stores/azure_artifact_store.py
def stat(self, path: PathType) -> Dict[str, Any]:
"""Return stat info for the given path.
Args:
path: The path to get stat info for.
Returns:
Stat info.
"""
return self.filesystem.stat(path=path) # type: ignore[no-any-return]
walk(self, top, topdown=True, onerror=None)
Return an iterator that walks the contents of the given directory.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
top | Union[bytes, str] | Path of directory to walk. | required |
topdown | bool | Unused argument to conform to interface. | True |
onerror | Optional[Callable[..., NoneType]] | Unused argument to conform to interface. | None |
Yields:
Type | Description |
---|---|
Iterable[Tuple[Union[bytes, str], List[Union[bytes, str]], List[Union[bytes, str]]]] | An Iterable of Tuples, each of which contains the path of the current directory, a list of directories inside the current directory and a list of files inside the current directory. |
Source code in zenml/integrations/azure/artifact_stores/azure_artifact_store.py
def walk(
self,
top: PathType,
topdown: bool = True,
onerror: Optional[Callable[..., None]] = None,
) -> Iterable[Tuple[PathType, List[PathType], List[PathType]]]:
"""Return an iterator that walks the contents of the given directory.
Args:
top: Path of directory to walk.
topdown: Unused argument to conform to interface.
onerror: Unused argument to conform to interface.
Yields:
An Iterable of Tuples, each of which contain the path of the current
directory path, a list of directories inside the current directory
and a list of files inside the current directory.
"""
# TODO [ENG-153]: Additional params
prefix, _ = self._split_path(top)
for (
directory,
subdirectories,
files,
) in self.filesystem.walk(path=top):
yield f"{prefix}{directory}", subdirectories, files
secrets_managers
special
Initialization of the Azure Secrets Manager integration.
azure_secrets_manager
Implementation of the Azure Secrets Manager integration.
AzureSecretsManager (BaseSecretsManager)
pydantic-model
Class to interact with the Azure secrets manager.
Attributes:
Name | Type | Description |
---|---|---|
key_vault_name | str | The name of the Azure Key Vault that holds the ZenML secrets. |
Source code in zenml/integrations/azure/secrets_managers/azure_secrets_manager.py
class AzureSecretsManager(BaseSecretsManager):
"""Class to interact with the Azure secrets manager.
Attributes:
key_vault_name: The name of the Azure Key Vault that holds the ZenML secrets.
"""
key_vault_name: str
# Class configuration
FLAVOR: ClassVar[str] = AZURE_SECRETS_MANAGER_FLAVOR
CLIENT: ClassVar[Any] = None
@classmethod
def _ensure_client_connected(cls, vault_name: str) -> None:
if cls.CLIENT is None:
KVUri = f"https://{vault_name}.vault.azure.net"
credential = DefaultAzureCredential()
cls.CLIENT = SecretClient(vault_url=KVUri, credential=credential)
def register_secret(self, secret: BaseSecretSchema) -> None:
"""Registers a new secret.
Args:
secret: the secret to register
Raises:
SecretExistsError: if the secret already exists
ValueError: if the secret name contains an underscore.
"""
self._ensure_client_connected(self.key_vault_name)
if "_" in secret.name:
raise ValueError(
f"The secret name `{secret.name}` contains an underscore. "
f"This will cause issues with Azure. Please try again."
)
if secret.name in self.get_all_secret_keys():
raise SecretExistsError(
f"A Secret with the name '{secret.name}' already exists."
)
adjusted_content = prepend_group_name_to_keys(secret)
for k, v in adjusted_content.items():
# Create the secret, this only creates an empty secret with the
# supplied name.
azure_secret = self.CLIENT.set_secret(k, v)
self.CLIENT.update_secret_properties(
azure_secret.name,
tags={
ZENML_GROUP_KEY: secret.name,
ZENML_SCHEMA_NAME: secret.TYPE,
},
)
logger.debug("Created created secret: %s", azure_secret.name)
logger.debug("Added value to secret.")
def get_secret(self, secret_name: str) -> BaseSecretSchema:
"""Get a secret by its name.
Args:
secret_name: the name of the secret to get
Returns:
The secret.
Raises:
RuntimeError: if the secret does not exist
ValueError: if the secret is named 'name'
"""
self._ensure_client_connected(self.key_vault_name)
secret_contents = {}
zenml_schema_name = ""
for secret_property in self.CLIENT.list_properties_of_secrets():
response = self.CLIENT.get_secret(secret_property.name)
tags = response.properties.tags
if tags and tags.get(ZENML_GROUP_KEY) == secret_name:
secret_key = remove_group_name_from_key(
combined_key_name=response.name, group_name=secret_name
)
if secret_key == "name":
raise ValueError("The secret's key cannot be 'name'.")
secret_contents[secret_key] = response.value
zenml_schema_name = tags.get(ZENML_SCHEMA_NAME)
if not secret_contents:
raise RuntimeError(f"No secrets found within the {secret_name}")
secret_contents["name"] = secret_name
secret_schema = SecretSchemaClassRegistry.get_class(
secret_schema=zenml_schema_name
)
return secret_schema(**secret_contents)
def get_all_secret_keys(self) -> List[str]:
"""Get all secret keys.
Returns:
A list of all secret keys
"""
self._ensure_client_connected(self.key_vault_name)
set_of_secrets = set()
for secret_property in self.CLIENT.list_properties_of_secrets():
tags = secret_property.tags
if tags:
set_of_secrets.add(tags.get(ZENML_GROUP_KEY))
return list(set_of_secrets)
def update_secret(self, secret: BaseSecretSchema) -> None:
"""Update an existing secret by creating new versions of the existing secrets.
Args:
secret: the secret to update
"""
self._ensure_client_connected(self.key_vault_name)
adjusted_content = prepend_group_name_to_keys(secret)
for k, v in adjusted_content.items():
self.CLIENT.set_secret(k, v)
self.CLIENT.update_secret_properties(
k,
tags={
ZENML_GROUP_KEY: secret.name,
ZENML_SCHEMA_NAME: secret.TYPE,
},
)
def delete_secret(self, secret_name: str) -> None:
"""Delete an existing secret. by name.
In Azure a secret is a single k-v pair. Within ZenML a secret is a
collection of k-v pairs. As such, deleting a secret will iterate through
all secrets and delete the ones with the secret_name as label.
Args:
secret_name: the name of the secret to delete
"""
self._ensure_client_connected(self.key_vault_name)
# Go through all Azure secrets and delete the ones with the secret_name
# as label.
for secret_property in self.CLIENT.list_properties_of_secrets():
response = self.CLIENT.get_secret(secret_property.name)
tags = response.properties.tags
if tags and tags.get(ZENML_GROUP_KEY) == secret_name:
self.CLIENT.begin_delete_secret(secret_property.name).result()
def delete_all_secrets(self) -> None:
"""Delete all existing secrets."""
self._ensure_client_connected(self.key_vault_name)
# List all secrets.
for secret_property in self.CLIENT.list_properties_of_secrets():
response = self.CLIENT.get_secret(secret_property.name)
tags = response.properties.tags
if tags and (ZENML_GROUP_KEY in tags or ZENML_SCHEMA_NAME in tags):
logger.info(
"Deleted key-value pair {`%s`, `***`} from secret " "`%s`",
secret_property.name,
tags.get(ZENML_GROUP_KEY),
)
self.CLIENT.begin_delete_secret(secret_property.name).result()
delete_all_secrets(self)
Delete all existing secrets.
Source code in zenml/integrations/azure/secrets_managers/azure_secrets_manager.py
def delete_all_secrets(self) -> None:
"""Delete all existing secrets."""
self._ensure_client_connected(self.key_vault_name)
# List all secrets.
for secret_property in self.CLIENT.list_properties_of_secrets():
response = self.CLIENT.get_secret(secret_property.name)
tags = response.properties.tags
if tags and (ZENML_GROUP_KEY in tags or ZENML_SCHEMA_NAME in tags):
logger.info(
"Deleted key-value pair {`%s`, `***`} from secret " "`%s`",
secret_property.name,
tags.get(ZENML_GROUP_KEY),
)
self.CLIENT.begin_delete_secret(secret_property.name).result()
delete_secret(self, secret_name)
Delete an existing secret by name.
In Azure a secret is a single k-v pair. Within ZenML a secret is a collection of k-v pairs. As such, deleting a secret will iterate through all secrets and delete the ones with the secret_name as label.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
secret_name | str | the name of the secret to delete | required |
Source code in zenml/integrations/azure/secrets_managers/azure_secrets_manager.py
def delete_secret(self, secret_name: str) -> None:
"""Delete an existing secret. by name.
In Azure a secret is a single k-v pair. Within ZenML a secret is a
collection of k-v pairs. As such, deleting a secret will iterate through
all secrets and delete the ones with the secret_name as label.
Args:
secret_name: the name of the secret to delete
"""
self._ensure_client_connected(self.key_vault_name)
# Go through all Azure secrets and delete the ones with the secret_name
# as label.
for secret_property in self.CLIENT.list_properties_of_secrets():
response = self.CLIENT.get_secret(secret_property.name)
tags = response.properties.tags
if tags and tags.get(ZENML_GROUP_KEY) == secret_name:
self.CLIENT.begin_delete_secret(secret_property.name).result()
get_all_secret_keys(self)
Get all secret keys.
Returns:
Type | Description |
---|---|
List[str] | A list of all secret keys |
Source code in zenml/integrations/azure/secrets_managers/azure_secrets_manager.py
def get_all_secret_keys(self) -> List[str]:
"""Get all secret keys.
Returns:
A list of all secret keys
"""
self._ensure_client_connected(self.key_vault_name)
set_of_secrets = set()
for secret_property in self.CLIENT.list_properties_of_secrets():
tags = secret_property.tags
if tags:
set_of_secrets.add(tags.get(ZENML_GROUP_KEY))
return list(set_of_secrets)
get_secret(self, secret_name)
Get a secret by its name.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
secret_name | str | the name of the secret to get | required |
Returns:
Type | Description |
---|---|
BaseSecretSchema | The secret. |
Exceptions:
Type | Description |
---|---|
RuntimeError | if the secret does not exist |
ValueError | if the secret is named 'name' |
Source code in zenml/integrations/azure/secrets_managers/azure_secrets_manager.py
def get_secret(self, secret_name: str) -> BaseSecretSchema:
"""Get a secret by its name.
Args:
secret_name: the name of the secret to get
Returns:
The secret.
Raises:
RuntimeError: if the secret does not exist
ValueError: if the secret is named 'name'
"""
self._ensure_client_connected(self.key_vault_name)
secret_contents = {}
zenml_schema_name = ""
for secret_property in self.CLIENT.list_properties_of_secrets():
response = self.CLIENT.get_secret(secret_property.name)
tags = response.properties.tags
if tags and tags.get(ZENML_GROUP_KEY) == secret_name:
secret_key = remove_group_name_from_key(
combined_key_name=response.name, group_name=secret_name
)
if secret_key == "name":
raise ValueError("The secret's key cannot be 'name'.")
secret_contents[secret_key] = response.value
zenml_schema_name = tags.get(ZENML_SCHEMA_NAME)
if not secret_contents:
raise RuntimeError(f"No secrets found within the {secret_name}")
secret_contents["name"] = secret_name
secret_schema = SecretSchemaClassRegistry.get_class(
secret_schema=zenml_schema_name
)
return secret_schema(**secret_contents)
register_secret(self, secret)
Registers a new secret.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
secret | BaseSecretSchema | the secret to register | required |
Exceptions:
Type | Description |
---|---|
SecretExistsError | if the secret already exists |
ValueError | if the secret name contains an underscore. |
Source code in zenml/integrations/azure/secrets_managers/azure_secrets_manager.py
def register_secret(self, secret: BaseSecretSchema) -> None:
"""Registers a new secret.
Args:
secret: the secret to register
Raises:
SecretExistsError: if the secret already exists
ValueError: if the secret name contains an underscore.
"""
self._ensure_client_connected(self.key_vault_name)
if "_" in secret.name:
raise ValueError(
f"The secret name `{secret.name}` contains an underscore. "
f"This will cause issues with Azure. Please try again."
)
if secret.name in self.get_all_secret_keys():
raise SecretExistsError(
f"A Secret with the name '{secret.name}' already exists."
)
adjusted_content = prepend_group_name_to_keys(secret)
for k, v in adjusted_content.items():
# Create the secret, this only creates an empty secret with the
# supplied name.
azure_secret = self.CLIENT.set_secret(k, v)
self.CLIENT.update_secret_properties(
azure_secret.name,
tags={
ZENML_GROUP_KEY: secret.name,
ZENML_SCHEMA_NAME: secret.TYPE,
},
)
logger.debug("Created created secret: %s", azure_secret.name)
logger.debug("Added value to secret.")
update_secret(self, secret)
Update an existing secret by creating new versions of the existing secrets.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
secret | BaseSecretSchema | the secret to update | required |
Source code in zenml/integrations/azure/secrets_managers/azure_secrets_manager.py
def update_secret(self, secret: BaseSecretSchema) -> None:
"""Update an existing secret by creating new versions of the existing secrets.
Args:
secret: the secret to update
"""
self._ensure_client_connected(self.key_vault_name)
adjusted_content = prepend_group_name_to_keys(secret)
for k, v in adjusted_content.items():
self.CLIENT.set_secret(k, v)
self.CLIENT.update_secret_properties(
k,
tags={
ZENML_GROUP_KEY: secret.name,
ZENML_SCHEMA_NAME: secret.TYPE,
},
)
prepend_group_name_to_keys(secret)
Adds the secret group name to the keys of each secret key-value pair.
This allows using the same key across multiple secrets.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
secret | BaseSecretSchema | The ZenML Secret schema | required |
Returns:
Type | Description |
---|---|
Dict[str, str] | A dictionary with the secret keys prepended with the group name |
Source code in zenml/integrations/azure/secrets_managers/azure_secrets_manager.py
def prepend_group_name_to_keys(secret: BaseSecretSchema) -> Dict[str, str]:
"""Adds the secret group name to the keys of each secret key-value pair.
This allows using the same key across multiple
secrets.
Args:
secret: The ZenML Secret schema
Returns:
A dictionary with the secret keys prepended with the group name
"""
return {f"{secret.name}-{k}": v for k, v in secret.content.items()}
remove_group_name_from_key(combined_key_name, group_name)
Removes the secret group name from the secret key.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
combined_key_name | str | Full name as it is within the Azure secrets manager | required |
group_name | str | Group name (the ZenML Secret name) | required |
Returns:
Type | Description |
---|---|
str | The cleaned key |
Exceptions:
Type | Description |
---|---|
RuntimeError | If the group name is not found in the key name |
Source code in zenml/integrations/azure/secrets_managers/azure_secrets_manager.py
def remove_group_name_from_key(combined_key_name: str, group_name: str) -> str:
"""Removes the secret group name from the secret key.
Args:
combined_key_name: Full name as it is within the Azure secrets manager
group_name: Group name (the ZenML Secret name)
Returns:
The cleaned key
Raises:
RuntimeError: If the group name is not found in the key name
"""
if combined_key_name.startswith(f"{group_name}-"):
return combined_key_name[len(f"{group_name}-") :]
else:
raise RuntimeError(
f"Key-name `{combined_key_name}` does not have the "
f"prefix `{group_name}`. Key could not be "
f"extracted."
)
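Taken together, these two helpers define the naming convention that maps one ZenML secret to several Azure Key Vault entries. A small round-trip sketch based on the source above (the secret and key names are made up):

from zenml.integrations.azure.secrets_managers.azure_secrets_manager import (
    remove_group_name_from_key,
)

# prepend_group_name_to_keys turns the content {"api_key": "..."} of a secret
# named "mysecret" into {"mysecret-api_key": "..."}; the helper below reverses that.
combined = "mysecret-api_key"
key = remove_group_name_from_key(combined_key_name=combined, group_name="mysecret")
assert key == "api_key"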
step_operators
special
Initialization of AzureML Step Operator integration.
azureml_step_operator
Implementation of the ZenML AzureML Step Operator.
AzureMLStepOperator (BaseStepOperator)
pydantic-model
Step operator to run a step on AzureML.
This class defines code that can set up an AzureML environment and run the ZenML entrypoint command in it.
Attributes:
Name | Type | Description |
---|---|---|
subscription_id | str | The Azure account's subscription ID |
resource_group | str | The resource group to which the AzureML workspace is deployed. |
workspace_name | str | The name of the AzureML Workspace. |
compute_target_name | str | The name of the configured ComputeTarget. An instance of it has to be created on the portal if it doesn't exist already. |
environment_name | Optional[str] | [Optional] The name of the environment if there already exists one. |
docker_base_image | Optional[str] | [Optional] The custom docker base image that the environment should use. |
tenant_id | Optional[str] | The Azure Tenant ID. |
service_principal_id | Optional[str] | The ID for the service principal that is created to allow apps to access secure resources. |
service_principal_password | Optional[str] | Password for the service principal. |
Source code in zenml/integrations/azure/step_operators/azureml_step_operator.py
class AzureMLStepOperator(BaseStepOperator):
"""Step operator to run a step on AzureML.
This class defines code that can set up an AzureML environment and run the
ZenML entrypoint command in it.
Attributes:
subscription_id: The Azure account's subscription ID
resource_group: The resource group to which the AzureML workspace
is deployed.
workspace_name: The name of the AzureML Workspace.
compute_target_name: The name of the configured ComputeTarget.
An instance of it has to be created on the portal if it doesn't
exist already.
environment_name: [Optional] The name of the environment if there
already exists one.
docker_base_image: [Optional] The custom docker base image that the
environment should use.
tenant_id: The Azure Tenant ID.
service_principal_id: The ID for the service principal that is created
to allow apps to access secure resources.
service_principal_password: Password for the service principal.
"""
subscription_id: str
resource_group: str
workspace_name: str
compute_target_name: str
# Environment
environment_name: Optional[str] = None
docker_base_image: Optional[str] = None
# Service principal authentication
# https://docs.microsoft.com/en-us/azure/machine-learning/how-to-setup-authentication#configure-a-service-principal
tenant_id: Optional[str] = None
service_principal_id: Optional[str] = None
service_principal_password: Optional[str] = None
# Class Configuration
FLAVOR: ClassVar[str] = AZUREML_STEP_OPERATOR_FLAVOR
def _get_authentication(self) -> Optional[AbstractAuthentication]:
"""Returns the authentication object for the AzureML environment.
Returns:
The authentication object for the AzureML environment.
"""
if (
self.tenant_id
and self.service_principal_id
and self.service_principal_password
):
return ServicePrincipalAuthentication(
tenant_id=self.tenant_id,
service_principal_id=self.service_principal_id,
service_principal_password=self.service_principal_password,
)
return None
def _prepare_environment(
self, workspace: Workspace, requirements: List[str], run_name: str
) -> Environment:
"""Prepares the environment in which Azure will run all jobs.
Args:
workspace: The AzureML Workspace that has configuration
for a storage account, container registry among other
things.
requirements: The list of requirements to be installed
in the environment.
run_name: The name of the pipeline run that can be used
for naming environments and runs.
Returns:
The AzureML Environment object.
"""
if self.environment_name:
environment = Environment.get(
workspace=workspace, name=self.environment_name
)
if not environment.python.conda_dependencies:
environment.python.conda_dependencies = (
CondaDependencies.create(
python_version=ZenMLEnvironment.python_version()
)
)
for requirement in requirements:
environment.python.conda_dependencies.add_pip_package(
requirement
)
else:
environment = Environment(name=f"zenml-{run_name}")
environment.python.conda_dependencies = CondaDependencies.create(
pip_packages=requirements,
python_version=ZenMLEnvironment.python_version(),
)
if self.docker_base_image:
# replace the default azure base image
environment.docker.base_image = self.docker_base_image
environment_variables = {
"ENV_ZENML_PREVENT_PIPELINE_EXECUTION": "True",
}
# set credentials to access azure storage
for key in [
"AZURE_STORAGE_ACCOUNT_KEY",
"AZURE_STORAGE_ACCOUNT_NAME",
"AZURE_STORAGE_CONNECTION_STRING",
"AZURE_STORAGE_SAS_TOKEN",
]:
value = os.getenv(key)
if value:
environment_variables[key] = value
environment_variables[
ENV_ZENML_CONFIG_PATH
] = f"./{CONTAINER_ZENML_CONFIG_DIR}"
environment.environment_variables = environment_variables
return environment
def launch(
self,
pipeline_name: str,
run_name: str,
requirements: List[str],
entrypoint_command: List[str],
) -> None:
"""Launches a step on AzureML.
Args:
pipeline_name: Name of the pipeline which the step to be executed
is part of.
run_name: Name of the pipeline run which the step to be executed
is part of.
entrypoint_command: Command that executes the step.
requirements: List of pip requirements that must be installed
inside the step operator environment.
"""
workspace = Workspace.get(
subscription_id=self.subscription_id,
resource_group=self.resource_group,
name=self.workspace_name,
auth=self._get_authentication(),
)
source_directory = get_source_root_path()
config_path = os.path.join(source_directory, CONTAINER_ZENML_CONFIG_DIR)
try:
# Save a copy of the current global configuration with the
# active profile contents into the build context, to have
# the configured stacks accessible from within the Azure ML
# environment.
GlobalConfiguration().copy_active_configuration(
config_path,
load_config_path=f"./{CONTAINER_ZENML_CONFIG_DIR}",
)
environment = self._prepare_environment(
workspace=workspace,
requirements=requirements,
run_name=run_name,
)
compute_target = ComputeTarget(
workspace=workspace, name=self.compute_target_name
)
run_config = ScriptRunConfig(
source_directory=source_directory,
environment=environment,
compute_target=compute_target,
command=entrypoint_command,
)
experiment = Experiment(workspace=workspace, name=pipeline_name)
run = experiment.submit(config=run_config)
finally:
# Clean up the temporary build files
fileio.rmtree(config_path)
run.display_name = run_name
run.wait_for_completion(show_output=True)
launch(self, pipeline_name, run_name, requirements, entrypoint_command)
Launches a step on AzureML.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
pipeline_name | str | Name of the pipeline which the step to be executed is part of. | required |
run_name | str | Name of the pipeline run which the step to be executed is part of. | required |
entrypoint_command | List[str] | Command that executes the step. | required |
requirements | List[str] | List of pip requirements that must be installed inside the step operator environment. | required |
Source code in zenml/integrations/azure/step_operators/azureml_step_operator.py
def launch(
self,
pipeline_name: str,
run_name: str,
requirements: List[str],
entrypoint_command: List[str],
) -> None:
"""Launches a step on AzureML.
Args:
pipeline_name: Name of the pipeline which the step to be executed
is part of.
run_name: Name of the pipeline run which the step to be executed
is part of.
entrypoint_command: Command that executes the step.
requirements: List of pip requirements that must be installed
inside the step operator environment.
"""
workspace = Workspace.get(
subscription_id=self.subscription_id,
resource_group=self.resource_group,
name=self.workspace_name,
auth=self._get_authentication(),
)
source_directory = get_source_root_path()
config_path = os.path.join(source_directory, CONTAINER_ZENML_CONFIG_DIR)
try:
# Save a copy of the current global configuration with the
# active profile contents into the build context, to have
# the configured stacks accessible from within the Azure ML
# environment.
GlobalConfiguration().copy_active_configuration(
config_path,
load_config_path=f"./{CONTAINER_ZENML_CONFIG_DIR}",
)
environment = self._prepare_environment(
workspace=workspace,
requirements=requirements,
run_name=run_name,
)
compute_target = ComputeTarget(
workspace=workspace, name=self.compute_target_name
)
run_config = ScriptRunConfig(
source_directory=source_directory,
environment=environment,
compute_target=compute_target,
command=entrypoint_command,
)
experiment = Experiment(workspace=workspace, name=pipeline_name)
run = experiment.submit(config=run_config)
finally:
# Clean up the temporary build files
fileio.rmtree(config_path)
run.display_name = run_name
run.wait_for_completion(show_output=True)
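To make the attribute table above concrete, the sketch below shows how such a step operator could be instantiated in Python. All identifiers are placeholders, and in practice the component would be registered on a ZenML stack rather than constructed by hand.

from zenml.integrations.azure.step_operators.azureml_step_operator import (
    AzureMLStepOperator,
)

# Placeholder values; the service principal fields are optional and only
# needed for non-interactive authentication.
step_operator = AzureMLStepOperator(
    name="azureml",
    subscription_id="<subscription-id>",
    resource_group="<resource-group>",
    workspace_name="<workspace-name>",
    compute_target_name="<compute-target>",
)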
constants
Constants for ZenML integrations.
dash
special
Initialization of the Dash integration.
DashIntegration (Integration)
Definition of Dash integration for ZenML.
Source code in zenml/integrations/dash/__init__.py
class DashIntegration(Integration):
"""Definition of Dash integration for ZenML."""
NAME = DASH
REQUIREMENTS = [
"dash>=2.0.0",
"dash-cytoscape>=0.3.0",
"dash-bootstrap-components>=1.0.1",
"jupyter-dash>=0.4.2",
]
visualizers
special
Initialization of the Pipeline Run Visualizer.
pipeline_run_lineage_visualizer
Implementation of the pipeline run lineage visualizer.
PipelineRunLineageVisualizer (BasePipelineRunVisualizer)
Implementation of a lineage diagram via the dash and dash-cytoscape libraries.
Source code in zenml/integrations/dash/visualizers/pipeline_run_lineage_visualizer.py
class PipelineRunLineageVisualizer(BasePipelineRunVisualizer):
"""Implementation of a lineage diagram via the dash and dash-cytoscape libraries."""
ARTIFACT_PREFIX = "artifact_"
STEP_PREFIX = "step_"
STATUS_CLASS_MAPPING = {
ExecutionStatus.CACHED: "green",
ExecutionStatus.FAILED: "red",
ExecutionStatus.RUNNING: "yellow",
ExecutionStatus.COMPLETED: "blue",
}
def visualize(
self,
object: PipelineRunView,
magic: bool = False,
*args: Any,
**kwargs: Any,
) -> dash.Dash:
"""Method to visualize pipeline runs via the Dash library.
The layout puts every layer of the dag in a column.
Args:
object: The pipeline run to visualize.
magic: If True, the visualization is rendered in a magic mode.
*args: Additional positional arguments.
**kwargs: Additional keyword arguments.
Returns:
The Dash application.
"""
external_stylesheets = [
dbc.themes.BOOTSTRAP,
dbc.icons.BOOTSTRAP,
]
if magic:
if Environment.in_notebook():
# Only import jupyter_dash in this case
from jupyter_dash import JupyterDash # noqa
JupyterDash.infer_jupyter_proxy_config()
app = JupyterDash(
__name__,
external_stylesheets=external_stylesheets,
)
mode = "inline"
else:
cli_utils.warning(
"Cannot set magic flag in non-notebook environments."
)
else:
app = dash.Dash(
__name__,
external_stylesheets=external_stylesheets,
)
mode = None
nodes, edges, first_step_id = [], [], None
first_step_id = None
for step in object.steps:
step_output_artifacts = list(step.outputs.values())
execution_id = (
step_output_artifacts[0].producer_step.id
if step_output_artifacts
else step.id
)
step_id = self.STEP_PREFIX + str(step.id)
if first_step_id is None:
first_step_id = step_id
nodes.append(
{
"data": {
"id": step_id,
"execution_id": execution_id,
"label": f"{execution_id} / {step.entrypoint_name}",
"entrypoint_name": step.entrypoint_name, # redundant for consistency
"name": step.name, # redundant for consistency
"type": "step",
"parameters": step.parameters,
"inputs": {k: v.uri for k, v in step.inputs.items()},
"outputs": {k: v.uri for k, v in step.outputs.items()},
},
"classes": self.STATUS_CLASS_MAPPING[step.status],
}
)
for artifact_name, artifact in step.outputs.items():
nodes.append(
{
"data": {
"id": self.ARTIFACT_PREFIX + str(artifact.id),
"execution_id": artifact.id,
"label": f"{artifact.id} / {artifact_name} ("
f"{artifact.data_type})",
"type": "artifact",
"name": artifact_name,
"is_cached": artifact.is_cached,
"artifact_type": artifact.type,
"artifact_data_type": artifact.data_type,
"parent_step_id": artifact.parent_step_id,
"producer_step_id": artifact.producer_step.id,
"uri": artifact.uri,
},
"classes": f"rectangle "
f"{self.STATUS_CLASS_MAPPING[step.status]}",
}
)
edges.append(
{
"data": {
"source": self.STEP_PREFIX + str(step.id),
"target": self.ARTIFACT_PREFIX + str(artifact.id),
},
"classes": f"edge-arrow "
f"{self.STATUS_CLASS_MAPPING[step.status]}"
+ (" dashed" if artifact.is_cached else " solid"),
}
)
for artifact_name, artifact in step.inputs.items():
edges.append(
{
"data": {
"source": self.ARTIFACT_PREFIX + str(artifact.id),
"target": self.STEP_PREFIX + str(step.id),
},
"classes": "edge-arrow "
+ (
f"{self.STATUS_CLASS_MAPPING[ExecutionStatus.CACHED]} dashed"
if artifact.is_cached
else f"{self.STATUS_CLASS_MAPPING[step.status]} solid"
),
}
)
app.layout = dbc.Row(
[
dbc.Container(f"Run: {object.name}", class_name="h1"),
dbc.Row(
[
dbc.Col(
[
dbc.Row(
[
html.Span(
[
html.Span(
[
html.I(
className="bi bi-circle-fill me-1"
),
"Step",
],
className="me-2",
),
html.Span(
[
html.I(
className="bi bi-square-fill me-1"
),
"Artifact",
],
className="me-4",
),
dbc.Badge(
"Completed",
color=COLOR_BLUE,
className="me-1",
),
dbc.Badge(
"Cached",
color=COLOR_GREEN,
className="me-1",
),
dbc.Badge(
"Running",
color=COLOR_YELLOW,
className="me-1",
),
dbc.Badge(
"Failed",
color=COLOR_RED,
className="me-1",
),
]
),
]
),
dbc.Row(
[
cyto.Cytoscape(
id="cytoscape",
layout={
"name": "breadthfirst",
"roots": f'[id = "{first_step_id}"]',
},
elements=edges + nodes,
stylesheet=STYLESHEET,
style={
"width": "100%",
"height": "800px",
},
zoom=1,
)
]
),
dbc.Row(
[
dbc.Button(
"Reset",
id="bt-reset",
color="primary",
className="me-1",
)
]
),
]
),
dbc.Col(
[
dcc.Markdown(id="markdown-selected-node-data"),
]
),
]
),
],
className="p-5",
)
@app.callback( # type: ignore[misc]
Output("markdown-selected-node-data", "children"),
Input("cytoscape", "selectedNodeData"),
)
def display_data(data_list: List[Dict[str, Any]]) -> str:
"""Callback for the text area below the graph.
Args:
data_list: The selected node data.
Returns:
str: The selected node data.
"""
if data_list is None:
return "Click on a node in the diagram."
text = ""
for data in data_list:
text += f'## {data["execution_id"]} / {data["name"]}' + "\n\n"
if data["type"] == "artifact":
for item in [
"artifact_data_type",
"is_cached",
"producer_step_id",
"parent_step_id",
"uri",
]:
text += f"**{item}**: {data[item]}" + "\n\n"
elif data["type"] == "step":
text += "### Inputs:" + "\n\n"
for k, v in data["inputs"].items():
text += f"**{k}**: {v}" + "\n\n"
text += "### Outputs:" + "\n\n"
for k, v in data["outputs"].items():
text += f"**{k}**: {v}" + "\n\n"
text += "### Params:"
for k, v in data["parameters"].items():
text += f"**{k}**: {v}" + "\n\n"
return text
@app.callback( # type: ignore[misc]
[Output("cytoscape", "zoom"), Output("cytoscape", "elements")],
[Input("bt-reset", "n_clicks")],
)
def reset_layout(
n_clicks: int,
) -> List[Union[int, List[Dict[str, Collection[str]]]]]:
"""Resets the layout.
Args:
n_clicks: The number of clicks on the reset button.
Returns:
The zoom and the elements.
"""
logger.debug(n_clicks, "clicked in reset button.")
return [1, edges + nodes]
if mode is not None:
    app.run_server(mode=mode)
else:
    app.run_server()
return app
visualize(self, object, magic=False, *args, **kwargs)
Method to visualize pipeline runs via the Dash library.
The layout puts every layer of the dag in a column.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
object | PipelineRunView | The pipeline run to visualize. | required |
magic | bool | If True, the visualization is rendered in a magic mode. | False |
*args | Any | Additional positional arguments. | () |
**kwargs | Any | Additional keyword arguments. | {} |
Returns:
Type | Description |
---|---|
Dash | The Dash application. |
Source code in zenml/integrations/dash/visualizers/pipeline_run_lineage_visualizer.py
def visualize(
self,
object: PipelineRunView,
magic: bool = False,
*args: Any,
**kwargs: Any,
) -> dash.Dash:
"""Method to visualize pipeline runs via the Dash library.
The layout puts every layer of the dag in a column.
Args:
object: The pipeline run to visualize.
magic: If True, the visualization is rendered in a magic mode.
*args: Additional positional arguments.
**kwargs: Additional keyword arguments.
Returns:
The Dash application.
"""
external_stylesheets = [
dbc.themes.BOOTSTRAP,
dbc.icons.BOOTSTRAP,
]
if magic:
if Environment.in_notebook():
# Only import jupyter_dash in this case
from jupyter_dash import JupyterDash # noqa
JupyterDash.infer_jupyter_proxy_config()
app = JupyterDash(
__name__,
external_stylesheets=external_stylesheets,
)
mode = "inline"
else:
cli_utils.warning(
"Cannot set magic flag in non-notebook environments."
)
else:
app = dash.Dash(
__name__,
external_stylesheets=external_stylesheets,
)
mode = None
nodes, edges, first_step_id = [], [], None
first_step_id = None
for step in object.steps:
step_output_artifacts = list(step.outputs.values())
execution_id = (
step_output_artifacts[0].producer_step.id
if step_output_artifacts
else step.id
)
step_id = self.STEP_PREFIX + str(step.id)
if first_step_id is None:
first_step_id = step_id
nodes.append(
{
"data": {
"id": step_id,
"execution_id": execution_id,
"label": f"{execution_id} / {step.entrypoint_name}",
"entrypoint_name": step.entrypoint_name, # redundant for consistency
"name": step.name, # redundant for consistency
"type": "step",
"parameters": step.parameters,
"inputs": {k: v.uri for k, v in step.inputs.items()},
"outputs": {k: v.uri for k, v in step.outputs.items()},
},
"classes": self.STATUS_CLASS_MAPPING[step.status],
}
)
for artifact_name, artifact in step.outputs.items():
nodes.append(
{
"data": {
"id": self.ARTIFACT_PREFIX + str(artifact.id),
"execution_id": artifact.id,
"label": f"{artifact.id} / {artifact_name} ("
f"{artifact.data_type})",
"type": "artifact",
"name": artifact_name,
"is_cached": artifact.is_cached,
"artifact_type": artifact.type,
"artifact_data_type": artifact.data_type,
"parent_step_id": artifact.parent_step_id,
"producer_step_id": artifact.producer_step.id,
"uri": artifact.uri,
},
"classes": f"rectangle "
f"{self.STATUS_CLASS_MAPPING[step.status]}",
}
)
edges.append(
{
"data": {
"source": self.STEP_PREFIX + str(step.id),
"target": self.ARTIFACT_PREFIX + str(artifact.id),
},
"classes": f"edge-arrow "
f"{self.STATUS_CLASS_MAPPING[step.status]}"
+ (" dashed" if artifact.is_cached else " solid"),
}
)
for artifact_name, artifact in step.inputs.items():
edges.append(
{
"data": {
"source": self.ARTIFACT_PREFIX + str(artifact.id),
"target": self.STEP_PREFIX + str(step.id),
},
"classes": "edge-arrow "
+ (
f"{self.STATUS_CLASS_MAPPING[ExecutionStatus.CACHED]} dashed"
if artifact.is_cached
else f"{self.STATUS_CLASS_MAPPING[step.status]} solid"
),
}
)
app.layout = dbc.Row(
[
dbc.Container(f"Run: {object.name}", class_name="h1"),
dbc.Row(
[
dbc.Col(
[
dbc.Row(
[
html.Span(
[
html.Span(
[
html.I(
className="bi bi-circle-fill me-1"
),
"Step",
],
className="me-2",
),
html.Span(
[
html.I(
className="bi bi-square-fill me-1"
),
"Artifact",
],
className="me-4",
),
dbc.Badge(
"Completed",
color=COLOR_BLUE,
className="me-1",
),
dbc.Badge(
"Cached",
color=COLOR_GREEN,
className="me-1",
),
dbc.Badge(
"Running",
color=COLOR_YELLOW,
className="me-1",
),
dbc.Badge(
"Failed",
color=COLOR_RED,
className="me-1",
),
]
),
]
),
dbc.Row(
[
cyto.Cytoscape(
id="cytoscape",
layout={
"name": "breadthfirst",
"roots": f'[id = "{first_step_id}"]',
},
elements=edges + nodes,
stylesheet=STYLESHEET,
style={
"width": "100%",
"height": "800px",
},
zoom=1,
)
]
),
dbc.Row(
[
dbc.Button(
"Reset",
id="bt-reset",
color="primary",
className="me-1",
)
]
),
]
),
dbc.Col(
[
dcc.Markdown(id="markdown-selected-node-data"),
]
),
]
),
],
className="p-5",
)
@app.callback( # type: ignore[misc]
Output("markdown-selected-node-data", "children"),
Input("cytoscape", "selectedNodeData"),
)
def display_data(data_list: List[Dict[str, Any]]) -> str:
"""Callback for the text area below the graph.
Args:
data_list: The selected node data.
Returns:
str: The selected node data.
"""
if data_list is None:
return "Click on a node in the diagram."
text = ""
for data in data_list:
text += f'## {data["execution_id"]} / {data["name"]}' + "\n\n"
if data["type"] == "artifact":
for item in [
"artifact_data_type",
"is_cached",
"producer_step_id",
"parent_step_id",
"uri",
]:
text += f"**{item}**: {data[item]}" + "\n\n"
elif data["type"] == "step":
text += "### Inputs:" + "\n\n"
for k, v in data["inputs"].items():
text += f"**{k}**: {v}" + "\n\n"
text += "### Outputs:" + "\n\n"
for k, v in data["outputs"].items():
text += f"**{k}**: {v}" + "\n\n"
text += "### Params:"
for k, v in data["parameters"].items():
text += f"**{k}**: {v}" + "\n\n"
return text
@app.callback( # type: ignore[misc]
[Output("cytoscape", "zoom"), Output("cytoscape", "elements")],
[Input("bt-reset", "n_clicks")],
)
def reset_layout(
n_clicks: int,
) -> List[Union[int, List[Dict[str, Collection[str]]]]]:
"""Resets the layout.
Args:
n_clicks: The number of clicks on the reset button.
Returns:
The zoom and the elements.
"""
logger.debug(n_clicks, "clicked in reset button.")
return [1, edges + nodes]
if mode is not None:
    app.run_server(mode=mode)
else:
    app.run_server()
return app
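As a rough usage sketch (not part of the source above), the visualizer is usually pointed at a post-execution run view. The pipeline name below is a placeholder, and the repository calls assume the standard ZenML post-execution API:

from zenml.integrations.dash.visualizers.pipeline_run_lineage_visualizer import (
    PipelineRunLineageVisualizer,
)
from zenml.repository import Repository

# Fetch the latest run of a (hypothetical) pipeline and render its lineage DAG.
run = Repository().get_pipeline(pipeline_name="my_pipeline").runs[-1]
PipelineRunLineageVisualizer().visualize(run)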
evidently
special
Initialization of the Evidently integration.
The Evidently integration provides a way to monitor your models in production. It includes a way to detect data drift and different kinds of model performance issues.
The results of Evidently calculations can either be exported as an interactive dashboard (visualized as an html file or in your Jupyter notebook), or as a JSON file.
EvidentlyIntegration (Integration)
Evidently integration for ZenML.
Source code in zenml/integrations/evidently/__init__.py
class EvidentlyIntegration(Integration):
"""[Evidently](https://github.com/evidentlyai/evidently) integration for ZenML."""
NAME = EVIDENTLY
REQUIREMENTS = ["evidently==v0.1.41.dev0"]
steps
special
Initialization of the Evidently Standard Steps.
evidently_profile
Implementation of the Evidently Profile Step.
EvidentlyProfileConfig (BaseDriftDetectionConfig)
pydantic-model
Config class for Evidently profile steps.
column_mapping: properties of the DataFrame's columns used
profile_section: a string that identifies the profile section to be used. The following are valid options supported by Evidently:
- "datadrift"
- "categoricaltargetdrift"
- "numericaltargetdrift"
- "classificationmodelperformance"
- "regressionmodelperformance"
- "probabilisticmodelperformance"
Source code in zenml/integrations/evidently/steps/evidently_profile.py
class EvidentlyProfileConfig(BaseDriftDetectionConfig):
"""Config class for Evidently profile steps.
column_mapping: properties of the DataFrame's columns used
profile_section: a string that identifies the profile section to be used.
The following are valid options supported by Evidently:
- "datadrift"
- "categoricaltargetdrift"
- "numericaltargetdrift"
- "classificationmodelperformance"
- "regressionmodelperformance"
- "probabilisticmodelperformance"
"""
def get_profile_sections_and_tabs(
self,
) -> Tuple[List[ProfileSection], List[Tab]]:
"""Get the profile sections and tabs to be used in the dashboard.
Returns:
A tuple of two lists of profile sections and tabs.
Raises:
ValueError: if the profile_section is not supported.
"""
try:
return (
[
profile_mapper[profile]()
for profile in self.profile_sections
],
[
dashboard_mapper[profile]()
for profile in self.profile_sections
],
)
except KeyError:
nl = "\n"
raise ValueError(
f"Invalid profile section: {self.profile_sections} \n\n"
f"Valid and supported options are: {nl}- "
f'{f"{nl}- ".join(list(profile_mapper.keys()))}'
)
column_mapping: Optional[ColumnMapping]
profile_sections: Sequence[str]
get_profile_sections_and_tabs(self)
Get the profile sections and tabs to be used in the dashboard.
Returns:
Type | Description |
---|---|
Tuple[List[evidently.model_profile.sections.base_profile_section.ProfileSection], List[evidently.dashboard.tabs.base_tab.Tab]] | A tuple of two lists of profile sections and tabs. |
Exceptions:
Type | Description |
---|---|
ValueError | if the profile_section is not supported. |
Source code in zenml/integrations/evidently/steps/evidently_profile.py
def get_profile_sections_and_tabs(
self,
) -> Tuple[List[ProfileSection], List[Tab]]:
"""Get the profile sections and tabs to be used in the dashboard.
Returns:
A tuple of two lists of profile sections and tabs.
Raises:
ValueError: if the profile_section is not supported.
"""
try:
return (
[
profile_mapper[profile]()
for profile in self.profile_sections
],
[
dashboard_mapper[profile]()
for profile in self.profile_sections
],
)
except KeyError:
nl = "\n"
raise ValueError(
f"Invalid profile section: {self.profile_sections} \n\n"
f"Valid and supported options are: {nl}- "
f'{f"{nl}- ".join(list(profile_mapper.keys()))}'
)
EvidentlyProfileStep (BaseDriftDetectionStep)
Step implementation implementing an Evidently Profile Step.
Source code in zenml/integrations/evidently/steps/evidently_profile.py
class EvidentlyProfileStep(BaseDriftDetectionStep):
"""Step implementation implementing an Evidently Profile Step."""
OUTPUT_SPEC = {
"profile": DataAnalysisArtifact,
"dashboard": DataAnalysisArtifact,
}
def entrypoint( # type: ignore[override]
self,
reference_dataset: pd.DataFrame,
comparison_dataset: pd.DataFrame,
config: EvidentlyProfileConfig,
) -> Output( # type:ignore[valid-type]
profile=dict, dashboard=str
):
"""Main entrypoint for the Evidently categorical target drift detection step.
Args:
reference_dataset: a Pandas DataFrame
comparison_dataset: a Pandas DataFrame of new data you wish to
compare against the reference data
config: the configuration for the step
Returns:
profile: dictionary report extracted from an Evidently Profile
generated for the data drift
dashboard: HTML report extracted from an Evidently Dashboard
generated for the data drift
"""
sections, tabs = config.get_profile_sections_and_tabs()
data_drift_dashboard = Dashboard(tabs=tabs)
data_drift_dashboard.calculate(
reference_dataset,
comparison_dataset,
column_mapping=config.column_mapping or None,
)
data_drift_profile = Profile(sections=sections)
data_drift_profile.calculate(
reference_dataset,
comparison_dataset,
column_mapping=config.column_mapping or None,
)
return [data_drift_profile.object(), data_drift_dashboard.html()]
CONFIG_CLASS (BaseDriftDetectionConfig)
pydantic-model
Config class for Evidently profile steps.
column_mapping: properties of the DataFrame's columns used
profile_section: a string that identifies the profile section to be used. The following are valid options supported by Evidently:
- "datadrift"
- "categoricaltargetdrift"
- "numericaltargetdrift"
- "classificationmodelperformance"
- "regressionmodelperformance"
- "probabilisticmodelperformance"
Source code in zenml/integrations/evidently/steps/evidently_profile.py
class EvidentlyProfileConfig(BaseDriftDetectionConfig):
"""Config class for Evidently profile steps.
column_mapping: properties of the DataFrame's columns used
profile_section: a string that identifies the profile section to be used.
The following are valid options supported by Evidently:
- "datadrift"
- "categoricaltargetdrift"
- "numericaltargetdrift"
- "classificationmodelperformance"
- "regressionmodelperformance"
- "probabilisticmodelperformance"
"""
def get_profile_sections_and_tabs(
self,
) -> Tuple[List[ProfileSection], List[Tab]]:
"""Get the profile sections and tabs to be used in the dashboard.
Returns:
A tuple of two lists of profile sections and tabs.
Raises:
ValueError: if the profile_section is not supported.
"""
try:
return (
[
profile_mapper[profile]()
for profile in self.profile_sections
],
[
dashboard_mapper[profile]()
for profile in self.profile_sections
],
)
except KeyError:
nl = "\n"
raise ValueError(
f"Invalid profile section: {self.profile_sections} \n\n"
f"Valid and supported options are: {nl}- "
f'{f"{nl}- ".join(list(profile_mapper.keys()))}'
)
column_mapping: Optional[ColumnMapping]
profile_sections: Sequence[str]
get_profile_sections_and_tabs(self)
Get the profile sections and tabs to be used in the dashboard.
Returns:
Type | Description |
---|---|
Tuple[List[evidently.model_profile.sections.base_profile_section.ProfileSection], List[evidently.dashboard.tabs.base_tab.Tab]] | A tuple of two lists of profile sections and tabs. |
Exceptions:
Type | Description |
---|---|
ValueError | if the profile_section is not supported. |
Source code in zenml/integrations/evidently/steps/evidently_profile.py
def get_profile_sections_and_tabs(
self,
) -> Tuple[List[ProfileSection], List[Tab]]:
"""Get the profile sections and tabs to be used in the dashboard.
Returns:
A tuple of two lists of profile sections and tabs.
Raises:
ValueError: if the profile_section is not supported.
"""
try:
return (
[
profile_mapper[profile]()
for profile in self.profile_sections
],
[
dashboard_mapper[profile]()
for profile in self.profile_sections
],
)
except KeyError:
nl = "\n"
raise ValueError(
f"Invalid profile section: {self.profile_sections} \n\n"
f"Valid and supported options are: {nl}- "
f'{f"{nl}- ".join(list(profile_mapper.keys()))}'
)
entrypoint(self, reference_dataset, comparison_dataset, config)
Main entrypoint for the Evidently categorical target drift detection step.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
reference_dataset | DataFrame | a Pandas DataFrame | required |
comparison_dataset | DataFrame | a Pandas DataFrame of new data you wish to compare against the reference data | required |
config | EvidentlyProfileConfig | the configuration for the step | required |
Returns:
Type | Description |
---|---|
profile | dictionary report extracted from an Evidently Profile generated for the data drift |
dashboard | HTML report extracted from an Evidently Dashboard generated for the data drift |
Source code in zenml/integrations/evidently/steps/evidently_profile.py
def entrypoint( # type: ignore[override]
self,
reference_dataset: pd.DataFrame,
comparison_dataset: pd.DataFrame,
config: EvidentlyProfileConfig,
) -> Output( # type:ignore[valid-type]
profile=dict, dashboard=str
):
"""Main entrypoint for the Evidently categorical target drift detection step.
Args:
reference_dataset: a Pandas DataFrame
comparison_dataset: a Pandas DataFrame of new data you wish to
compare against the reference data
config: the configuration for the step
Returns:
profile: dictionary report extracted from an Evidently Profile
generated for the data drift
dashboard: HTML report extracted from an Evidently Dashboard
generated for the data drift
"""
sections, tabs = config.get_profile_sections_and_tabs()
data_drift_dashboard = Dashboard(tabs=tabs)
data_drift_dashboard.calculate(
reference_dataset,
comparison_dataset,
column_mapping=config.column_mapping or None,
)
data_drift_profile = Profile(sections=sections)
data_drift_profile.calculate(
reference_dataset,
comparison_dataset,
column_mapping=config.column_mapping or None,
)
return [data_drift_profile.object(), data_drift_dashboard.html()]
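For context, the step is normally parameterized with a profile config and then wired into a pipeline next to steps that produce the reference and comparison DataFrames. A minimal, hedged sketch (the chosen profile section and the surrounding pipeline wiring are assumptions, not part of the source above):

from zenml.integrations.evidently.steps.evidently_profile import (
    EvidentlyProfileConfig,
    EvidentlyProfileStep,
)

# Detect data drift only; other profile sections are listed in the config docs above.
drift_detector = EvidentlyProfileStep(
    config=EvidentlyProfileConfig(profile_sections=["datadrift"]),
)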
visualizers
special
Initialization for Evidently visualizer.
evidently_visualizer
Implementation of the Evidently visualizer.
EvidentlyVisualizer (BaseStepVisualizer)
The implementation of an Evidently Visualizer.
Source code in zenml/integrations/evidently/visualizers/evidently_visualizer.py
class EvidentlyVisualizer(BaseStepVisualizer):
"""The implementation of an Evidently Visualizer."""
@abstractmethod
def visualize(self, object: StepView, *args: Any, **kwargs: Any) -> None:
"""Method to visualize components.
Args:
object: StepView fetched from run.get_step().
*args: Additional arguments.
**kwargs: Additional keyword arguments.
"""
for artifact_view in object.outputs.values():
# filter out anything but data analysis artifacts
if (
artifact_view.type == DataAnalysisArtifact.__name__
and artifact_view.data_type == "builtins.str"
):
artifact = artifact_view.read()
self.generate_facet(artifact)
def generate_facet(self, html_: str) -> None:
"""Generate a Facet Overview.
Args:
html_: HTML represented as a string.
"""
if Environment.in_notebook() or Environment.in_google_colab():
from IPython.core.display import HTML, display
display(HTML(html_))
else:
logger.warning(
"The magic functions are only usable in a Jupyter notebook."
)
with tempfile.NamedTemporaryFile(
mode="w", delete=False, suffix=".html", encoding="utf-8"
) as f:
f.write(html_)
url = f"file:///{f.name}"
logger.info("Opening %s in a new browser.." % f.name)
webbrowser.open(url, new=2)
generate_facet(self, html_)
Generate a Facet Overview.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
html_ |
str |
HTML represented as a string. |
required |
Source code in zenml/integrations/evidently/visualizers/evidently_visualizer.py
def generate_facet(self, html_: str) -> None:
"""Generate a Facet Overview.
Args:
html_: HTML represented as a string.
"""
if Environment.in_notebook() or Environment.in_google_colab():
from IPython.core.display import HTML, display
display(HTML(html_))
else:
logger.warning(
"The magic functions are only usable in a Jupyter notebook."
)
with tempfile.NamedTemporaryFile(
mode="w", delete=False, suffix=".html", encoding="utf-8"
) as f:
f.write(html_)
url = f"file:///{f.name}"
logger.info("Opening %s in a new browser.." % f.name)
webbrowser.open(url, new=2)
visualize(self, object, *args, **kwargs)
Method to visualize components.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
object |
StepView |
StepView fetched from run.get_step(). |
required |
*args |
Any |
Additional arguments. |
() |
**kwargs |
Any |
Additional keyword arguments. |
{} |
Source code in zenml/integrations/evidently/visualizers/evidently_visualizer.py
@abstractmethod
def visualize(self, object: StepView, *args: Any, **kwargs: Any) -> None:
"""Method to visualize components.
Args:
object: StepView fetched from run.get_step().
*args: Additional arguments.
**kwargs: Additional keyword arguments.
"""
for artifact_view in object.outputs.values():
# filter out anything but data analysis artifacts
if (
artifact_view.type == DataAnalysisArtifact.__name__
and artifact_view.data_type == "builtins.str"
):
artifact = artifact_view.read()
self.generate_facet(artifact)
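Once a drift step like the one above has produced its HTML dashboard output, the visualizer can render it. The following is a hedged post-execution sketch: the pipeline and step names are placeholders, and it assumes the repository accessors and direct instantiation of the visualizer work as in the ZenML examples.

```python
# Hypothetical post-execution sketch; pipeline/step names are placeholders.
from zenml.integrations.evidently.visualizers.evidently_visualizer import (
    EvidentlyVisualizer,
)
from zenml.repository import Repository

# Fetch the latest run of an (assumed) pipeline and the drift-detection step in it.
run = Repository().get_pipeline("drift_pipeline").runs[-1]
drift_step = run.get_step("drift_detector")

# Displays the dashboard inline in a notebook, or opens it in a browser otherwise.
EvidentlyVisualizer().visualize(drift_step)
```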
facets
special
Facets integration for ZenML.
The Facets integration provides a simple way to visualize post-execution objects
like PipelineView, PipelineRunView and StepView. These objects can be extended
using the BaseVisualization class. This integration requires the facets-overview
package to be installed in your Python environment.
FacetsIntegration (Integration)
Definition of Facet integration for ZenML.
Source code in zenml/integrations/facets/__init__.py
class FacetsIntegration(Integration):
"""Definition of [Facet](https://pair-code.github.io/facets/) integration for ZenML."""
NAME = FACETS
REQUIREMENTS = ["facets-overview>=1.0.0", "IPython"]
visualizers
special
Initialization of the Facet Visualizer.
facet_statistics_visualizer
Implementation of the Facet Statistics Visualizer.
FacetStatisticsVisualizer (BaseStepVisualizer)
The implementation of the Facets statistics visualizer.
Source code in zenml/integrations/facets/visualizers/facet_statistics_visualizer.py
class FacetStatisticsVisualizer(BaseStepVisualizer):
"""The base implementation of a ZenML Visualizer."""
@abstractmethod
def visualize(
self, object: StepView, magic: bool = False, *args: Any, **kwargs: Any
) -> None:
"""Method to visualize components.
Args:
object: StepView fetched from run.get_step().
magic: Whether to render in a Jupyter notebook or not.
*args: Additional arguments.
**kwargs: Additional keyword arguments.
"""
datasets = []
for output_name, artifact_view in object.outputs.items():
df = artifact_view.read()
if type(df) is not pd.DataFrame:
logger.warning(
"`%s` is not a pd.DataFrame. You can only visualize "
"statistics of steps that output pandas DataFrames. "
"Skipping this output.." % output_name
)
else:
datasets.append({"name": output_name, "table": df})
h = self.generate_html(datasets)
self.generate_facet(h, magic)
def generate_html(self, datasets: List[Dict[Text, pd.DataFrame]]) -> str:
"""Generates html for facet.
Args:
datasets: List of dicts of DataFrames to be visualized as stats.
Returns:
HTML template with proto string embedded.
"""
proto = GenericFeatureStatisticsGenerator().ProtoFromDataFrames(
datasets
)
protostr = base64.b64encode(proto.SerializeToString()).decode("utf-8")
template = os.path.join(
os.path.abspath(os.path.dirname(__file__)),
"stats.html",
)
html_template = io_utils.read_file_contents_as_string(template)
html_ = html_template.replace("protostr", protostr)
return html_
def generate_facet(self, html_: str, magic: bool = False) -> None:
"""Generate a Facet Overview.
Args:
html_: HTML represented as a string.
magic: Whether to magically materialize facet in a notebook.
Raises:
EnvironmentError: If magic is True and not in a notebook.
"""
if magic:
if not Environment.in_notebook() or Environment.in_google_colab():
raise EnvironmentError(
"The magic functions are only usable in a Jupyter notebook."
)
display(HTML(html_))
else:
with tempfile.NamedTemporaryFile(delete=False, suffix=".html") as f:
io_utils.write_file_contents_as_string(f.name, html_)
url = f"file:///{f.name}"
logger.info("Opening %s in a new browser.." % f.name)
webbrowser.open(url, new=2)
generate_facet(self, html_, magic=False)
Generate a Facet Overview.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
html_ |
str |
HTML represented as a string. |
required |
magic |
bool |
Whether to magically materialize facet in a notebook. |
False |
Exceptions:
Type | Description |
---|---|
EnvironmentError |
If magic is True and not in a notebook. |
Source code in zenml/integrations/facets/visualizers/facet_statistics_visualizer.py
def generate_facet(self, html_: str, magic: bool = False) -> None:
"""Generate a Facet Overview.
Args:
html_: HTML represented as a string.
magic: Whether to magically materialize facet in a notebook.
Raises:
EnvironmentError: If magic is True and not in a notebook.
"""
if magic:
if not Environment.in_notebook() or Environment.in_google_colab():
raise EnvironmentError(
"The magic functions are only usable in a Jupyter notebook."
)
display(HTML(html_))
else:
with tempfile.NamedTemporaryFile(delete=False, suffix=".html") as f:
io_utils.write_file_contents_as_string(f.name, html_)
url = f"file:///{f.name}"
logger.info("Opening %s in a new browser.." % f.name)
webbrowser.open(url, new=2)
generate_html(self, datasets)
Generates html for facet.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
datasets |
List[Dict[str, pandas.core.frame.DataFrame]] |
List of dicts of DataFrames to be visualized as stats. |
required |
Returns:
Type | Description |
---|---|
str |
HTML template with proto string embedded. |
Source code in zenml/integrations/facets/visualizers/facet_statistics_visualizer.py
def generate_html(self, datasets: List[Dict[Text, pd.DataFrame]]) -> str:
"""Generates html for facet.
Args:
datasets: List of dicts of DataFrames to be visualized as stats.
Returns:
HTML template with proto string embedded.
"""
proto = GenericFeatureStatisticsGenerator().ProtoFromDataFrames(
datasets
)
protostr = base64.b64encode(proto.SerializeToString()).decode("utf-8")
template = os.path.join(
os.path.abspath(os.path.dirname(__file__)),
"stats.html",
)
html_template = io_utils.read_file_contents_as_string(template)
html_ = html_template.replace("protostr", protostr)
return html_
visualize(self, object, magic=False, *args, **kwargs)
Method to visualize components.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
object |
StepView |
StepView fetched from run.get_step(). |
required |
magic |
bool |
Whether to render in a Jupyter notebook or not. |
False |
*args |
Any |
Additional arguments. |
() |
**kwargs |
Any |
Additional keyword arguments. |
{} |
Source code in zenml/integrations/facets/visualizers/facet_statistics_visualizer.py
@abstractmethod
def visualize(
self, object: StepView, magic: bool = False, *args: Any, **kwargs: Any
) -> None:
"""Method to visualize components.
Args:
object: StepView fetched from run.get_step().
magic: Whether to render in a Jupyter notebook or not.
*args: Additional arguments.
**kwargs: Additional keyword arguments.
"""
datasets = []
for output_name, artifact_view in object.outputs.items():
df = artifact_view.read()
if type(df) is not pd.DataFrame:
logger.warning(
"`%s` is not a pd.DataFrame. You can only visualize "
"statistics of steps that output pandas DataFrames. "
"Skipping this output.." % output_name
)
else:
datasets.append({"name": output_name, "table": df})
h = self.generate_html(datasets)
self.generate_facet(h, magic)
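As with the Evidently visualizer, the statistics visualizer is typically used post-execution. The sketch below is illustrative only: the pipeline and step names are placeholders, and it assumes the step being visualized outputs at least one pandas DataFrame and that the visualizer can be instantiated directly as in the ZenML examples.

```python
# Hypothetical post-execution sketch; pipeline/step names are placeholders.
from zenml.integrations.facets.visualizers.facet_statistics_visualizer import (
    FacetStatisticsVisualizer,
)
from zenml.repository import Repository

run = Repository().get_pipeline("training_pipeline").runs[-1]
importer_step = run.get_step("importer")

# Builds the statistics proto for every pandas DataFrame output of the step and
# either renders it inline (magic=True, notebook only) or opens it in a browser.
FacetStatisticsVisualizer().visualize(importer_step, magic=False)
```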
feast
special
Initialization for Feast integration.
The Feast integration offers a way to connect to a Feast Feature Store. ZenML implements a dedicated stack component that you can access as part of your ZenML steps in the usual ways.
FeastIntegration (Integration)
Definition of Feast integration for ZenML.
Source code in zenml/integrations/feast/__init__.py
class FeastIntegration(Integration):
"""Definition of Feast integration for ZenML."""
NAME = FEAST
REQUIREMENTS = ["feast[redis]>=0.19.4", "redis-server"]
@classmethod
def flavors(cls) -> List[FlavorWrapper]:
"""Declare the stack component flavors for the Feast integration.
Returns:
List of stack component flavors for this integration.
"""
return [
FlavorWrapper(
name=FEAST_FEATURE_STORE_FLAVOR,
source="zenml.integrations.feast.feature_store.FeastFeatureStore",
type=StackComponentType.FEATURE_STORE,
integration=cls.NAME,
)
]
flavors()
classmethod
Declare the stack component flavors for the Feast integration.
Returns:
Type | Description |
---|---|
List[zenml.zen_stores.models.flavor_wrapper.FlavorWrapper] |
List of stack component flavors for this integration. |
Source code in zenml/integrations/feast/__init__.py
@classmethod
def flavors(cls) -> List[FlavorWrapper]:
"""Declare the stack component flavors for the Feast integration.
Returns:
List of stack component flavors for this integration.
"""
return [
FlavorWrapper(
name=FEAST_FEATURE_STORE_FLAVOR,
source="zenml.integrations.feast.feature_store.FeastFeatureStore",
type=StackComponentType.FEATURE_STORE,
integration=cls.NAME,
)
]
feature_stores
special
Feast Feature Store integration for ZenML.
Feature stores allow data teams to serve data via an offline store and an online low-latency store, with data kept in sync between the two. A feature store also offers a centralized registry where features (and feature schemas) are stored for use within a team or wider organization. Feature stores are a relatively recent addition to commonly-used machine learning stacks. Feast is a leading open-source feature store, first developed by Gojek in collaboration with Google.
feast_feature_store
Implementation of the Feast Feature Store for ZenML.
FeastFeatureStore (BaseFeatureStore)
pydantic-model
Class to interact with the Feast feature store.
Source code in zenml/integrations/feast/feature_stores/feast_feature_store.py
class FeastFeatureStore(BaseFeatureStore):
"""Class to interact with the Feast feature store."""
FLAVOR: ClassVar[str] = FEAST_FEATURE_STORE_FLAVOR
online_host: str = "localhost"
online_port: int = 6379
feast_repo: str
def _validate_connection(self) -> None:
"""Validates the connection to the feature store.
Raises:
ConnectionError: If the online component (Redis) is not available.
"""
client = redis.Redis(host=self.online_host, port=self.online_port)
try:
client.ping()
except redis.exceptions.ConnectionError as e:
raise redis.exceptions.ConnectionError(
"Could not connect to feature store's online component. "
"Please make sure that Redis is running."
) from e
def get_historical_features(
self,
entity_df: Union[pd.DataFrame, str],
features: List[str],
full_feature_names: bool = False,
) -> pd.DataFrame:
"""Returns the historical features for training or batch scoring.
Args:
entity_df: The entity DataFrame or entity name.
features: The features to retrieve.
full_feature_names: Whether to return the full feature names.
Raise:
ConnectionError: If the online component (Redis) is not available.
Returns:
The historical features as a Pandas DataFrame.
"""
fs = FeatureStore(repo_path=self.feast_repo)
return fs.get_historical_features(
entity_df=entity_df,
features=features,
full_feature_names=full_feature_names,
).to_df()
def get_online_features(
self,
entity_rows: List[Dict[str, Any]],
features: List[str],
full_feature_names: bool = False,
) -> Dict[str, Any]:
"""Returns the latest online feature data.
Args:
entity_rows: The entity rows to retrieve.
features: The features to retrieve.
full_feature_names: Whether to return the full feature names.
Raise:
ConnectionError: If the online component (Redis) is not available.
Returns:
The latest online feature data as a dictionary.
"""
self._validate_connection()
fs = FeatureStore(repo_path=self.feast_repo)
return fs.get_online_features( # type: ignore[no-any-return]
entity_rows=entity_rows,
features=features,
full_feature_names=full_feature_names,
).to_dict()
def get_data_sources(self) -> List[str]:
"""Returns the data sources' names.
Raise:
ConnectionError: If the online component (Redis) is not available.
Returns:
The data sources' names.
"""
self._validate_connection()
fs = FeatureStore(repo_path=self.feast_repo)
return [ds.name for ds in fs.list_data_sources()]
def get_entities(self) -> List[str]:
"""Returns the entity names.
Raise:
ConnectionError: If the online component (Redis) is not available.
Returns:
The entity names.
"""
self._validate_connection()
fs = FeatureStore(repo_path=self.feast_repo)
return [ds.name for ds in fs.list_entities()]
def get_feature_services(self) -> List[str]:
"""Returns the feature service names.
Raise:
ConnectionError: If the online component (Redis) is not available.
Returns:
The feature service names.
"""
self._validate_connection()
fs = FeatureStore(repo_path=self.feast_repo)
return [ds.name for ds in fs.list_feature_services()]
def get_feature_views(self) -> List[str]:
"""Returns the feature view names.
Raise:
ConnectionError: If the online component (Redis) is not available.
Returns:
The feature view names.
"""
self._validate_connection()
fs = FeatureStore(repo_path=self.feast_repo)
return [ds.name for ds in fs.list_feature_views()]
def get_project(self) -> str:
"""Returns the project name.
Raise:
ConnectionError: If the online component (Redis) is not available.
Returns:
The project name.
"""
fs = FeatureStore(repo_path=self.feast_repo)
return str(fs.project)
def get_registry(self) -> Registry:
"""Returns the feature store registry.
Raise:
ConnectionError: If the online component (Redis) is not available.
Returns:
The registry.
"""
fs: FeatureStore = FeatureStore(repo_path=self.feast_repo)
return fs.registry
def get_feast_version(self) -> str:
"""Returns the version of Feast used.
Raise:
ConnectionError: If the online component (Redis) is not available.
Returns:
The version of Feast currently being used.
"""
fs = FeatureStore(repo_path=self.feast_repo)
return str(fs.version())
get_data_sources(self)
Returns the data sources' names.
Exceptions:
Type | Description |
---|---|
ConnectionError |
If the online component (Redis) is not available. |
Returns:
Type | Description |
---|---|
List[str] |
The data sources' names. |
Source code in zenml/integrations/feast/feature_stores/feast_feature_store.py
def get_data_sources(self) -> List[str]:
"""Returns the data sources' names.
Raise:
ConnectionError: If the online component (Redis) is not available.
Returns:
The data sources' names.
"""
self._validate_connection()
fs = FeatureStore(repo_path=self.feast_repo)
return [ds.name for ds in fs.list_data_sources()]
get_entities(self)
Returns the entity names.
Exceptions:
Type | Description |
---|---|
ConnectionError |
If the online component (Redis) is not available. |
Returns:
Type | Description |
---|---|
List[str] |
The entity names. |
Source code in zenml/integrations/feast/feature_stores/feast_feature_store.py
def get_entities(self) -> List[str]:
"""Returns the entity names.
Raise:
ConnectionError: If the online component (Redis) is not available.
Returns:
The entity names.
"""
self._validate_connection()
fs = FeatureStore(repo_path=self.feast_repo)
return [ds.name for ds in fs.list_entities()]
get_feast_version(self)
Returns the version of Feast used.
Exceptions:
Type | Description |
---|---|
ConnectionError |
If the online component (Redis) is not available. |
Returns:
Type | Description |
---|---|
str |
The version of Feast currently being used. |
Source code in zenml/integrations/feast/feature_stores/feast_feature_store.py
def get_feast_version(self) -> str:
"""Returns the version of Feast used.
Raise:
ConnectionError: If the online component (Redis) is not available.
Returns:
The version of Feast currently being used.
"""
fs = FeatureStore(repo_path=self.feast_repo)
return str(fs.version())
get_feature_services(self)
Returns the feature service names.
Exceptions:
Type | Description |
---|---|
ConnectionError |
If the online component (Redis) is not available. |
Returns:
Type | Description |
---|---|
List[str] |
The feature service names. |
Source code in zenml/integrations/feast/feature_stores/feast_feature_store.py
def get_feature_services(self) -> List[str]:
"""Returns the feature service names.
Raise:
ConnectionError: If the online component (Redis) is not available.
Returns:
The feature service names.
"""
self._validate_connection()
fs = FeatureStore(repo_path=self.feast_repo)
return [ds.name for ds in fs.list_feature_services()]
get_feature_views(self)
Returns the feature view names.
Exceptions:
Type | Description |
---|---|
ConnectionError |
If the online component (Redis) is not available. |
Returns:
Type | Description |
---|---|
List[str] |
The feature view names. |
Source code in zenml/integrations/feast/feature_stores/feast_feature_store.py
def get_feature_views(self) -> List[str]:
"""Returns the feature view names.
Raise:
ConnectionError: If the online component (Redis) is not available.
Returns:
The feature view names.
"""
self._validate_connection()
fs = FeatureStore(repo_path=self.feast_repo)
return [ds.name for ds in fs.list_feature_views()]
get_historical_features(self, entity_df, features, full_feature_names=False)
Returns the historical features for training or batch scoring.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
entity_df |
Union[pandas.core.frame.DataFrame, str] |
The entity DataFrame or entity name. |
required |
features |
List[str] |
The features to retrieve. |
required |
full_feature_names |
bool |
Whether to return the full feature names. |
False |
Exceptions:
Type | Description |
---|---|
ConnectionError |
If the online component (Redis) is not available. |
Returns:
Type | Description |
---|---|
DataFrame |
The historical features as a Pandas DataFrame. |
Source code in zenml/integrations/feast/feature_stores/feast_feature_store.py
def get_historical_features(
self,
entity_df: Union[pd.DataFrame, str],
features: List[str],
full_feature_names: bool = False,
) -> pd.DataFrame:
"""Returns the historical features for training or batch scoring.
Args:
entity_df: The entity DataFrame or entity name.
features: The features to retrieve.
full_feature_names: Whether to return the full feature names.
Raise:
ConnectionError: If the online component (Redis) is not available.
Returns:
The historical features as a Pandas DataFrame.
"""
fs = FeatureStore(repo_path=self.feast_repo)
return fs.get_historical_features(
entity_df=entity_df,
features=features,
full_feature_names=full_feature_names,
).to_df()
get_online_features(self, entity_rows, features, full_feature_names=False)
Returns the latest online feature data.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
entity_rows |
List[Dict[str, Any]] |
The entity rows to retrieve. |
required |
features |
List[str] |
The features to retrieve. |
required |
full_feature_names |
bool |
Whether to return the full feature names. |
False |
Exceptions:
Type | Description |
---|---|
ConnectionError |
If the online component (Redis) is not available. |
Returns:
Type | Description |
---|---|
Dict[str, Any] |
The latest online feature data as a dictionary. |
Source code in zenml/integrations/feast/feature_stores/feast_feature_store.py
def get_online_features(
self,
entity_rows: List[Dict[str, Any]],
features: List[str],
full_feature_names: bool = False,
) -> Dict[str, Any]:
"""Returns the latest online feature data.
Args:
entity_rows: The entity rows to retrieve.
features: The features to retrieve.
full_feature_names: Whether to return the full feature names.
Raise:
ConnectionError: If the online component (Redis) is not available.
Returns:
The latest online feature data as a dictionary.
"""
self._validate_connection()
fs = FeatureStore(repo_path=self.feast_repo)
return fs.get_online_features( # type: ignore[no-any-return]
entity_rows=entity_rows,
features=features,
full_feature_names=full_feature_names,
).to_dict()
get_project(self)
Returns the project name.
Exceptions:
Type | Description |
---|---|
ConnectionError |
If the online component (Redis) is not available. |
Returns:
Type | Description |
---|---|
str |
The project name. |
Source code in zenml/integrations/feast/feature_stores/feast_feature_store.py
def get_project(self) -> str:
"""Returns the project name.
Raise:
ConnectionError: If the online component (Redis) is not available.
Returns:
The project name.
"""
fs = FeatureStore(repo_path=self.feast_repo)
return str(fs.project)
get_registry(self)
Returns the feature store registry.
Exceptions:
Type | Description |
---|---|
ConnectionError |
If the online component (Redis) is not available. |
Returns:
Type | Description |
---|---|
Registry |
The registry. |
Source code in zenml/integrations/feast/feature_stores/feast_feature_store.py
def get_registry(self) -> Registry:
"""Returns the feature store registry.
Raise:
ConnectionError: If the online component (Redis) is not available.
Returns:
The registry.
"""
fs: FeatureStore = FeatureStore(repo_path=self.feast_repo)
return fs.registry
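To show how the methods above are typically reached from pipeline code, here is a hedged sketch of a step that reads online features through the active stack. The `context.stack.feature_store` accessor, the entity rows and the feature references are assumptions for illustration; the feature store methods themselves are the ones documented above.

```python
# Hypothetical sketch -- the stack accessor, entity rows and feature references
# are illustrative assumptions.
from typing import Any, Dict

from zenml.steps import StepContext, step


@step
def fetch_online_features(context: StepContext) -> Dict[str, Any]:
    """Reads the latest online features from the active stack's feature store."""
    feature_store = context.stack.feature_store  # assumed accessor on the active stack
    if feature_store is None:
        raise RuntimeError("The active stack does not contain a feature store.")
    return feature_store.get_online_features(
        entity_rows=[{"driver_id": 1001}],           # placeholder entity rows
        features=["driver_hourly_stats:conv_rate"],  # placeholder feature references
    )
```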
gcp
special
Initialization of the GCP ZenML integration.
The GCP integration submodule provides a way to run ZenML pipelines in a cloud
environment. Specifically, it allows the use of cloud artifact stores, metadata
stores, and an io module to handle file operations on Google Cloud Storage (GCS).
Additionally, the GCP secrets manager integration submodule provides a way to access the GCP secrets manager from within your ZenML Pipeline runs.
The Vertex AI integration submodule provides a way to run ZenML pipelines in a Vertex AI environment.
GcpIntegration (Integration)
Definition of Google Cloud Platform integration for ZenML.
Source code in zenml/integrations/gcp/__init__.py
class GcpIntegration(Integration):
"""Definition of Google Cloud Platform integration for ZenML."""
NAME = GCP
REQUIREMENTS = [
"kfp",
"gcsfs",
"google-cloud-secret-manager",
"google-cloud-aiplatform>=1.11.0",
]
@classmethod
def flavors(cls) -> List[FlavorWrapper]:
"""Declare the stack component flavors for the GCP integration.
Returns:
List of stack component flavors for this integration.
"""
return [
FlavorWrapper(
name=GCP_ARTIFACT_STORE_FLAVOR,
source="zenml.integrations.gcp.artifact_stores"
".GCPArtifactStore",
type=StackComponentType.ARTIFACT_STORE,
integration=cls.NAME,
),
FlavorWrapper(
name=GCP_SECRETS_MANAGER_FLAVOR,
source="zenml.integrations.gcp.secrets_manager."
"GCPSecretsManager",
type=StackComponentType.SECRETS_MANAGER,
integration=cls.NAME,
),
FlavorWrapper(
name=GCP_VERTEX_ORCHESTRATOR_FLAVOR,
source="zenml.integrations.gcp.orchestrators"
".VertexOrchestrator",
type=StackComponentType.ORCHESTRATOR,
integration=cls.NAME,
),
FlavorWrapper(
name=GCP_VERTEX_STEP_OPERATOR_FLAVOR,
source="zenml.integrations.gcp.step_operators"
".VertexStepOperator",
type=StackComponentType.STEP_OPERATOR,
integration=cls.NAME,
),
]
flavors()
classmethod
Declare the stack component flavors for the GCP integration.
Returns:
Type | Description |
---|---|
List[zenml.zen_stores.models.flavor_wrapper.FlavorWrapper] |
List of stack component flavors for this integration. |
Source code in zenml/integrations/gcp/__init__.py
@classmethod
def flavors(cls) -> List[FlavorWrapper]:
"""Declare the stack component flavors for the GCP integration.
Returns:
List of stack component flavors for this integration.
"""
return [
FlavorWrapper(
name=GCP_ARTIFACT_STORE_FLAVOR,
source="zenml.integrations.gcp.artifact_stores"
".GCPArtifactStore",
type=StackComponentType.ARTIFACT_STORE,
integration=cls.NAME,
),
FlavorWrapper(
name=GCP_SECRETS_MANAGER_FLAVOR,
source="zenml.integrations.gcp.secrets_manager."
"GCPSecretsManager",
type=StackComponentType.SECRETS_MANAGER,
integration=cls.NAME,
),
FlavorWrapper(
name=GCP_VERTEX_ORCHESTRATOR_FLAVOR,
source="zenml.integrations.gcp.orchestrators"
".VertexOrchestrator",
type=StackComponentType.ORCHESTRATOR,
integration=cls.NAME,
),
FlavorWrapper(
name=GCP_VERTEX_STEP_OPERATOR_FLAVOR,
source="zenml.integrations.gcp.step_operators"
".VertexStepOperator",
type=StackComponentType.STEP_OPERATOR,
integration=cls.NAME,
),
]
artifact_stores
special
Initialization of the GCP Artifact Store.
gcp_artifact_store
Implementation of the GCP Artifact Store.
GCPArtifactStore (BaseArtifactStore, AuthenticationMixin)
pydantic-model
Artifact Store for Google Cloud Storage based artifacts.
Source code in zenml/integrations/gcp/artifact_stores/gcp_artifact_store.py
class GCPArtifactStore(BaseArtifactStore, AuthenticationMixin):
"""Artifact Store for Google Cloud Storage based artifacts."""
_filesystem: Optional[gcsfs.GCSFileSystem] = None
# Class Configuration
FLAVOR: ClassVar[str] = GCP_ARTIFACT_STORE_FLAVOR
SUPPORTED_SCHEMES: ClassVar[Set[str]] = {GCP_PATH_PREFIX}
@property
def filesystem(self) -> gcsfs.GCSFileSystem:
"""The gcsfs filesystem to access this artifact store.
Returns:
The gcsfs filesystem to access this artifact store.
"""
if not self._filesystem:
secret = self.get_authentication_secret(
expected_schema_type=GCPSecretSchema
)
token = secret.get_credential_dict() if secret else None
self._filesystem = gcsfs.GCSFileSystem(token=token)
return self._filesystem
def open(self, path: PathType, mode: str = "r") -> Any:
"""Open a file at the given path.
Args:
path: Path of the file to open.
mode: Mode in which to open the file. Currently, only
'rb' and 'wb' to read and write binary files are supported.
Returns:
A file-like object that can be used to read or write to the file.
"""
return self.filesystem.open(path=path, mode=mode)
def copyfile(
self, src: PathType, dst: PathType, overwrite: bool = False
) -> None:
"""Copy a file.
Args:
src: The path to copy from.
dst: The path to copy to.
overwrite: If a file already exists at the destination, this
method will overwrite it if overwrite=`True` and
raise a FileExistsError otherwise.
Raises:
FileExistsError: If a file already exists at the destination
and overwrite is not set to `True`.
"""
if not overwrite and self.filesystem.exists(dst):
raise FileExistsError(
f"Unable to copy to destination '{convert_to_str(dst)}', "
f"file already exists. Set `overwrite=True` to copy anyway."
)
# TODO [ENG-151]: Check if it works with overwrite=True or if we need to
# manually remove it first
self.filesystem.copy(path1=src, path2=dst)
def exists(self, path: PathType) -> bool:
"""Check whether a path exists.
Args:
path: The path to check.
Returns:
True if the path exists, False otherwise.
"""
return self.filesystem.exists(path=path) # type: ignore[no-any-return]
def glob(self, pattern: PathType) -> List[PathType]:
"""Return all paths that match the given glob pattern.
The glob pattern may include:
- '*' to match any number of characters
- '?' to match a single character
- '[...]' to match one of the characters inside the brackets
- '**' as the full name of a path component to match to search
in subdirectories of any depth (e.g. '/some_dir/**/some_file)
Args:
pattern: The glob pattern to match, see details above.
Returns:
A list of paths that match the given glob pattern.
"""
return [
f"{GCP_PATH_PREFIX}{path}"
for path in self.filesystem.glob(path=pattern)
]
def isdir(self, path: PathType) -> bool:
"""Check whether a path is a directory.
Args:
path: The path to check.
Returns:
True if the path is a directory, False otherwise.
"""
return self.filesystem.isdir(path=path) # type: ignore[no-any-return]
def listdir(self, path: PathType) -> List[PathType]:
"""Return a list of files in a directory.
Args:
path: The path of the directory to list.
Returns:
A list of paths of files in the directory.
"""
path_without_prefix = convert_to_str(path)
if path_without_prefix.startswith(GCP_PATH_PREFIX):
path_without_prefix = path_without_prefix[len(GCP_PATH_PREFIX) :]
def _extract_basename(file_dict: Dict[str, Any]) -> str:
"""Extracts the basename from a file info dict returned by GCP.
Args:
file_dict: A file info dict returned by the GCP filesystem.
Returns:
The basename of the file.
"""
file_path = cast(str, file_dict["name"])
base_name = file_path[len(path_without_prefix) :]
return base_name.lstrip("/")
return [
_extract_basename(dict_)
for dict_ in self.filesystem.listdir(path=path)
]
def makedirs(self, path: PathType) -> None:
"""Create a directory at the given path.
If needed also create missing parent directories.
Args:
path: The path of the directory to create.
"""
self.filesystem.makedirs(path=path, exist_ok=True)
def mkdir(self, path: PathType) -> None:
"""Create a directory at the given path.
Args:
path: The path of the directory to create.
"""
self.filesystem.makedir(path=path)
def remove(self, path: PathType) -> None:
"""Remove the file at the given path.
Args:
path: The path of the file to remove.
"""
self.filesystem.rm_file(path=path)
def rename(
self, src: PathType, dst: PathType, overwrite: bool = False
) -> None:
"""Rename source file to destination file.
Args:
src: The path of the file to rename.
dst: The path to rename the source file to.
overwrite: If a file already exists at the destination, this
method will overwrite it if overwrite=`True` and
raise a FileExistsError otherwise.
Raises:
FileExistsError: If a file already exists at the destination
and overwrite is not set to `True`.
"""
if not overwrite and self.filesystem.exists(dst):
raise FileExistsError(
f"Unable to rename file to '{convert_to_str(dst)}', "
f"file already exists. Set `overwrite=True` to rename anyway."
)
# TODO [ENG-152]: Check if it works with overwrite=True or if we need
# to manually remove it first
self.filesystem.rename(path1=src, path2=dst)
def rmtree(self, path: PathType) -> None:
"""Remove the given directory.
Args:
path: The path of the directory to remove.
"""
self.filesystem.delete(path=path, recursive=True)
def stat(self, path: PathType) -> Dict[str, Any]:
"""Return stat info for the given path.
Args:
path: the path to get stat info for.
Returns:
A dictionary with the stat info.
"""
return self.filesystem.stat(path=path) # type: ignore[no-any-return]
def walk(
self,
top: PathType,
topdown: bool = True,
onerror: Optional[Callable[..., None]] = None,
) -> Iterable[Tuple[PathType, List[PathType], List[PathType]]]:
"""Return an iterator that walks the contents of the given directory.
Args:
top: Path of directory to walk.
topdown: Unused argument to conform to interface.
onerror: Unused argument to conform to interface.
Yields:
An Iterable of Tuples, each of which contain the path of the current
directory path, a list of directories inside the current directory
and a list of files inside the current directory.
"""
# TODO [ENG-153]: Additional params
for (
directory,
subdirectories,
files,
) in self.filesystem.walk(path=top):
yield f"{GCP_PATH_PREFIX}{directory}", subdirectories, files
filesystem: GCSFileSystem
property
readonly
The gcsfs filesystem to access this artifact store.
Returns:
Type | Description |
---|---|
GCSFileSystem |
The gcsfs filesystem to access this artifact store. |
copyfile(self, src, dst, overwrite=False)
Copy a file.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
src | Union[bytes, str] | The path to copy from. | required |
dst | Union[bytes, str] | The path to copy to. | required |
overwrite | bool | If a file already exists at the destination, this method will overwrite it if `overwrite=True` and raise a FileExistsError otherwise. | False |
Exceptions:
Type | Description |
---|---|
FileExistsError | If a file already exists at the destination and `overwrite` is not set to `True`. |
Source code in zenml/integrations/gcp/artifact_stores/gcp_artifact_store.py
def copyfile(
self, src: PathType, dst: PathType, overwrite: bool = False
) -> None:
"""Copy a file.
Args:
src: The path to copy from.
dst: The path to copy to.
overwrite: If a file already exists at the destination, this
method will overwrite it if overwrite=`True` and
raise a FileExistsError otherwise.
Raises:
FileExistsError: If a file already exists at the destination
and overwrite is not set to `True`.
"""
if not overwrite and self.filesystem.exists(dst):
raise FileExistsError(
f"Unable to copy to destination '{convert_to_str(dst)}', "
f"file already exists. Set `overwrite=True` to copy anyway."
)
# TODO [ENG-151]: Check if it works with overwrite=True or if we need to
# manually remove it first
self.filesystem.copy(path1=src, path2=dst)
exists(self, path)
Check whether a path exists.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path |
Union[bytes, str] |
The path to check. |
required |
Returns:
Type | Description |
---|---|
bool |
True if the path exists, False otherwise. |
Source code in zenml/integrations/gcp/artifact_stores/gcp_artifact_store.py
def exists(self, path: PathType) -> bool:
"""Check whether a path exists.
Args:
path: The path to check.
Returns:
True if the path exists, False otherwise.
"""
return self.filesystem.exists(path=path) # type: ignore[no-any-return]
glob(self, pattern)
Return all paths that match the given glob pattern.
The glob pattern may include: `*` to match any number of characters, `?` to match a single character, `[...]` to match one of the characters inside the brackets, and `**` as the full name of a path component to match in subdirectories of any depth (e.g. `/some_dir/**/some_file`).
Parameters:
Name | Type | Description | Default |
---|---|---|---|
pattern |
Union[bytes, str] |
The glob pattern to match, see details above. |
required |
Returns:
Type | Description |
---|---|
List[Union[bytes, str]] |
A list of paths that match the given glob pattern. |
Source code in zenml/integrations/gcp/artifact_stores/gcp_artifact_store.py
def glob(self, pattern: PathType) -> List[PathType]:
"""Return all paths that match the given glob pattern.
The glob pattern may include:
- '*' to match any number of characters
- '?' to match a single character
- '[...]' to match one of the characters inside the brackets
- '**' as the full name of a path component to match to search
in subdirectories of any depth (e.g. '/some_dir/**/some_file)
Args:
pattern: The glob pattern to match, see details above.
Returns:
A list of paths that match the given glob pattern.
"""
return [
f"{GCP_PATH_PREFIX}{path}"
for path in self.filesystem.glob(path=pattern)
]
isdir(self, path)
Check whether a path is a directory.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path |
Union[bytes, str] |
The path to check. |
required |
Returns:
Type | Description |
---|---|
bool |
True if the path is a directory, False otherwise. |
Source code in zenml/integrations/gcp/artifact_stores/gcp_artifact_store.py
def isdir(self, path: PathType) -> bool:
"""Check whether a path is a directory.
Args:
path: The path to check.
Returns:
True if the path is a directory, False otherwise.
"""
return self.filesystem.isdir(path=path) # type: ignore[no-any-return]
listdir(self, path)
Return a list of files in a directory.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path |
Union[bytes, str] |
The path of the directory to list. |
required |
Returns:
Type | Description |
---|---|
List[Union[bytes, str]] |
A list of paths of files in the directory. |
Source code in zenml/integrations/gcp/artifact_stores/gcp_artifact_store.py
def listdir(self, path: PathType) -> List[PathType]:
"""Return a list of files in a directory.
Args:
path: The path of the directory to list.
Returns:
A list of paths of files in the directory.
"""
path_without_prefix = convert_to_str(path)
if path_without_prefix.startswith(GCP_PATH_PREFIX):
path_without_prefix = path_without_prefix[len(GCP_PATH_PREFIX) :]
def _extract_basename(file_dict: Dict[str, Any]) -> str:
"""Extracts the basename from a file info dict returned by GCP.
Args:
file_dict: A file info dict returned by the GCP filesystem.
Returns:
The basename of the file.
"""
file_path = cast(str, file_dict["name"])
base_name = file_path[len(path_without_prefix) :]
return base_name.lstrip("/")
return [
_extract_basename(dict_)
for dict_ in self.filesystem.listdir(path=path)
]
makedirs(self, path)
Create a directory at the given path.
If needed also create missing parent directories.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path |
Union[bytes, str] |
The path of the directory to create. |
required |
Source code in zenml/integrations/gcp/artifact_stores/gcp_artifact_store.py
def makedirs(self, path: PathType) -> None:
"""Create a directory at the given path.
If needed also create missing parent directories.
Args:
path: The path of the directory to create.
"""
self.filesystem.makedirs(path=path, exist_ok=True)
mkdir(self, path)
Create a directory at the given path.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path |
Union[bytes, str] |
The path of the directory to create. |
required |
Source code in zenml/integrations/gcp/artifact_stores/gcp_artifact_store.py
def mkdir(self, path: PathType) -> None:
"""Create a directory at the given path.
Args:
path: The path of the directory to create.
"""
self.filesystem.makedir(path=path)
open(self, path, mode='r')
Open a file at the given path.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path |
Union[bytes, str] |
Path of the file to open. |
required |
mode |
str |
Mode in which to open the file. Currently, only 'rb' and 'wb' to read and write binary files are supported. |
'r' |
Returns:
Type | Description |
---|---|
Any |
A file-like object that can be used to read or write to the file. |
Source code in zenml/integrations/gcp/artifact_stores/gcp_artifact_store.py
def open(self, path: PathType, mode: str = "r") -> Any:
"""Open a file at the given path.
Args:
path: Path of the file to open.
mode: Mode in which to open the file. Currently, only
'rb' and 'wb' to read and write binary files are supported.
Returns:
A file-like object that can be used to read or write to the file.
"""
return self.filesystem.open(path=path, mode=mode)
remove(self, path)
Remove the file at the given path.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path |
Union[bytes, str] |
The path of the file to remove. |
required |
Source code in zenml/integrations/gcp/artifact_stores/gcp_artifact_store.py
def remove(self, path: PathType) -> None:
"""Remove the file at the given path.
Args:
path: The path of the file to remove.
"""
self.filesystem.rm_file(path=path)
rename(self, src, dst, overwrite=False)
Rename source file to destination file.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
src | Union[bytes, str] | The path of the file to rename. | required |
dst | Union[bytes, str] | The path to rename the source file to. | required |
overwrite | bool | If a file already exists at the destination, this method will overwrite it if `overwrite=True` and raise a FileExistsError otherwise. | False |
Exceptions:
Type | Description |
---|---|
FileExistsError | If a file already exists at the destination and `overwrite` is not set to `True`. |
Source code in zenml/integrations/gcp/artifact_stores/gcp_artifact_store.py
def rename(
self, src: PathType, dst: PathType, overwrite: bool = False
) -> None:
"""Rename source file to destination file.
Args:
src: The path of the file to rename.
dst: The path to rename the source file to.
overwrite: If a file already exists at the destination, this
method will overwrite it if overwrite=`True` and
raise a FileExistsError otherwise.
Raises:
FileExistsError: If a file already exists at the destination
and overwrite is not set to `True`.
"""
if not overwrite and self.filesystem.exists(dst):
raise FileExistsError(
f"Unable to rename file to '{convert_to_str(dst)}', "
f"file already exists. Set `overwrite=True` to rename anyway."
)
# TODO [ENG-152]: Check if it works with overwrite=True or if we need
# to manually remove it first
self.filesystem.rename(path1=src, path2=dst)
rmtree(self, path)
Remove the given directory.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path |
Union[bytes, str] |
The path of the directory to remove. |
required |
Source code in zenml/integrations/gcp/artifact_stores/gcp_artifact_store.py
def rmtree(self, path: PathType) -> None:
"""Remove the given directory.
Args:
path: The path of the directory to remove.
"""
self.filesystem.delete(path=path, recursive=True)
stat(self, path)
Return stat info for the given path.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path |
Union[bytes, str] |
the path to get stat info for. |
required |
Returns:
Type | Description |
---|---|
Dict[str, Any] |
A dictionary with the stat info. |
Source code in zenml/integrations/gcp/artifact_stores/gcp_artifact_store.py
def stat(self, path: PathType) -> Dict[str, Any]:
"""Return stat info for the given path.
Args:
path: the path to get stat info for.
Returns:
A dictionary with the stat info.
"""
return self.filesystem.stat(path=path) # type: ignore[no-any-return]
walk(self, top, topdown=True, onerror=None)
Return an iterator that walks the contents of the given directory.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
top |
Union[bytes, str] |
Path of directory to walk. |
required |
topdown |
bool |
Unused argument to conform to interface. |
True |
onerror |
Optional[Callable[..., NoneType]] |
Unused argument to conform to interface. |
None |
Yields:
Type | Description |
---|---|
Iterable[Tuple[Union[bytes, str], List[Union[bytes, str]], List[Union[bytes, str]]]] |
An Iterable of Tuples, each of which contain the path of the current directory path, a list of directories inside the current directory and a list of files inside the current directory. |
Source code in zenml/integrations/gcp/artifact_stores/gcp_artifact_store.py
def walk(
self,
top: PathType,
topdown: bool = True,
onerror: Optional[Callable[..., None]] = None,
) -> Iterable[Tuple[PathType, List[PathType], List[PathType]]]:
"""Return an iterator that walks the contents of the given directory.
Args:
top: Path of directory to walk.
topdown: Unused argument to conform to interface.
onerror: Unused argument to conform to interface.
Yields:
An Iterable of Tuples, each of which contain the path of the current
directory path, a list of directories inside the current directory
and a list of files inside the current directory.
"""
# TODO [ENG-153]: Additional params
for (
directory,
subdirectories,
files,
) in self.filesystem.walk(path=top):
yield f"{GCP_PATH_PREFIX}{directory}", subdirectories, files
constants
Constants for the VertexAI integration.
google_credentials_mixin
Implementation of the Google credentials mixin.
GoogleCredentialsMixin (BaseModel)
pydantic-model
Mixin for Google Cloud Platform credentials.
Attributes:
Name | Type | Description |
---|---|---|
service_account_path |
Optional[str] |
path to the service account credentials file to be used for authentication. If not provided, the default credentials will be used. |
Source code in zenml/integrations/gcp/google_credentials_mixin.py
class GoogleCredentialsMixin(BaseModel):
"""Mixin for Google Cloud Platform credentials.
Attributes:
service_account_path: path to the service account credentials file to be
used for authentication. If not provided, the default credentials
will be used.
"""
service_account_path: Optional[str] = None
def _get_authentication(self) -> Tuple["Credentials", str]:
"""Get GCP credentials and the project ID associated with the credentials.
If `service_account_path` is provided, then the credentials will be
loaded from the file at that path. Otherwise, the default credentials
will be used.
Returns:
A tuple containing the credentials and the project ID associated to
the credentials.
"""
if self.service_account_path:
credentials, project_id = load_credentials_from_file(
self.service_account_path
)
else:
credentials, project_id = default()
return credentials, project_id
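The mixin defers to the standard google-auth loading flow. The snippet below is a simplified illustration of the two branches it switches between, assuming the google-auth package is installed and, for the first branch, a valid service account key file path.

```python
# Simplified illustration of the two credential paths the mixin switches between.
from google.auth import default, load_credentials_from_file

service_account_path = None  # or e.g. "/path/to/service-account.json" (placeholder)

if service_account_path:
    # Explicit service account key file: credentials and project come from the file.
    credentials, project_id = load_credentials_from_file(service_account_path)
else:
    # Application Default Credentials (env var, gcloud login, metadata server, ...).
    credentials, project_id = default()

print(project_id)
```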
orchestrators
special
Initialization for the VertexAI orchestrator.
vertex_entrypoint_configuration
Implementation of the VertexAI entrypoint configuration.
VertexEntrypointConfiguration (StepEntrypointConfiguration)
Entrypoint configuration for running steps on Vertex AI Pipelines.
Source code in zenml/integrations/gcp/orchestrators/vertex_entrypoint_configuration.py
class VertexEntrypointConfiguration(StepEntrypointConfiguration):
"""Entrypoint configuration for running steps on Vertex AI Pipelines."""
@classmethod
def get_custom_entrypoint_options(cls) -> Set[str]:
"""Vertex AI Pipelines specific entrypoint options.
The argument `VERTEX_JOB_ID_OPTION` allows to specify the job id of the
Vertex AI Pipeline and get it in the execution of the step, via the `get_run_name`
method.
Returns:
The set of custom entrypoint options.
"""
return {VERTEX_JOB_ID_OPTION}
@classmethod
def get_custom_entrypoint_arguments(
cls, step: "BaseStep", *args: Any, **kwargs: Any
) -> List[str]:
"""Sets the value for the `VERTEX_JOB_ID_OPTION` argument.
Args:
step: The step to be executed.
*args: Additional arguments.
**kwargs: Additional keyword arguments.
Returns:
A list of arguments for the entrypoint.
"""
return [f"--{VERTEX_JOB_ID_OPTION}", kwargs[VERTEX_JOB_ID_OPTION]]
def get_run_name(self, pipeline_name: str) -> str:
"""Returns the Vertex AI Pipeline job id.
Args:
pipeline_name: The name of the pipeline.
Returns:
The Vertex AI Pipeline job id.
"""
job_id: str = self.entrypoint_args[VERTEX_JOB_ID_OPTION]
return job_id
get_custom_entrypoint_arguments(step, *args, **kwargs)
classmethod
Sets the value for the VERTEX_JOB_ID_OPTION argument.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
step |
BaseStep |
The step to be executed. |
required |
*args |
Any |
Additional arguments. |
() |
**kwargs |
Any |
Additional keyword arguments. |
{} |
Returns:
Type | Description |
---|---|
List[str] |
A list of arguments for the entrypoint. |
Source code in zenml/integrations/gcp/orchestrators/vertex_entrypoint_configuration.py
@classmethod
def get_custom_entrypoint_arguments(
cls, step: "BaseStep", *args: Any, **kwargs: Any
) -> List[str]:
"""Sets the value for the `VERTEX_JOB_ID_OPTION` argument.
Args:
step: The step to be executed.
*args: Additional arguments.
**kwargs: Additional keyword arguments.
Returns:
A list of arguments for the entrypoint.
"""
return [f"--{VERTEX_JOB_ID_OPTION}", kwargs[VERTEX_JOB_ID_OPTION]]
get_custom_entrypoint_options()
classmethod
Vertex AI Pipelines specific entrypoint options.
The argument VERTEX_JOB_ID_OPTION allows specifying the job id of the Vertex AI
Pipeline and retrieving it during the execution of the step via the get_run_name
method.
Returns:
Type | Description |
---|---|
Set[str] |
The set of custom entrypoint options. |
Source code in zenml/integrations/gcp/orchestrators/vertex_entrypoint_configuration.py
@classmethod
def get_custom_entrypoint_options(cls) -> Set[str]:
"""Vertex AI Pipelines specific entrypoint options.
The argument `VERTEX_JOB_ID_OPTION` allows to specify the job id of the
Vertex AI Pipeline and get it in the execution of the step, via the `get_run_name`
method.
Returns:
The set of custom entrypoint options.
"""
return {VERTEX_JOB_ID_OPTION}
get_run_name(self, pipeline_name)
Returns the Vertex AI Pipeline job id.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
pipeline_name |
str |
The name of the pipeline. |
required |
Returns:
Type | Description |
---|---|
str |
The Vertex AI Pipeline job id. |
Source code in zenml/integrations/gcp/orchestrators/vertex_entrypoint_configuration.py
def get_run_name(self, pipeline_name: str) -> str:
"""Returns the Vertex AI Pipeline job id.
Args:
pipeline_name: The name of the pipeline.
Returns:
The Vertex AI Pipeline job id.
"""
job_id: str = self.entrypoint_args[VERTEX_JOB_ID_OPTION]
return job_id
vertex_orchestrator
Implementation of the VertexAI orchestrator.
VertexOrchestrator (BaseOrchestrator, GoogleCredentialsMixin)
pydantic-model
Orchestrator responsible for running pipelines on Vertex AI.
Attributes:
Name | Type | Description |
---|---|---|
custom_docker_base_image_name | Optional[str] | Name of the Docker image that should be used as the base for the image that will be used to execute each of the steps. If no custom base image is given, a basic image of the active ZenML version will be used. **Note**: this image needs to have ZenML installed, otherwise the pipeline execution will fail. For that reason, you might want to extend the ZenML Docker images found here: https://hub.docker.com/r/zenmldocker/zenml/ |
project | Optional[str] | GCP project name. If `None`, the project will be inferred from the environment. |
location | str | Name of the GCP region where the pipeline job will be executed. Vertex AI Pipelines is available in the following regions: https://cloud.google.com/vertex-ai/docs/general/locations#feature-availability |
pipeline_root | Optional[str] | A Cloud Storage URI that will be used by Vertex AI Pipelines. If not provided but the artifact store in the stack used to execute the pipeline is a `zenml.integrations.gcp.artifact_stores.GCPArtifactStore`, then a subdirectory of the artifact store will be used. |
encryption_spec_key_name | Optional[str] | The Cloud KMS resource identifier of the customer-managed encryption key used to protect the job. Has the form `projects/<PRJCT>/locations/<REGION>/keyRings/<KR>/cryptoKeys/<KEY>`. The key needs to be in the same region as where the compute resource is created. |
workload_service_account | Optional[str] | The service account for the workload run-as account. Users submitting jobs must have act-as permission on this run-as account. If not provided, the default service account will be used. |
network | Optional[str] | The full name of the Compute Engine Network to which the job should be peered, for example `projects/12345/global/networks/myVPC`. If not provided, the job will not be peered with any network. |
synchronous | bool | If `True`, running a pipeline using this orchestrator will block until all steps finish running on the Vertex AI Pipelines service. |
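For concreteness, the sketch below instantiates the orchestrator configuration with the attributes described above. All values are placeholders, and it assumes the component can be constructed directly as a pydantic model; in practice it is usually registered through the ZenML CLI and used as part of a registered stack.

```python
# Hypothetical configuration values for illustration only.
from zenml.integrations.gcp.orchestrators import VertexOrchestrator

vertex_orchestrator = VertexOrchestrator(
    name="vertex_orchestrator",                           # stack component name (assumed required)
    location="europe-west4",                              # required: Vertex AI region
    project="my-gcp-project",                             # optional: inferred if omitted
    pipeline_root="gs://my-bucket/vertex_pipeline_root",  # optional with a GCPArtifactStore
    workload_service_account="zenml@my-gcp-project.iam.gserviceaccount.com",
    synchronous=True,                                     # block until the run finishes
)
```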
Source code in zenml/integrations/gcp/orchestrators/vertex_orchestrator.py
class VertexOrchestrator(BaseOrchestrator, GoogleCredentialsMixin):
"""Orchestrator responsible for running pipelines on Vertex AI.
Attributes:
custom_docker_base_image_name: Name of the Docker image that should be
used as the base for the image that will be used to execute each of
the steps. If no custom base image is given, a basic image of the
active ZenML version will be used. **Note**: This image needs to
have ZenML installed, otherwise the pipeline execution will fail.
For that reason, you might want to extend the ZenML Docker images found
here: https://hub.docker.com/r/zenmldocker/zenml/
project: GCP project name. If `None`, the project will be inferred from
the environment.
location: Name of GCP region where the pipeline job will be executed.
Vertex AI Pipelines is available in the following regions:
https://cloud.google.com/vertex-ai/docs/general/locations#feature
-availability
pipeline_root: a Cloud Storage URI that will be used by the Vertex AI
Pipelines.
If not provided but the artifact store in the stack used to execute
the pipeline is a
`zenml.integrations.gcp.artifact_stores.GCPArtifactStore`,
then a subdirectory of the artifact store will be used.
encryption_spec_key_name: The Cloud KMS resource identifier of the
customer
managed encryption key used to protect the job. Has the form:
`projects/<PRJCT>/locations/<REGION>/keyRings/<KR>/cryptoKeys/<KEY>`
. The key needs to be in the same region as where the compute
resource is created.
workload_service_account: the service account for workload run-as
account. Users submitting jobs must have act-as permission on this
run-as account.
If not provided, the default service account will be used.
network: the full name of the Compute Engine Network to which the job
should
be peered. For example, `projects/12345/global/networks/myVPC`
If not provided, the job will not be peered with any network.
synchronous: If `True`, running a pipeline using this orchestrator will
block until all steps finished running on Vertex AI Pipelines
service.
"""
custom_docker_base_image_name: Optional[str] = None
project: Optional[str] = None
location: str
pipeline_root: Optional[str] = None
labels: Dict[str, str] = {}
encryption_spec_key_name: Optional[str] = None
workload_service_account: Optional[str] = None
network: Optional[str] = None
synchronous: bool = False
_pipeline_root: str
FLAVOR: ClassVar[str] = GCP_VERTEX_ORCHESTRATOR_FLAVOR
@property
def validator(self) -> Optional[StackValidator]:
"""Validates that the stack contains a container registry.
Also validates that the artifact store and metadata store used are not
local.
Returns:
A StackValidator instance.
"""
def _validate_stack_requirements(stack: "Stack") -> Tuple[bool, str]:
"""Validates that all the stack components are not local.
Args:
stack: The stack to validate.
Returns:
A tuple of (is_valid, error_message).
"""
# Validate that the container registry is not local.
container_registry = stack.container_registry
if container_registry and container_registry.is_local:
return False, (
f"The Vertex orchestrator does not support local "
f"container registries. You should replace the component '"
f"{container_registry.name}' "
f"{container_registry.TYPE.value} to a remote one."
)
# Validate that the rest of the components are not local.
for stack_comp in stack.components.values():
local_path = stack_comp.local_path
if not local_path:
continue
return False, (
f"The '{stack_comp.name}' {stack_comp.TYPE.value} is a "
f"local stack component. The Vertex AI Pipelines "
f"orchestrator requires that all the components in the "
f"stack used to execute the pipeline have to be not local, "
f"because there is no way for Vertex to connect to your "
f"local machine. You should use a flavor of "
f"{stack_comp.TYPE.value} other than '"
f"{stack_comp.FLAVOR}'."
)
# If the `pipeline_root` has not been defined in the orchestrator
# configuration, and the artifact store is not a GCP artifact store,
# then raise an error.
if (
not self.pipeline_root
and stack.artifact_store.FLAVOR != GCP_ARTIFACT_STORE_FLAVOR
):
return False, (
f"The attribute `pipeline_root` has not been set and it "
f"cannot be generated using the path of the artifact store "
f"because it is not a "
f"`zenml.integrations.gcp.artifact_store.GCPArtifactStore`."
f" To solve this issue, set the `pipeline_root` attribute "
f"manually executing the following command: "
f"`zenml orchestrator update {stack.orchestrator.name} "
f'--pipeline_root="<Cloud Storage URI>"`.'
)
return True, ""
return StackValidator(
required_components={StackComponentType.CONTAINER_REGISTRY},
custom_validation_function=_validate_stack_requirements,
)
def get_docker_image_name(self, pipeline_name: str) -> str:
"""Returns the full docker image name including registry and tag.
Args:
pipeline_name: The name of the pipeline.
Returns:
The full docker image name including registry and tag.
"""
base_image_name = f"zenml-vertex:{pipeline_name}"
container_registry = Repository().active_stack.container_registry
if container_registry:
registry_uri = container_registry.uri.rstrip("/")
return f"{registry_uri}/{base_image_name}"
return base_image_name
@property
def root_directory(self) -> str:
"""Returns path to the root directory for files for this orchestrator.
Returns:
The path to the root directory for all files concerning this
orchestrator.
"""
return os.path.join(
get_global_config_directory(), "vertex", str(self.uuid)
)
@property
def pipeline_directory(self) -> str:
"""Returns path to directory where kubeflow pipelines files are stored.
Returns:
Path to the pipeline directory.
"""
return os.path.join(self.root_directory, "pipelines")
def prepare_pipeline_deployment(
self,
pipeline: "BasePipeline",
stack: "Stack",
runtime_configuration: "RuntimeConfiguration",
) -> None:
"""Build a Docker image for the current environment.
This uploads it to a container registry if configured.
Args:
pipeline: The pipeline to be deployed.
stack: The stack that will be used to deploy the pipeline.
runtime_configuration: The runtime configuration for the pipeline.
Raises:
RuntimeError: If the container registry is missing.
"""
from zenml.utils import docker_utils
repo = Repository()
container_registry = repo.active_stack.container_registry
if not container_registry:
raise RuntimeError("Missing container registry")
image_name = self.get_docker_image_name(pipeline.name)
requirements = {*stack.requirements(), *pipeline.requirements}
logger.debug(
"Vertex AI Pipelines service docker container requirements %s",
requirements,
)
docker_utils.build_docker_image(
build_context_path=get_source_root_path(),
image_name=image_name,
dockerignore_path=pipeline.dockerignore_file,
requirements=requirements,
base_image=self.custom_docker_base_image_name,
)
container_registry.push_image(image_name)
def prepare_or_run_pipeline(
self,
sorted_steps: List["BaseStep"],
pipeline: "BasePipeline",
pb2_pipeline: "Pb2Pipeline",
stack: "Stack",
runtime_configuration: "RuntimeConfiguration",
) -> Any:
"""Creates a KFP JSON pipeline.
# noqa: DAR402
This is an intermediary representation of the pipeline which is then
deployed to Vertex AI Pipelines service.
How it works:
-------------
Before this method is called, the `prepare_pipeline_deployment()` method
builds a Docker image that contains the code for the pipeline, all steps,
and the context around these files.
Based on this Docker image a callable is created which builds
container_ops for each step (`_construct_kfp_pipeline`). The function
`kfp.components.load_component_from_text` is used to create the
`ContainerOp`, because using the `dsl.ContainerOp` class directly is
deprecated when using the Kubeflow SDK v2. The step entrypoint command
with the entrypoint arguments is the command that will be executed by
the container created using the previously created Docker image.
This callable is then compiled into a JSON file that is used as the
intermediary representation of the Kubeflow pipeline.
This file then is submitted to the Vertex AI Pipelines service for
execution.
Args:
sorted_steps: List of sorted steps.
pipeline: Zenml Pipeline instance.
pb2_pipeline: Protobuf Pipeline instance.
stack: The stack the pipeline was run on.
runtime_configuration: The Runtime configuration of the current run.
Raises:
ValueError: If the attribute `pipeline_root` is not set and it
cannot be generated using the path of the artifact store in the
stack because it is not a
`zenml.integrations.gcp.artifact_store.GCPArtifactStore`.
"""
# If the `pipeline_root` has not been defined in the orchestrator
# configuration,
# try to create it from the artifact store if it is a
# `GCPArtifactStore`.
if not self.pipeline_root:
artifact_store = stack.artifact_store
self._pipeline_root = f"{artifact_store.path.rstrip('/')}/vertex_pipeline_root/{pipeline.name}/{runtime_configuration.run_name}"
logger.info(
"The attribute `pipeline_root` has not been set in the "
"orchestrator configuration. One has been generated "
"automatically based on the path of the `GCPArtifactStore` "
"artifact store in the stack used to execute the pipeline. "
"The generated `pipeline_root` is `%s`.",
self._pipeline_root,
)
else:
self._pipeline_root = self.pipeline_root
# Build the Docker image that will be used to run the steps of the
# pipeline.
image_name = self.get_docker_image_name(pipeline.name)
image_name = get_image_digest(image_name) or image_name
def _construct_kfp_pipeline() -> None:
"""Create a `ContainerOp` for each step.
This should contain the name of the Docker image and configures the
entrypoint of the Docker image to run the step.
Additionally, this gives each `ContainerOp` information about its
direct downstream steps.
If this callable is passed to the `compile()` method of
`KFPV2Compiler` all `dsl.ContainerOp` instances will be
automatically added to a singular `dsl.Pipeline` instance.
"""
step_name_to_container_op: Dict[str, dsl.ContainerOp] = {}
for step in sorted_steps:
# The command will be needed to eventually call the python step
# within the docker container
command = VertexEntrypointConfiguration.get_entrypoint_command()
# The arguments are passed to configure the entrypoint of the
# docker container when the step is called.
arguments = VertexEntrypointConfiguration.get_entrypoint_arguments(
step=step,
pb2_pipeline=pb2_pipeline,
**{VERTEX_JOB_ID_OPTION: dslv2.PIPELINE_JOB_ID_PLACEHOLDER},
)
# Create the `ContainerOp` for the step. Using the
# `dsl.ContainerOp`
# class directly is deprecated when using the Kubeflow SDK v2.
container_op = kfp.components.load_component_from_text(
f"""
name: {step.name}
implementation:
container:
image: {image_name}
command: {command + arguments}"""
)()
# Set upstream tasks as a dependency of the current step
upstream_step_names = self.get_upstream_step_names(
step=step, pb2_pipeline=pb2_pipeline
)
for upstream_step_name in upstream_step_names:
upstream_container_op = step_name_to_container_op[
upstream_step_name
]
container_op.after(upstream_container_op)
step_name_to_container_op[step.name] = container_op
# Save the generated pipeline to a file.
assert runtime_configuration.run_name
fileio.makedirs(self.pipeline_directory)
pipeline_file_path = os.path.join(
self.pipeline_directory,
f"{runtime_configuration.run_name}.json",
)
# Compile the pipeline using the Kubeflow SDK V2 compiler, which
# generates a JSON representation of the pipeline that can later be
# uploaded to the Vertex AI Pipelines service.
logger.debug(
"Compiling pipeline using Kubeflow SDK V2 compiler and saving it "
"to `%s`",
pipeline_file_path,
)
KFPV2Compiler().compile(
pipeline_func=_construct_kfp_pipeline,
package_path=pipeline_file_path,
pipeline_name=_clean_pipeline_name(pipeline.name),
)
# Using the Google Cloud AIPlatform client, upload and execute the
# pipeline
# on the Vertex AI Pipelines service.
self._upload_and_run_pipeline(
pipeline_name=pipeline.name,
pipeline_file_path=pipeline_file_path,
runtime_configuration=runtime_configuration,
enable_cache=pipeline.enable_cache,
)
def _upload_and_run_pipeline(
self,
pipeline_name: str,
pipeline_file_path: str,
runtime_configuration: "RuntimeConfiguration",
enable_cache: bool,
) -> None:
"""Uploads and run the pipeline on the Vertex AI Pipelines service.
Args:
pipeline_name: Name of the pipeline.
pipeline_file_path: Path of the JSON file containing the compiled
Kubeflow pipeline (compiled with Kubeflow SDK v2).
runtime_configuration: Runtime configuration of the pipeline run.
enable_cache: Whether caching is enabled for this pipeline run.
"""
# The run name has to be lower-cased with underscores replaced by
# hyphens, because the Vertex AI Pipelines service requires this format
# for job IDs.
assert runtime_configuration.run_name
job_id = _clean_pipeline_name(runtime_configuration.run_name)
# Warn the user that the scheduling is not available using the Vertex
# Orchestrator
if runtime_configuration.schedule:
logger.warning(
"Pipeline scheduling configuration was provided, but Vertex "
"AI Pipelines "
"do not have capabilities for scheduling yet."
)
# Get the credentials that would be used to create the Vertex AI
# Pipelines
# job.
credentials, project_id = self._get_authentication()
if self.project and self.project != project_id:
logger.warning(
"Authenticated with project `%s`, but this orchestrator is "
"configured to use the project `%s`.",
project_id,
self.project,
)
# If the project was set in the configuration, use it. Otherwise, use
# the project that was used to authenticate.
project_id = self.project if self.project else project_id
# Instantiate the Vertex AI Pipelines job
run = aiplatform.PipelineJob(
display_name=pipeline_name,
template_path=pipeline_file_path,
job_id=job_id,
pipeline_root=self._pipeline_root,
parameter_values=None,
enable_caching=enable_cache,
encryption_spec_key_name=self.encryption_spec_key_name,
labels=self.labels,
credentials=credentials,
project=project_id,
location=self.location,
)
logger.info(
"Submitting pipeline job with job_id `%s` to Vertex AI Pipelines "
"service.",
job_id,
)
# Submit the job to Vertex AI Pipelines service.
try:
if self.workload_service_account:
logger.info(
"The Vertex AI Pipelines job workload will be executed "
"using `%s` "
"service account.",
self.workload_service_account,
)
if self.network:
logger.info(
"The Vertex AI Pipelines job will be peered with `%s` "
"network.",
self.network,
)
run.submit(
service_account=self.workload_service_account,
network=self.network,
)
logger.info(
"View the Vertex AI Pipelines job at %s", run._dashboard_uri()
)
if self.synchronous:
logger.info(
"Waiting for the Vertex AI Pipelines job to finish..."
)
run.wait()
except google_exceptions.ClientError as e:
logger.warning(
"Failed to create the Vertex AI Pipelines job: %s", e
)
except RuntimeError as e:
logger.error(
"The Vertex AI Pipelines job execution has failed: %s", e
)
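To make the submission step easier to follow outside the orchestrator, here is a minimal, hedged sketch of submitting an already compiled pipeline JSON with the google-cloud-aiplatform client, mirroring _upload_and_run_pipeline above. The project, location, bucket, and file names are placeholder values, not ZenML defaults.
from google.cloud import aiplatform

# Placeholder values -- substitute your own project, region, bucket and
# the JSON file produced by the Kubeflow SDK v2 compiler.
PROJECT_ID = "my-gcp-project"
LOCATION = "europe-west1"
PIPELINE_FILE = "pipelines/my-run.json"
PIPELINE_ROOT = "gs://my-bucket/vertex_pipeline_root"

# Instantiate the Vertex AI Pipelines job from the compiled template.
run = aiplatform.PipelineJob(
    display_name="my-pipeline",
    template_path=PIPELINE_FILE,
    job_id="my-pipeline-run-1",
    pipeline_root=PIPELINE_ROOT,
    enable_caching=True,
    project=PROJECT_ID,
    location=LOCATION,
)

# Submit the job; `service_account` and `network` are optional and map to
# the orchestrator's `workload_service_account` and `network` attributes.
run.submit()

# Block until completion, as the orchestrator does when `synchronous` is set.
run.wait()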
pipeline_directory: str
property
readonly
Returns the path to the directory where Kubeflow pipeline files are stored.
Returns:
Type | Description |
---|---|
str | Path to the pipeline directory. |
root_directory: str
property
readonly
Returns path to the root directory for files for this orchestrator.
Returns:
Type | Description |
---|---|
str | The path to the root directory for all files concerning this orchestrator. |
validator: Optional[zenml.stack.stack_validator.StackValidator]
property
readonly
Validates that the stack contains a container registry.
Also validates that the artifact store and metadata store used are not local.
Returns:
Type | Description |
---|---|
Optional[zenml.stack.stack_validator.StackValidator] | A StackValidator instance. |
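As a hedged illustration of how such a validator is assembled, the sketch below builds a StackValidator that requires a container registry and runs a custom check over the stack components, in the same spirit as the private validation function shown in the class source above. The _validate_no_local_components helper and its flavor check are illustrative stand-ins, not the exact ZenML implementation.
from typing import Tuple

from zenml.enums import StackComponentType
from zenml.stack import Stack, StackValidator


def _validate_no_local_components(stack: Stack) -> Tuple[bool, str]:
    """Illustrative custom check: reject obviously local component flavors."""
    for component in stack.components.values():
        if component.FLAVOR in ("local", "default"):
            return False, (
                f"The {component.TYPE.value} '{component.name}' uses a local "
                f"flavor, which the Vertex orchestrator cannot reach."
            )
    return True, ""


validator = StackValidator(
    required_components={StackComponentType.CONTAINER_REGISTRY},
    custom_validation_function=_validate_no_local_components,
)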
get_docker_image_name(self, pipeline_name)
Returns the full docker image name including registry and tag.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
pipeline_name | str | The name of the pipeline. | required |
Returns:
Type | Description |
---|---|
str | The full docker image name including registry and tag. |
Source code in zenml/integrations/gcp/orchestrators/vertex_orchestrator.py
def get_docker_image_name(self, pipeline_name: str) -> str:
"""Returns the full docker image name including registry and tag.
Args:
pipeline_name: The name of the pipeline.
Returns:
The full docker image name including registry and tag.
"""
base_image_name = f"zenml-vertex:{pipeline_name}"
container_registry = Repository().active_stack.container_registry
if container_registry:
registry_uri = container_registry.uri.rstrip("/")
return f"{registry_uri}/{base_image_name}"
return base_image_name
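For example, with a container registry whose URI is gcr.io/my-project in the active stack, a pipeline named my_pipeline resolves to gcr.io/my-project/zenml-vertex:my_pipeline; without a registry, the bare zenml-vertex:my_pipeline name is returned. The standalone sketch below reproduces just that naming logic with placeholder values:
def build_image_name(pipeline_name: str, registry_uri: str = "") -> str:
    # Mirrors the scheme above: "zenml-vertex:<pipeline>" prefixed with the
    # registry URI whenever a container registry is configured.
    base_image_name = f"zenml-vertex:{pipeline_name}"
    if registry_uri:
        return f"{registry_uri.rstrip('/')}/{base_image_name}"
    return base_image_name


print(build_image_name("my_pipeline", "gcr.io/my-project"))
# -> gcr.io/my-project/zenml-vertex:my_pipeline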
prepare_or_run_pipeline(self, sorted_steps, pipeline, pb2_pipeline, stack, runtime_configuration)
Creates a KFP JSON pipeline.
This is an intermediary representation of the pipeline which is then deployed to the Vertex AI Pipelines service.
How it works:
Before this method is called, the `prepare_pipeline_deployment()` method builds a Docker image that contains the code for the pipeline, all steps, and the context around these files.
Based on this Docker image, a callable is created which builds container_ops for each step (`_construct_kfp_pipeline`). The function `kfp.components.load_component_from_text` is used to create the `ContainerOp`, because using the `dsl.ContainerOp` class directly is deprecated when using the Kubeflow SDK v2. The step entrypoint command with the entrypoint arguments is the command that will be executed by the container created using the previously built Docker image.
This callable is then compiled into a JSON file that is used as the intermediary representation of the Kubeflow pipeline.
This file is then submitted to the Vertex AI Pipelines service for execution.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
sorted_steps | List[BaseStep] | List of sorted steps. | required |
pipeline | BasePipeline | Zenml Pipeline instance. | required |
pb2_pipeline | Pb2Pipeline | Protobuf Pipeline instance. | required |
stack | Stack | The stack the pipeline was run on. | required |
runtime_configuration | RuntimeConfiguration | The Runtime configuration of the current run. | required |
Exceptions:
Type | Description |
---|---|
ValueError | If the attribute `pipeline_root` is not set and it cannot be generated using the path of the artifact store in the stack because it is not a `zenml.integrations.gcp.artifact_store.GCPArtifactStore`. |
Source code in zenml/integrations/gcp/orchestrators/vertex_orchestrator.py
def prepare_or_run_pipeline(
self,
sorted_steps: List["BaseStep"],
pipeline: "BasePipeline",
pb2_pipeline: "Pb2Pipeline",
stack: "Stack",
runtime_configuration: "RuntimeConfiguration",
) -> Any:
"""Creates a KFP JSON pipeline.
# noqa: DAR402
This is an intermediary representation of the pipeline which is then
deployed to Vertex AI Pipelines service.
How it works:
-------------
Before this method is called, the `prepare_pipeline_deployment()` method
builds a Docker image that contains the code for the pipeline, all steps,
and the context around these files.
Based on this Docker image a callable is created which builds
container_ops for each step (`_construct_kfp_pipeline`). The function
`kfp.components.load_component_from_text` is used to create the
`ContainerOp`, because using the `dsl.ContainerOp` class directly is
deprecated when using the Kubeflow SDK v2. The step entrypoint command
with the entrypoint arguments is the command that will be executed by
the container created using the previously created Docker image.
This callable is then compiled into a JSON file that is used as the
intermediary representation of the Kubeflow pipeline.
This file then is submitted to the Vertex AI Pipelines service for
execution.
Args:
sorted_steps: List of sorted steps.
pipeline: Zenml Pipeline instance.
pb2_pipeline: Protobuf Pipeline instance.
stack: The stack the pipeline was run on.
runtime_configuration: The Runtime configuration of the current run.
Raises:
ValueError: If the attribute `pipeline_root` is not set and it
cannot be generated using the path of the artifact store in the
stack because it is not a
`zenml.integrations.gcp.artifact_store.GCPArtifactStore`.
"""
# If the `pipeline_root` has not been defined in the orchestrator
# configuration,
# try to create it from the artifact store if it is a
# `GCPArtifactStore`.
if not self.pipeline_root:
artifact_store = stack.artifact_store
self._pipeline_root = f"{artifact_store.path.rstrip('/')}/vertex_pipeline_root/{pipeline.name}/{runtime_configuration.run_name}"
logger.info(
"The attribute `pipeline_root` has not been set in the "
"orchestrator configuration. One has been generated "
"automatically based on the path of the `GCPArtifactStore` "
"artifact store in the stack used to execute the pipeline. "
"The generated `pipeline_root` is `%s`.",
self._pipeline_root,
)
else:
self._pipeline_root = self.pipeline_root
# Build the Docker image that will be used to run the steps of the
# pipeline.
image_name = self.get_docker_image_name(pipeline.name)
image_name = get_image_digest(image_name) or image_name
def _construct_kfp_pipeline() -> None:
"""Create a `ContainerOp` for each step.
This should contain the name of the Docker image and configures the
entrypoint of the Docker image to run the step.
Additionally, this gives each `ContainerOp` information about its
direct downstream steps.
If this callable is passed to the `compile()` method of
`KFPV2Compiler` all `dsl.ContainerOp` instances will be
automatically added to a singular `dsl.Pipeline` instance.
"""
step_name_to_container_op: Dict[str, dsl.ContainerOp] = {}
for step in sorted_steps:
# The command will be needed to eventually call the python step
# within the docker container
command = VertexEntrypointConfiguration.get_entrypoint_command()
# The arguments are passed to configure the entrypoint of the
# docker container when the step is called.
arguments = VertexEntrypointConfiguration.get_entrypoint_arguments(
step=step,
pb2_pipeline=pb2_pipeline,
**{VERTEX_JOB_ID_OPTION: dslv2.PIPELINE_JOB_ID_PLACEHOLDER},
)
# Create the `ContainerOp` for the step. Using the
# `dsl.ContainerOp`
# class directly is deprecated when using the Kubeflow SDK v2.
container_op = kfp.components.load_component_from_text(
f"""
name: {step.name}
implementation:
container:
image: {image_name}
command: {command + arguments}"""
)()
# Set upstream tasks as a dependency of the current step
upstream_step_names = self.get_upstream_step_names(
step=step, pb2_pipeline=pb2_pipeline
)
for upstream_step_name in upstream_step_names:
upstream_container_op = step_name_to_container_op[
upstream_step_name
]
container_op.after(upstream_container_op)
step_name_to_container_op[step.name] = container_op
# Save the generated pipeline to a file.
assert runtime_configuration.run_name
fileio.makedirs(self.pipeline_directory)
pipeline_file_path = os.path.join(
self.pipeline_directory,
f"{runtime_configuration.run_name}.json",
)
# Compile the pipeline using the Kubeflow SDK V2 compiler, which
# generates a JSON representation of the pipeline that can later be
# uploaded to the Vertex AI Pipelines service.
logger.debug(
"Compiling pipeline using Kubeflow SDK V2 compiler and saving it "
"to `%s`",
pipeline_file_path,
)
KFPV2Compiler().compile(
pipeline_func=_construct_kfp_pipeline,
package_path=pipeline_file_path,
pipeline_name=_clean_pipeline_name(pipeline.name),
)
# Using the Google Cloud AIPlatform client, upload and execute the
# pipeline
# on the Vertex AI Pipelines service.
self._upload_and_run_pipeline(
pipeline_name=pipeline.name,
pipeline_file_path=pipeline_file_path,
runtime_configuration=runtime_configuration,
enable_cache=pipeline.enable_cache,
)
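To make the component-construction and compilation steps above more tangible, here is a minimal, hedged sketch that loads a container component from text and compiles a pipeline to the JSON representation used by Vertex AI Pipelines. It assumes a KFP SDK version that ships the v2 compiler; the image, command, and entrypoint module are placeholder values.
import kfp.components
from kfp.v2 import compiler as v2_compiler

# Placeholder image and command; in the orchestrator these come from the
# built Docker image and the step entrypoint configuration.
image_name = "gcr.io/my-project/zenml-vertex:my_pipeline"
command = ["python", "-m", "my_step_entrypoint", "--step_name", "trainer"]

trainer_component = kfp.components.load_component_from_text(
    f"""
name: trainer
implementation:
  container:
    image: {image_name}
    command: {command}"""
)


def my_pipeline() -> None:
    # Calling the factory inside the pipeline function creates the container
    # task; ordering between tasks would be expressed with `.after(...)`.
    trainer_component()


# Compile to the JSON intermediary representation submitted to Vertex AI.
v2_compiler.Compiler().compile(
    pipeline_func=my_pipeline,
    package_path="my_pipeline.json",
    pipeline_name="my-pipeline",
)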
prepare_pipeline_deployment(self, pipeline, stack, runtime_configuration)
Build a Docker image for the current environment.
This uploads it to a container registry if configured.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
pipeline | BasePipeline | The pipeline to be deployed. | required |
stack | Stack | The stack that will be used to deploy the pipeline. | required |
runtime_configuration | RuntimeConfiguration | The runtime configuration for the pipeline. | required |
Exceptions:
Type | Description |
---|---|
RuntimeError | If the container registry is missing. |
Source code in zenml/integrations/gcp/orchestrators/vertex_orchestrator.py
def prepare_pipeline_deployment(
self,
pipeline: "BasePipeline",
stack: "Stack",
runtime_configuration: "RuntimeConfiguration",
) -> None:
"""Build a Docker image for the current environment.
This uploads it to a container registry if configured.
Args:
pipeline: The pipeline to be deployed.
stack: The stack that will be used to deploy the pipeline.
runtime_configuration: The runtime configuration for the pipeline.
Raises:
RuntimeError: If the container registry is missing.
"""
from zenml.utils import docker_utils
repo = Repository()
container_registry = repo.active_stack.container_registry
if not container_registry:
raise RuntimeError("Missing container registry")
image_name = self.get_docker_image_name(pipeline.name)
requirements = {*stack.requirements(), *pipeline.requirements}
logger.debug(
"Vertex AI Pipelines service docker container requirements %s",
requirements,
)
docker_utils.build_docker_image(
build_context_path=get_source_root_path(),
image_name=image_name,
dockerignore_path=pipeline.dockerignore_file,
requirements=requirements,
base_image=self.custom_docker_base_image_name,
)
container_registry.push_image(image_name)
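The actual image build is delegated to zenml.utils.docker_utils, which generates a Dockerfile, installs the collected requirements and tags the image. Purely as an illustrative analogue of that build-and-push flow (not the ZenML implementation), a plain Docker SDK version might look like this; the image name and build context are placeholders:
import docker

# Placeholder image name; in ZenML this is produced by
# `get_docker_image_name(pipeline.name)`.
image_name = "gcr.io/my-project/zenml-vertex:my_pipeline"

client = docker.from_env()

# Build the image from the repository root (the source root path).
image, build_logs = client.images.build(path=".", tag=image_name)

# Push the image to the container registry configured in the stack.
for line in client.images.push(image_name, stream=True, decode=True):
    print(line)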
secrets_manager
special
ZenML integration for GCP Secrets Manager.
The GCP Secrets Manager integration allows your pipelines to access the GCP Secret Manager directly and use its secrets at runtime.
gcp_secrets_manager
Implementation of the GCP Secrets Manager.
GCPSecretsManager (BaseSecretsManager)
pydantic-model
Class to interact with the GCP secrets manager.
Attributes:
Name | Type | Description |
---|---|---|
project_id | str | This is necessary to access the correct GCP project. The project_id of your GCP project space that contains the Secret Manager. |
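The class wraps the google-cloud-secret-manager client. As a hedged, standalone sketch of the underlying API calls used below (project ID, secret ID, and payload are placeholders):
from google.cloud import secretmanager

PROJECT_ID = "my-gcp-project"  # placeholder
PARENT = f"projects/{PROJECT_ID}"

client = secretmanager.SecretManagerServiceClient()

# Create an empty secret (mirrors `register_secret` below).
secret = client.create_secret(
    request={
        "parent": PARENT,
        "secret_id": "mysecretgroup_api_key",
        "secret": {"replication": {"automatic": {}}},
    }
)

# Attach the actual value as a new secret version.
client.add_secret_version(
    request={"parent": secret.name, "payload": {"data": b"super-secret-value"}}
)

# Read the latest version back (mirrors `get_secret` below).
response = client.access_secret_version(
    request={"name": f"{secret.name}/versions/latest"}
)
print(response.payload.data.decode("UTF-8"))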
Source code in zenml/integrations/gcp/secrets_manager/gcp_secrets_manager.py
class GCPSecretsManager(BaseSecretsManager):
"""Class to interact with the GCP secrets manager.
Attributes:
project_id: This is necessary to access the correct GCP project.
The project_id of your GCP project space that contains
the Secret Manager.
"""
project_id: str
# Class configuration
FLAVOR: ClassVar[str] = GCP_SECRETS_MANAGER_FLAVOR
CLIENT: ClassVar[Any] = None
@classmethod
def _ensure_client_connected(cls) -> None:
if cls.CLIENT is None:
cls.CLIENT = secretmanager.SecretManagerServiceClient()
@property
def parent_name(self) -> str:
"""Construct the GCP parent path to the secret manager.
Returns:
The parent path to the secret manager
"""
return f"projects/{self.project_id}"
def register_secret(self, secret: BaseSecretSchema) -> None:
"""Registers a new secret.
Args:
secret: the secret to register
Raises:
SecretExistsError: if the secret already exists
"""
self._ensure_client_connected()
if secret.name in self.get_all_secret_keys():
raise SecretExistsError(
f"A Secret with the name {secret.name} already exists."
)
adjusted_content = prepend_group_name_to_keys(secret)
for k, v in adjusted_content.items():
# Create the secret, this only creates an empty secret with the
# supplied name.
gcp_secret = self.CLIENT.create_secret(
request={
"parent": self.parent_name,
"secret_id": k,
"secret": {
"replication": {"automatic": {}},
"labels": [
(ZENML_GROUP_KEY, secret.name),
(ZENML_SCHEMA_NAME, secret.TYPE),
],
},
}
)
logger.debug("Created empty secret: %s", gcp_secret.name)
self.CLIENT.add_secret_version(
request={
"parent": gcp_secret.name,
"payload": {"data": str(v).encode()},
}
)
logger.debug("Added value to secret.")
def get_secret(self, secret_name: str) -> BaseSecretSchema:
"""Get a secret by its name.
Args:
secret_name: the name of the secret to get
Returns:
The secret.
Raises:
RuntimeError: if the secret does not exist
"""
self._ensure_client_connected()
secret_contents = {}
zenml_schema_name = ""
# List all secrets.
for secret in self.CLIENT.list_secrets(
request={"parent": self.parent_name}
):
if (
ZENML_GROUP_KEY in secret.labels
and secret_name == secret.labels[ZENML_GROUP_KEY]
):
secret_version_name = secret.name + "/versions/latest"
response = self.CLIENT.access_secret_version(
request={"name": secret_version_name}
)
secret_value = response.payload.data.decode("UTF-8")
secret_key = remove_group_name_from_key(
secret.name.split("/")[-1], secret_name
)
secret_contents[secret_key] = secret_value
zenml_schema_name = secret.labels[ZENML_SCHEMA_NAME]
if not secret_contents:
raise RuntimeError(f"No secrets found for the name '{secret_name}'.")
secret_contents["name"] = secret_name
secret_schema = SecretSchemaClassRegistry.get_class(
secret_schema=zenml_schema_name
)
return secret_schema(**secret_contents)
def get_all_secret_keys(self) -> List[str]:
"""Get all secret keys.
Returns:
A list of all secret keys
"""
self._ensure_client_connected()
set_of_secrets = set()
# List all secrets.
for secret in self.CLIENT.list_secrets(
request={"parent": self.parent_name}
):
if ZENML_GROUP_KEY in secret.labels:
group_key = secret.labels[ZENML_GROUP_KEY]
set_of_secrets.add(group_key)
return list(set_of_secrets)
def update_secret(self, secret: BaseSecretSchema) -> None:
"""Update an existing secret by creating new versions of the existing secrets.
Args:
secret: the secret to update
"""
self._ensure_client_connected()
adjusted_content = prepend_group_name_to_keys(secret)
for k, v in adjusted_content.items():
# Add a new version to the existing secret; the secret itself must
# already exist under the supplied name.
version_parent = self.CLIENT.secret_path(self.project_id, k)
payload = {"data": str(v).encode()}
self.CLIENT.add_secret_version(
request={"parent": version_parent, "payload": payload}
)
def delete_secret(self, secret_name: str) -> None:
"""Delete an existing secret by name.
In GCP a secret is a single k-v
pair. Within ZenML a secret is a collection of k-v pairs. As such,
deleting a secret will iterate through all secrets and delete the ones
with the secret_name as label.
Args:
secret_name: the name of the secret to delete
"""
self._ensure_client_connected()
# Go through all gcp secrets and delete the ones with the secret_name
# as label.
for secret in self.CLIENT.list_secrets(
request={"parent": self.parent_name}
):
if (
ZENML_GROUP_KEY in secret.labels
and secret_name == secret.labels[ZENML_GROUP_KEY]
):
self.CLIENT.delete_secret(request={"name": secret.name})
def delete_all_secrets(self) -> None:
"""Delete all existing secrets."""
self._ensure_client_connected()
# List al