Integrations
zenml.integrations
special
The ZenML integrations module contains sub-modules for each integration that we
support. This includes orchestrators like Apache Airflow, visualization tools
like the Facets library, and deep learning libraries like PyTorch.
airflow
special
The Airflow integration sub-module powers an alternative to the local
orchestrator. You can enable it by registering the Airflow orchestrator with
the CLI tool and then bootstrapping it with the `zenml orchestrator up`
command.
AirflowIntegration (Integration)
Definition of Airflow Integration for ZenML.
Source code in zenml/integrations/airflow/__init__.py
class AirflowIntegration(Integration):
"""Definition of Airflow Integration for ZenML."""
NAME = AIRFLOW
REQUIREMENTS = ["apache-airflow==2.2.0"]
@classmethod
def activate(cls):
"""Activates all classes required for the airflow integration."""
from zenml.integrations.airflow import orchestrators # noqa
activate()
classmethod
Activates all classes required for the airflow integration.
Source code in zenml/integrations/airflow/__init__.py
@classmethod
def activate(cls):
"""Activates all classes required for the airflow integration."""
from zenml.integrations.airflow import orchestrators # noqa
orchestrators
special
The Airflow integration enables the use of Airflow as a pipeline orchestrator.
airflow_component
Definition for Airflow component for TFX.
AirflowComponent (PythonOperator)
Airflow-specific TFX Component. This class wraps a component run into its own PythonOperator in Airflow.
Source code in zenml/integrations/airflow/orchestrators/airflow_component.py
class AirflowComponent(python.PythonOperator):
"""Airflow-specific TFX Component.
This class wraps a component run into its own PythonOperator in Airflow.
"""
def __init__(
self,
*,
parent_dag: airflow.DAG,
pipeline_node: pipeline_pb2.PipelineNode,
mlmd_connection: metadata.Metadata,
pipeline_info: pipeline_pb2.PipelineInfo,
pipeline_runtime_spec: pipeline_pb2.PipelineRuntimeSpec,
executor_spec: Optional[message.Message] = None,
custom_driver_spec: Optional[message.Message] = None,
custom_executor_operators: Optional[
Dict[Any, Type[launcher.ExecutorOperator]]
] = None,
) -> None:
"""Constructs an Airflow implementation of TFX component.
Args:
parent_dag: The airflow DAG that this component is contained in.
pipeline_node: The specification of the node to launch.
mlmd_connection: ML metadata connection info.
pipeline_info: The information of the pipeline that this node
runs in.
pipeline_runtime_spec: The runtime information of the pipeline
that this node runs in.
executor_spec: Specification for the executor of the node.
custom_driver_spec: Specification for custom driver.
custom_executor_operators: Map of executable specs to executor
operators.
"""
launcher_callable = functools.partial(
_airflow_component_launcher,
pipeline_node=pipeline_node,
mlmd_connection=mlmd_connection,
pipeline_info=pipeline_info,
pipeline_runtime_spec=pipeline_runtime_spec,
executor_spec=executor_spec,
custom_driver_spec=custom_driver_spec,
custom_executor_operators=custom_executor_operators,
)
super().__init__(
task_id=pipeline_node.node_info.id,
provide_context=True,
python_callable=launcher_callable,
dag=parent_dag,
)
__init__(self, *, parent_dag, pipeline_node, mlmd_connection, pipeline_info, pipeline_runtime_spec, executor_spec=None, custom_driver_spec=None, custom_executor_operators=None)
special
Constructs an Airflow implementation of TFX component.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
parent_dag | DAG | The airflow DAG that this component is contained in. | required |
pipeline_node | PipelineNode | The specification of the node to launch. | required |
mlmd_connection | Metadata | ML metadata connection info. | required |
pipeline_info | PipelineInfo | The information of the pipeline that this node runs in. | required |
pipeline_runtime_spec | PipelineRuntimeSpec | The runtime information of the pipeline that this node runs in. | required |
executor_spec | Optional[google.protobuf.message.Message] | Specification for the executor of the node. | None |
custom_driver_spec | Optional[google.protobuf.message.Message] | Specification for custom driver. | None |
custom_executor_operators | Optional[Dict[Any, Type[ExecutorOperator]]] | Map of executable specs to executor operators. | None |
Source code in zenml/integrations/airflow/orchestrators/airflow_component.py
def __init__(
self,
*,
parent_dag: airflow.DAG,
pipeline_node: pipeline_pb2.PipelineNode,
mlmd_connection: metadata.Metadata,
pipeline_info: pipeline_pb2.PipelineInfo,
pipeline_runtime_spec: pipeline_pb2.PipelineRuntimeSpec,
executor_spec: Optional[message.Message] = None,
custom_driver_spec: Optional[message.Message] = None,
custom_executor_operators: Optional[
Dict[Any, Type[launcher.ExecutorOperator]]
] = None,
) -> None:
"""Constructs an Airflow implementation of TFX component.
Args:
parent_dag: The airflow DAG that this component is contained in.
pipeline_node: The specification of the node to launch.
mlmd_connection: ML metadata connection info.
pipeline_info: The information of the pipeline that this node
runs in.
pipeline_runtime_spec: The runtime information of the pipeline
that this node runs in.
executor_spec: Specification for the executor of the node.
custom_driver_spec: Specification for custom driver.
custom_executor_operators: Map of executable specs to executor
operators.
"""
launcher_callable = functools.partial(
_airflow_component_launcher,
pipeline_node=pipeline_node,
mlmd_connection=mlmd_connection,
pipeline_info=pipeline_info,
pipeline_runtime_spec=pipeline_runtime_spec,
executor_spec=executor_spec,
custom_driver_spec=custom_driver_spec,
custom_executor_operators=custom_executor_operators,
)
super().__init__(
task_id=pipeline_node.node_info.id,
provide_context=True,
python_callable=launcher_callable,
dag=parent_dag,
)
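At its core, this is the standard Airflow pattern of binding node-specific arguments with functools.partial and handing the resulting callable to a PythonOperator. The following minimal, self-contained sketch illustrates that pattern in isolation; the names (launch_node, illustrative_dag, the trainer task) are illustrative placeholders and not part of ZenML or TFX.

```python
import datetime
import functools

from airflow import DAG
from airflow.operators.python import PythonOperator


def launch_node(node_id: str, **kwargs) -> None:
    """Stand-in for the TFX launcher callable that AirflowComponent binds."""
    print(f"Launching pipeline node '{node_id}'")


illustrative_dag = DAG(
    dag_id="illustrative_dag",
    start_date=datetime.datetime(2022, 1, 1),
    schedule_interval="@once",
)

# Bind the node-specific arguments up front, as AirflowComponent does with
# functools.partial, and hand the resulting callable to a PythonOperator.
trainer_task = PythonOperator(
    task_id="trainer",
    python_callable=functools.partial(launch_node, node_id="trainer"),
    dag=illustrative_dag,
)
```

In AirflowComponent itself, the task_id is taken from pipeline_node.node_info.id so that each pipeline node maps to exactly one Airflow task.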
airflow_dag_runner
Definition of the Airflow TFX runner. This is a copy of the TFX source code, modified only by superficial, stylistic changes.
AirflowDagRunner
Tfx runner on Airflow.
Source code in zenml/integrations/airflow/orchestrators/airflow_dag_runner.py
class AirflowDagRunner:
"""Tfx runner on Airflow."""
def __init__(
self,
config: Optional[Union[Dict[str, Any], AirflowPipelineConfig]] = None,
):
"""Creates an instance of AirflowDagRunner.
Args:
config: Optional Airflow pipeline config for customizing the
launching of each component.
"""
self._config = config or pipeline_config.PipelineConfig()
if isinstance(config, dict):
warnings.warn(
"Pass config as a dict type is going to deprecated in 0.1.16. "
"Use AirflowPipelineConfig type instead.",
PendingDeprecationWarning,
)
self._config = AirflowPipelineConfig(airflow_dag_config=config)
@property
def config(self) -> pipeline_config.PipelineConfig:
return self._config
def run(
self,
pipeline: "BasePipeline",
stack: "Stack",
runtime_configuration: "RuntimeConfiguration",
) -> "airflow.DAG":
"""Deploys given logical pipeline on Airflow.
Args:
pipeline: Logical pipeline containing pipeline args and comps.
stack: The current stack that ZenML is running on
runtime_configuration: The configuration of the run
Returns:
An Airflow DAG.
"""
# Only import these when needed.
import airflow # noqa
from zenml.integrations.airflow.orchestrators import airflow_component
# Merge airflow-specific configs with pipeline args
tfx_pipeline = create_tfx_pipeline(pipeline, stack=stack)
if runtime_configuration.schedule:
catchup = runtime_configuration.schedule.catchup
else:
catchup = False
airflow_dag = airflow.DAG(
dag_id=tfx_pipeline.pipeline_info.pipeline_name,
**(
typing.cast(
AirflowPipelineConfig, self._config
).airflow_dag_config
),
is_paused_upon_creation=False,
catchup=catchup,
)
pipeline_root = tfx_pipeline.pipeline_info.pipeline_root
if "tmp_dir" not in tfx_pipeline.additional_pipeline_args:
tmp_dir = os.path.join(pipeline_root, ".temp", "")
tfx_pipeline.additional_pipeline_args["tmp_dir"] = tmp_dir
for component in tfx_pipeline.components:
if isinstance(component, base_component.BaseComponent):
component._resolve_pip_dependencies(pipeline_root)
self._replace_runtime_params(component)
pb2_pipeline: Pb2Pipeline = compiler.Compiler().compile(tfx_pipeline)
# Substitute the runtime parameter to be a concrete run_id
runtime_parameter_utils.substitute_runtime_parameter(
pb2_pipeline,
{
"pipeline-run-id": runtime_configuration.run_name,
},
)
deployment_config = runner_utils.extract_local_deployment_config(
pb2_pipeline
)
connection_config = (
Repository().active_stack.metadata_store.get_tfx_metadata_config()
)
component_impl_map = {}
for node in pb2_pipeline.nodes:
pipeline_node: PipelineNode = node.pipeline_node # type: ignore[valid-type]
# Add the stack as context to each pipeline node:
context_utils.add_context_to_node(
pipeline_node,
type_=MetadataContextTypes.STACK.value,
name=str(hash(json.dumps(stack.dict(), sort_keys=True))),
properties=stack.dict(),
)
# Add all pydantic objects from runtime_configuration to the context
context_utils.add_runtime_configuration_to_node(
pipeline_node, runtime_configuration
)
# Add pipeline requirements as a context
requirements = " ".join(sorted(pipeline.requirements))
context_utils.add_context_to_node(
pipeline_node,
type_=MetadataContextTypes.PIPELINE_REQUIREMENTS.value,
name=str(hash(requirements)),
properties={"pipeline_requirements": requirements},
)
node_id = pipeline_node.node_info.id
executor_spec = runner_utils.extract_executor_spec(
deployment_config, node_id
)
custom_driver_spec = runner_utils.extract_custom_driver_spec(
deployment_config, node_id
)
step = get_step_for_node(
pipeline_node, steps=list(pipeline.steps.values())
)
custom_executor_operators = {
executable_spec_pb2.PythonClassExecutableSpec: step.executor_operator
}
current_airflow_component = airflow_component.AirflowComponent(
parent_dag=airflow_dag,
pipeline_node=pipeline_node,
mlmd_connection=connection_config,
pipeline_info=pb2_pipeline.pipeline_info,
pipeline_runtime_spec=pb2_pipeline.runtime_spec,
executor_spec=executor_spec,
custom_driver_spec=custom_driver_spec,
custom_executor_operators=custom_executor_operators,
)
component_impl_map[node_id] = current_airflow_component
for upstream_node in node.pipeline_node.upstream_nodes:
assert (
upstream_node in component_impl_map
), "Components is not in topological order"
current_airflow_component.set_upstream(
component_impl_map[upstream_node]
)
return airflow_dag
def _replace_runtime_params(
self, comp: base_node.BaseNode
) -> base_node.BaseNode:
"""Replaces runtime params for dynamic Airflow parameter execution.
Args:
comp: TFX component to be parsed.
Returns:
Returns edited component.
"""
for k, prop in comp.exec_properties.copy().items():
if isinstance(prop, RuntimeParameter):
# Airflow only supports string parameters.
if prop.ptype != str:
raise RuntimeError(
f"RuntimeParameter in Airflow does not support "
f"{prop.ptype}. The only ptype supported is string."
)
# If the default is a template, drop the template markers
# when inserting it into the .get() default argument below.
# Otherwise, provide the default as a quoted string.
default = cast(str, prop.default)
if default.startswith("{{") and default.endswith("}}"):
default = default[2:-2]
else:
default = json.dumps(default)
template_field = '{{ dag_run.conf.get("%s", %s) }}' % (
prop.name,
default,
)
comp.exec_properties[k] = template_field
return comp
__init__(self, config=None)
special
Creates an instance of AirflowDagRunner.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
config | Union[Dict[str, Any], zenml.integrations.airflow.orchestrators.airflow_dag_runner.AirflowPipelineConfig] | Optional Airflow pipeline config for customizing the launching of each component. | None |
Source code in zenml/integrations/airflow/orchestrators/airflow_dag_runner.py
def __init__(
self,
config: Optional[Union[Dict[str, Any], AirflowPipelineConfig]] = None,
):
"""Creates an instance of AirflowDagRunner.
Args:
config: Optional Airflow pipeline config for customizing the
launching of each component.
"""
self._config = config or pipeline_config.PipelineConfig()
if isinstance(config, dict):
warnings.warn(
"Pass config as a dict type is going to deprecated in 0.1.16. "
"Use AirflowPipelineConfig type instead.",
PendingDeprecationWarning,
)
self._config = AirflowPipelineConfig(airflow_dag_config=config)
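A minimal sketch of constructing the runner, assuming the Airflow integration is installed. Passing a plain dict still works but triggers the PendingDeprecationWarning shown above; wrapping the Airflow DAG keyword arguments in an AirflowPipelineConfig is the preferred form.

```python
from zenml.integrations.airflow.orchestrators.airflow_dag_runner import (
    AirflowDagRunner,
    AirflowPipelineConfig,
)

# Preferred: wrap Airflow DAG keyword arguments in an AirflowPipelineConfig.
runner = AirflowDagRunner(
    AirflowPipelineConfig(airflow_dag_config={"schedule_interval": "@once"})
)

# Still accepted: a plain dict, which is wrapped into an AirflowPipelineConfig
# internally but triggers the PendingDeprecationWarning shown above.
legacy_runner = AirflowDagRunner({"schedule_interval": "@once"})
```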
run(self, pipeline, stack, runtime_configuration)
Deploys given logical pipeline on Airflow.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
pipeline | BasePipeline | Logical pipeline containing pipeline args and comps. | required |
stack | Stack | The current stack that ZenML is running on | required |
runtime_configuration | RuntimeConfiguration | The configuration of the run | required |
Returns:
Type | Description |
---|---|
airflow.DAG | An Airflow DAG. |
Source code in zenml/integrations/airflow/orchestrators/airflow_dag_runner.py
def run(
self,
pipeline: "BasePipeline",
stack: "Stack",
runtime_configuration: "RuntimeConfiguration",
) -> "airflow.DAG":
"""Deploys given logical pipeline on Airflow.
Args:
pipeline: Logical pipeline containing pipeline args and comps.
stack: The current stack that ZenML is running on
runtime_configuration: The configuration of the run
Returns:
An Airflow DAG.
"""
# Only import these when needed.
import airflow # noqa
from zenml.integrations.airflow.orchestrators import airflow_component
# Merge airflow-specific configs with pipeline args
tfx_pipeline = create_tfx_pipeline(pipeline, stack=stack)
if runtime_configuration.schedule:
catchup = runtime_configuration.schedule.catchup
else:
catchup = False
airflow_dag = airflow.DAG(
dag_id=tfx_pipeline.pipeline_info.pipeline_name,
**(
typing.cast(
AirflowPipelineConfig, self._config
).airflow_dag_config
),
is_paused_upon_creation=False,
catchup=catchup,
)
pipeline_root = tfx_pipeline.pipeline_info.pipeline_root
if "tmp_dir" not in tfx_pipeline.additional_pipeline_args:
tmp_dir = os.path.join(pipeline_root, ".temp", "")
tfx_pipeline.additional_pipeline_args["tmp_dir"] = tmp_dir
for component in tfx_pipeline.components:
if isinstance(component, base_component.BaseComponent):
component._resolve_pip_dependencies(pipeline_root)
self._replace_runtime_params(component)
pb2_pipeline: Pb2Pipeline = compiler.Compiler().compile(tfx_pipeline)
# Substitute the runtime parameter to be a concrete run_id
runtime_parameter_utils.substitute_runtime_parameter(
pb2_pipeline,
{
"pipeline-run-id": runtime_configuration.run_name,
},
)
deployment_config = runner_utils.extract_local_deployment_config(
pb2_pipeline
)
connection_config = (
Repository().active_stack.metadata_store.get_tfx_metadata_config()
)
component_impl_map = {}
for node in pb2_pipeline.nodes:
pipeline_node: PipelineNode = node.pipeline_node # type: ignore[valid-type]
# Add the stack as context to each pipeline node:
context_utils.add_context_to_node(
pipeline_node,
type_=MetadataContextTypes.STACK.value,
name=str(hash(json.dumps(stack.dict(), sort_keys=True))),
properties=stack.dict(),
)
# Add all pydantic objects from runtime_configuration to the context
context_utils.add_runtime_configuration_to_node(
pipeline_node, runtime_configuration
)
# Add pipeline requirements as a context
requirements = " ".join(sorted(pipeline.requirements))
context_utils.add_context_to_node(
pipeline_node,
type_=MetadataContextTypes.PIPELINE_REQUIREMENTS.value,
name=str(hash(requirements)),
properties={"pipeline_requirements": requirements},
)
node_id = pipeline_node.node_info.id
executor_spec = runner_utils.extract_executor_spec(
deployment_config, node_id
)
custom_driver_spec = runner_utils.extract_custom_driver_spec(
deployment_config, node_id
)
step = get_step_for_node(
pipeline_node, steps=list(pipeline.steps.values())
)
custom_executor_operators = {
executable_spec_pb2.PythonClassExecutableSpec: step.executor_operator
}
current_airflow_component = airflow_component.AirflowComponent(
parent_dag=airflow_dag,
pipeline_node=pipeline_node,
mlmd_connection=connection_config,
pipeline_info=pb2_pipeline.pipeline_info,
pipeline_runtime_spec=pb2_pipeline.runtime_spec,
executor_spec=executor_spec,
custom_driver_spec=custom_driver_spec,
custom_executor_operators=custom_executor_operators,
)
component_impl_map[node_id] = current_airflow_component
for upstream_node in node.pipeline_node.upstream_nodes:
assert (
upstream_node in component_impl_map
), "Components is not in topological order"
current_airflow_component.set_upstream(
component_impl_map[upstream_node]
)
return airflow_dag
AirflowPipelineConfig (PipelineConfig)
Pipeline config for AirflowDagRunner.
Source code in zenml/integrations/airflow/orchestrators/airflow_dag_runner.py
class AirflowPipelineConfig(pipeline_config.PipelineConfig):
"""Pipeline config for AirflowDagRunner."""
def __init__(
self,
airflow_dag_config: Optional[Dict[str, Any]] = None,
**kwargs: Any,
):
"""Creates an instance of AirflowPipelineConfig.
Args:
airflow_dag_config: Configs of Airflow DAG model. See
https://airflow.apache.org/_api/airflow/models/dag/index.html#airflow.models.dag.DAG
for the full spec.
**kwargs: keyword args for PipelineConfig.
"""
super().__init__(**kwargs)
self.airflow_dag_config = airflow_dag_config or {}
__init__(self, airflow_dag_config=None, **kwargs)
special
Creates an instance of AirflowPipelineConfig.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
airflow_dag_config | Optional[Dict[str, Any]] | Configs of Airflow DAG model. See https://airflow.apache.org/_api/airflow/models/dag/index.html#airflow.models.dag.DAG for the full spec. | None |
**kwargs | Any | keyword args for PipelineConfig. | {} |
Source code in zenml/integrations/airflow/orchestrators/airflow_dag_runner.py
def __init__(
self,
airflow_dag_config: Optional[Dict[str, Any]] = None,
**kwargs: Any,
):
"""Creates an instance of AirflowPipelineConfig.
Args:
airflow_dag_config: Configs of Airflow DAG model. See
https://airflow.apache.org/_api/airflow/models/dag/index.html#airflow.models.dag.DAG
for the full spec.
**kwargs: keyword args for PipelineConfig.
"""
super().__init__(**kwargs)
self.airflow_dag_config = airflow_dag_config or {}
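A minimal sketch of an AirflowPipelineConfig, assuming the Airflow integration is installed. The airflow_dag_config keys are forwarded verbatim to the airflow.DAG constructor; the ones shown here mirror the keys that AirflowOrchestrator.run_pipeline builds for scheduled runs.

```python
import datetime

from zenml.integrations.airflow.orchestrators.airflow_dag_runner import (
    AirflowPipelineConfig,
)

# These keys mirror the ones AirflowOrchestrator.run_pipeline() builds for a
# scheduled run; any keyword accepted by airflow.DAG could be supplied instead.
config = AirflowPipelineConfig(
    airflow_dag_config={
        "schedule_interval": datetime.timedelta(minutes=30),
        "start_date": datetime.datetime(2022, 1, 1),
    }
)
print(config.airflow_dag_config["schedule_interval"])  # 0:30:00
```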
airflow_orchestrator
AirflowOrchestrator (BaseOrchestrator)
pydantic-model
Orchestrator responsible for running pipelines using Airflow.
Source code in zenml/integrations/airflow/orchestrators/airflow_orchestrator.py
class AirflowOrchestrator(BaseOrchestrator):
"""Orchestrator responsible for running pipelines using Airflow."""
airflow_home: str = ""
# Class Configuration
FLAVOR: ClassVar[str] = "airflow"
def __init__(self, **values: Any):
"""Sets environment variables to configure airflow."""
super().__init__(**values)
self._set_env()
@root_validator
def set_airflow_home(cls, values: Dict[str, Any]) -> Dict[str, Any]:
"""Sets airflow home according to orchestrator UUID."""
if "uuid" not in values:
raise ValueError("`uuid` needs to exist for AirflowOrchestrator.")
values["airflow_home"] = os.path.join(
zenml.io.utils.get_global_config_directory(),
AIRFLOW_ROOT_DIR,
str(values["uuid"]),
)
return values
@property
def dags_directory(self) -> str:
"""Returns path to the airflow dags directory."""
return os.path.join(self.airflow_home, "dags")
@property
def pid_file(self) -> str:
"""Returns path to the daemon PID file."""
return os.path.join(self.airflow_home, "airflow_daemon.pid")
@property
def log_file(self) -> str:
"""Returns path to the airflow log file."""
return os.path.join(self.airflow_home, "airflow_orchestrator.log")
@property
def password_file(self) -> str:
"""Returns path to the webserver password file."""
return os.path.join(self.airflow_home, "standalone_admin_password.txt")
def _set_env(self) -> None:
"""Sets environment variables to configure airflow."""
os.environ["AIRFLOW_HOME"] = self.airflow_home
os.environ["AIRFLOW__CORE__DAGS_FOLDER"] = self.dags_directory
os.environ["AIRFLOW__CORE__DAG_DISCOVERY_SAFE_MODE"] = "false"
os.environ["AIRFLOW__CORE__LOAD_EXAMPLES"] = "false"
# check the DAG folder every 10 seconds for new files
os.environ["AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL"] = "10"
def _copy_to_dag_directory_if_necessary(self, dag_filepath: str) -> None:
"""Copies the DAG module to the airflow DAGs directory if it's not
already located there.
Args:
dag_filepath: Path to the file in which the DAG is defined.
"""
dags_directory = zenml.io.utils.resolve_relative_path(
self.dags_directory
)
if dags_directory == os.path.dirname(dag_filepath):
logger.debug("File is already in airflow DAGs directory.")
else:
logger.debug(
"Copying dag file '%s' to DAGs directory.", dag_filepath
)
destination_path = os.path.join(
dags_directory, os.path.basename(dag_filepath)
)
if fileio.exists(destination_path):
logger.info(
"File '%s' already exists, overwriting with new DAG file",
destination_path,
)
fileio.copy(dag_filepath, destination_path, overwrite=True)
def _log_webserver_credentials(self) -> None:
"""Logs URL and credentials to log in to the airflow webserver.
Raises:
FileNotFoundError: If the password file does not exist.
"""
if fileio.exists(self.password_file):
with open(self.password_file) as file:
password = file.read().strip()
else:
raise FileNotFoundError(
f"Can't find password file '{self.password_file}'"
)
logger.info(
"To inspect your DAGs, login to http://0.0.0.0:8080 "
"with username: admin password: %s",
password,
)
def runtime_options(self) -> Dict[str, Any]:
"""Runtime options for the airflow orchestrator."""
return {DAG_FILEPATH_OPTION_KEY: None}
def prepare_pipeline_deployment(
self,
pipeline: "BasePipeline",
stack: "Stack",
runtime_configuration: "RuntimeConfiguration",
) -> None:
"""Checks whether airflow is running and copies the DAG file to the
airflow DAGs directory.
Raises:
RuntimeError: If airflow is not running or no DAG filepath runtime
option is provided.
"""
if not self.is_running:
raise RuntimeError(
"Airflow orchestrator is currently not running. Run `zenml "
"stack up` to provision resources for the active stack."
)
try:
dag_filepath = runtime_configuration[DAG_FILEPATH_OPTION_KEY]
except KeyError:
raise RuntimeError(
f"No DAG filepath found in runtime configuration. Make sure "
f"to add the filepath to your airflow DAG file as a runtime "
f"option (key: '{DAG_FILEPATH_OPTION_KEY}')."
)
self._copy_to_dag_directory_if_necessary(dag_filepath=dag_filepath)
@property
def is_running(self) -> bool:
"""Returns whether the airflow daemon is currently running."""
from airflow.cli.commands.standalone_command import StandaloneCommand
from airflow.jobs.triggerer_job import TriggererJob
daemon_running = daemon.check_if_daemon_is_running(self.pid_file)
command = StandaloneCommand()
webserver_port_open = command.port_open(8080)
if not daemon_running:
if webserver_port_open:
raise RuntimeError(
"The airflow daemon does not seem to be running but "
"local port 8080 is occupied. Make sure the port is "
"available and try again."
)
# exit early so we don't check non-existing airflow databases
return False
# we can't use StandaloneCommand().is_ready() here as the
# Airflow SequentialExecutor apparently does not send a heartbeat
# while running a task which would result in this returning `False`
# even if Airflow is running.
airflow_running = webserver_port_open and command.job_running(
TriggererJob
)
return airflow_running
@property
def is_provisioned(self) -> bool:
"""Returns whether the airflow daemon is currently running."""
return self.is_running
def provision(self) -> None:
"""Ensures that Airflow is running."""
if self.is_running:
logger.info("Airflow is already running.")
self._log_webserver_credentials()
return
if not fileio.exists(self.dags_directory):
zenml.io.utils.create_dir_recursive_if_not_exists(
self.dags_directory
)
from airflow.cli.commands.standalone_command import StandaloneCommand
try:
command = StandaloneCommand()
# Run the daemon with a working directory inside the current
# zenml repo so the same repo will be used to run the DAGs
daemon.run_as_daemon(
command.run,
pid_file=self.pid_file,
log_file=self.log_file,
working_directory=str(Repository().root),
)
while not self.is_running:
# Wait until the daemon started all the relevant airflow
# processes
time.sleep(0.1)
self._log_webserver_credentials()
except Exception as e:
logger.error(e)
logger.error(
"An error occurred while starting the Airflow daemon. If you "
"want to start it manually, use the commands described in the "
"official Airflow quickstart guide for running Airflow locally."
)
self.deprovision()
def deprovision(self) -> None:
"""Stops the airflow daemon if necessary and tears down resources."""
if self.is_running:
daemon.stop_daemon(self.pid_file)
fileio.rmtree(self.airflow_home)
logger.info("Airflow spun down.")
def run_pipeline(
self,
pipeline: "BasePipeline",
stack: "Stack",
runtime_configuration: "RuntimeConfiguration",
) -> Any:
"""Schedules a pipeline to be run on Airflow.
Returns:
An Airflow DAG object that corresponds to the ZenML pipeline.
"""
if runtime_configuration.schedule:
airflow_config = {
"schedule_interval": datetime.timedelta(
seconds=runtime_configuration.schedule.interval_second
),
"start_date": runtime_configuration.schedule.start_time,
"end_date": runtime_configuration.schedule.end_time,
}
else:
airflow_config = {
"schedule_interval": "@once",
# Scheduled in the past to make sure it runs immediately
"start_date": datetime.datetime.now() - datetime.timedelta(7),
}
runner = AirflowDagRunner(AirflowPipelineConfig(airflow_config))
return runner.run(
pipeline=pipeline,
stack=stack,
runtime_configuration=runtime_configuration,
)
dags_directory: str
property
readonly
Returns path to the airflow dags directory.
is_provisioned: bool
property
readonly
Returns whether the airflow daemon is currently running.
is_running: bool
property
readonly
Returns whether the airflow daemon is currently running.
log_file: str
property
readonly
Returns path to the airflow log file.
password_file: str
property
readonly
Returns path to the webserver password file.
pid_file: str
property
readonly
Returns path to the daemon PID file.
__init__(self, **values)
special
Sets environment variables to configure airflow.
Source code in zenml/integrations/airflow/orchestrators/airflow_orchestrator.py
def __init__(self, **values: Any):
"""Sets environment variables to configure airflow."""
super().__init__(**values)
self._set_env()
deprovision(self)
Stops the airflow daemon if necessary and tears down resources.
Source code in zenml/integrations/airflow/orchestrators/airflow_orchestrator.py
def deprovision(self) -> None:
"""Stops the airflow daemon if necessary and tears down resources."""
if self.is_running:
daemon.stop_daemon(self.pid_file)
fileio.rmtree(self.airflow_home)
logger.info("Airflow spun down.")
prepare_pipeline_deployment(self, pipeline, stack, runtime_configuration)
Checks whether airflow is running and copies the DAG file to the airflow DAGs directory.
Exceptions:
Type | Description |
---|---|
RuntimeError | If airflow is not running or no DAG filepath runtime option is provided. |
Source code in zenml/integrations/airflow/orchestrators/airflow_orchestrator.py
def prepare_pipeline_deployment(
self,
pipeline: "BasePipeline",
stack: "Stack",
runtime_configuration: "RuntimeConfiguration",
) -> None:
"""Checks whether airflow is running and copies the DAG file to the
airflow DAGs directory.
Raises:
RuntimeError: If airflow is not running or no DAG filepath runtime
option is provided.
"""
if not self.is_running:
raise RuntimeError(
"Airflow orchestrator is currently not running. Run `zenml "
"stack up` to provision resources for the active stack."
)
try:
dag_filepath = runtime_configuration[DAG_FILEPATH_OPTION_KEY]
except KeyError:
raise RuntimeError(
f"No DAG filepath found in runtime configuration. Make sure "
f"to add the filepath to your airflow DAG file as a runtime "
f"option (key: '{DAG_FILEPATH_OPTION_KEY}')."
)
self._copy_to_dag_directory_if_necessary(dag_filepath=dag_filepath)
provision(self)
Ensures that Airflow is running.
Source code in zenml/integrations/airflow/orchestrators/airflow_orchestrator.py
def provision(self) -> None:
"""Ensures that Airflow is running."""
if self.is_running:
logger.info("Airflow is already running.")
self._log_webserver_credentials()
return
if not fileio.exists(self.dags_directory):
zenml.io.utils.create_dir_recursive_if_not_exists(
self.dags_directory
)
from airflow.cli.commands.standalone_command import StandaloneCommand
try:
command = StandaloneCommand()
# Run the daemon with a working directory inside the current
# zenml repo so the same repo will be used to run the DAGs
daemon.run_as_daemon(
command.run,
pid_file=self.pid_file,
log_file=self.log_file,
working_directory=str(Repository().root),
)
while not self.is_running:
# Wait until the daemon started all the relevant airflow
# processes
time.sleep(0.1)
self._log_webserver_credentials()
except Exception as e:
logger.error(e)
logger.error(
"An error occurred while starting the Airflow daemon. If you "
"want to start it manually, use the commands described in the "
"official Airflow quickstart guide for running Airflow locally."
)
self.deprovision()
run_pipeline(self, pipeline, stack, runtime_configuration)
Schedules a pipeline to be run on Airflow.
Returns:
Type | Description |
---|---|
Any | An Airflow DAG object that corresponds to the ZenML pipeline. |
Source code in zenml/integrations/airflow/orchestrators/airflow_orchestrator.py
def run_pipeline(
self,
pipeline: "BasePipeline",
stack: "Stack",
runtime_configuration: "RuntimeConfiguration",
) -> Any:
"""Schedules a pipeline to be run on Airflow.
Returns:
An Airflow DAG object that corresponds to the ZenML pipeline.
"""
if runtime_configuration.schedule:
airflow_config = {
"schedule_interval": datetime.timedelta(
seconds=runtime_configuration.schedule.interval_second
),
"start_date": runtime_configuration.schedule.start_time,
"end_date": runtime_configuration.schedule.end_time,
}
else:
airflow_config = {
"schedule_interval": "@once",
# Scheduled in the past to make sure it runs immediately
"start_date": datetime.datetime.now() - datetime.timedelta(7),
}
runner = AirflowDagRunner(AirflowPipelineConfig(airflow_config))
return runner.run(
pipeline=pipeline,
stack=stack,
runtime_configuration=runtime_configuration,
)
runtime_options(self)
Runtime options for the airflow orchestrator.
Source code in zenml/integrations/airflow/orchestrators/airflow_orchestrator.py
def runtime_options(self) -> Dict[str, Any]:
"""Runtime options for the airflow orchestrator."""
return {DAG_FILEPATH_OPTION_KEY: None}
set_airflow_home(values)
classmethod
Sets airflow home according to orchestrator UUID.
Source code in zenml/integrations/airflow/orchestrators/airflow_orchestrator.py
@root_validator
def set_airflow_home(cls, values: Dict[str, Any]) -> Dict[str, Any]:
"""Sets airflow home according to orchestrator UUID."""
if "uuid" not in values:
raise ValueError("`uuid` needs to exist for AirflowOrchestrator.")
values["airflow_home"] = os.path.join(
zenml.io.utils.get_global_config_directory(),
AIRFLOW_ROOT_DIR,
str(values["uuid"]),
)
return values
aws
special
The AWS integration provides a way for our users to manage their secrets through AWS.
AWSIntegration (Integration)
Definition of AWS integration for ZenML.
Source code in zenml/integrations/aws/__init__.py
class AWSIntegration(Integration):
"""Definition of AWS integration for ZenML."""
NAME = AWS
REQUIREMENTS = ["boto3==1.21.21"]
@classmethod
def activate(cls) -> None:
"""Activates the integration."""
from zenml.integrations.aws import secret_schemas # noqa
from zenml.integrations.aws import secrets_managers # noqa
activate()
classmethod
Activates the integration.
Source code in zenml/integrations/aws/__init__.py
@classmethod
def activate(cls) -> None:
"""Activates the integration."""
from zenml.integrations.aws import secret_schemas # noqa
from zenml.integrations.aws import secrets_managers # noqa
secret_schemas
special
Secret Schema
...
secrets_managers
special
Secret Manager
...
aws_secrets_manager
AWSSecretsManager (BaseSecretsManager)
pydantic-model
Class to interact with the AWS secrets manager.
Source code in zenml/integrations/aws/secrets_managers/aws_secrets_manager.py
class AWSSecretsManager(BaseSecretsManager):
"""Class to interact with the AWS secrets manager."""
region_name: str = DEFAULT_AWS_REGION
# Class configuration
FLAVOR: ClassVar[str] = "aws"
CLIENT: ClassVar[Any] = None
@classmethod
def _ensure_client_connected(cls, region_name: str) -> None:
if cls.CLIENT is None:
# Create a Secrets Manager client
session = boto3.session.Session()
cls.CLIENT = session.client(
service_name="secretsmanager", region_name=region_name
)
def register_secret(self, secret: BaseSecretSchema) -> None:
"""Registers a new secret.
Args:
secret: the secret to register"""
self._ensure_client_connected(self.region_name)
secret_value = jsonify_secret_contents(secret)
kwargs = {"Name": secret.name, "SecretString": secret_value}
self.CLIENT.create_secret(**kwargs)
def get_secret(self, secret_name: str) -> BaseSecretSchema:
"""Gets a secret.
Args:
secret_name: the name of the secret to get
Returns:
The secret.
Raises:
RuntimeError: if the secret does not exist"""
self._ensure_client_connected(self.region_name)
get_secret_value_response = self.CLIENT.get_secret_value(
SecretId=secret_name
)
if "SecretString" not in get_secret_value_response:
raise RuntimeError(f"No secrets found within the {secret_name}")
secret_contents: Dict[str, str] = json.loads(
get_secret_value_response["SecretString"]
)
zenml_schema_name = secret_contents.pop(ZENML_SCHEMA_NAME)
secret_contents["name"] = secret_name
secret_schema = SecretSchemaClassRegistry.get_class(
secret_schema=zenml_schema_name
)
return secret_schema(**secret_contents)
def get_all_secret_keys(self) -> List[str]:
"""Get all secret keys.
Returns:
A list of all secret keys."""
self._ensure_client_connected(self.region_name)
# TODO [ENG-720]: Deal with pagination in the aws secret manager when
# listing all secrets
# TODO [ENG-721]: take out this magic maxresults number
response = self.CLIENT.list_secrets(MaxResults=100)
return [secret["Name"] for secret in response["SecretList"]]
def update_secret(self, secret: BaseSecretSchema) -> None:
"""Update an existing secret.
Args:
secret: the secret to update"""
self._ensure_client_connected(self.region_name)
secret_value = jsonify_secret_contents(secret)
kwargs = {"SecretId": secret.name, "SecretString": secret_value}
self.CLIENT.put_secret_value(**kwargs)
def delete_secret(self, secret_name: str) -> None:
"""Delete an existing secret.
Args:
secret_name: the name of the secret to delete"""
self._ensure_client_connected(self.region_name)
self.CLIENT.delete_secret(
SecretId=secret_name, ForceDeleteWithoutRecovery=False
)
def delete_all_secrets(self, force: bool = False) -> None:
"""Delete all existing secrets.
Args:
force: whether to force delete all secrets"""
self._ensure_client_connected(self.region_name)
for secret_name in self.get_all_secret_keys():
self.CLIENT.delete_secret(
SecretId=secret_name, ForceDeleteWithoutRecovery=force
)
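A minimal usage sketch, assuming valid AWS credentials are available to boto3 and that the component accepts a name field from the base stack component class. The region and secret name are placeholders; in practice the secrets manager is usually registered and used through the ZenML CLI and stack.

```python
from zenml.integrations.aws.secrets_managers.aws_secrets_manager import (
    AWSSecretsManager,
)

# `name` is assumed to be inherited from the base stack component class;
# the region and secret name below are placeholders.
secrets_manager = AWSSecretsManager(
    name="aws_secrets_manager", region_name="eu-west-1"
)

# List all secret names visible to the client, then fetch one of them.
for secret_name in secrets_manager.get_all_secret_keys():
    print(secret_name)

secret = secrets_manager.get_secret("my_secret")  # hypothetical secret name
print(secret.content)
```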
delete_all_secrets(self, force=False)
Delete all existing secrets.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
force | bool | whether to force delete all secrets | False |
Source code in zenml/integrations/aws/secrets_managers/aws_secrets_manager.py
def delete_all_secrets(self, force: bool = False) -> None:
"""Delete all existing secrets.
Args:
force: whether to force delete all secrets"""
self._ensure_client_connected(self.region_name)
for secret_name in self.get_all_secret_keys():
self.CLIENT.delete_secret(
SecretId=secret_name, ForceDeleteWithoutRecovery=force
)
delete_secret(self, secret_name)
Delete an existing secret.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
secret_name | str | the name of the secret to delete | required |
Source code in zenml/integrations/aws/secrets_managers/aws_secrets_manager.py
def delete_secret(self, secret_name: str) -> None:
"""Delete an existing secret.
Args:
secret_name: the name of the secret to delete"""
self._ensure_client_connected(self.region_name)
self.CLIENT.delete_secret(
SecretId=secret_name, ForceDeleteWithoutRecovery=False
)
get_all_secret_keys(self)
Get all secret keys.
Returns:
Type | Description |
---|---|
List[str] | A list of all secret keys. |
Source code in zenml/integrations/aws/secrets_managers/aws_secrets_manager.py
def get_all_secret_keys(self) -> List[str]:
"""Get all secret keys.
Returns:
A list of all secret keys."""
self._ensure_client_connected(self.region_name)
# TODO [ENG-720]: Deal with pagination in the aws secret manager when
# listing all secrets
# TODO [ENG-721]: take out this magic maxresults number
response = self.CLIENT.list_secrets(MaxResults=100)
return [secret["Name"] for secret in response["SecretList"]]
get_secret(self, secret_name)
Gets a secret.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
secret_name | str | the name of the secret to get | required |
Returns:
Type | Description |
---|---|
BaseSecretSchema | The secret. |
Exceptions:
Type | Description |
---|---|
RuntimeError | if the secret does not exist |
Source code in zenml/integrations/aws/secrets_managers/aws_secrets_manager.py
def get_secret(self, secret_name: str) -> BaseSecretSchema:
"""Gets a secret.
Args:
secret_name: the name of the secret to get
Returns:
The secret.
Raises:
RuntimeError: if the secret does not exist"""
self._ensure_client_connected(self.region_name)
get_secret_value_response = self.CLIENT.get_secret_value(
SecretId=secret_name
)
if "SecretString" not in get_secret_value_response:
raise RuntimeError(f"No secrets found within the {secret_name}")
secret_contents: Dict[str, str] = json.loads(
get_secret_value_response["SecretString"]
)
zenml_schema_name = secret_contents.pop(ZENML_SCHEMA_NAME)
secret_contents["name"] = secret_name
secret_schema = SecretSchemaClassRegistry.get_class(
secret_schema=zenml_schema_name
)
return secret_schema(**secret_contents)
register_secret(self, secret)
Registers a new secret.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
secret | BaseSecretSchema | the secret to register | required |
Source code in zenml/integrations/aws/secrets_managers/aws_secrets_manager.py
def register_secret(self, secret: BaseSecretSchema) -> None:
"""Registers a new secret.
Args:
secret: the secret to register"""
self._ensure_client_connected(self.region_name)
secret_value = jsonify_secret_contents(secret)
kwargs = {"Name": secret.name, "SecretString": secret_value}
self.CLIENT.create_secret(**kwargs)
update_secret(self, secret)
Update an existing secret.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
secret | BaseSecretSchema | the secret to update | required |
Source code in zenml/integrations/aws/secrets_managers/aws_secrets_manager.py
def update_secret(self, secret: BaseSecretSchema) -> None:
"""Update an existing secret.
Args:
secret: the secret to update"""
self._ensure_client_connected(self.region_name)
secret_value = jsonify_secret_contents(secret)
kwargs = {"SecretId": secret.name, "SecretString": secret_value}
self.CLIENT.put_secret_value(**kwargs)
jsonify_secret_contents(secret)
Adds the secret type to the secret contents to persist the schema type in the secrets backend, so that the correct SecretSchema can be retrieved when the secret is queried from the backend.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
secret | BaseSecretSchema | should be a subclass of the BaseSecretSchema class | required |
Returns:
Type | Description |
---|---|
str | jsonified dictionary containing all key-value pairs and the ZenML schema type |
Source code in zenml/integrations/aws/secrets_managers/aws_secrets_manager.py
def jsonify_secret_contents(secret: BaseSecretSchema) -> str:
"""Adds the secret type to the secret contents to persist the schema
type in the secrets backend, so that the correct SecretSchema can be
retrieved when the secret is queried from the backend.
Args:
secret: should be a subclass of the BaseSecretSchema class
Returns:
jsonified dictionary containing all key-value pairs and the ZenML schema
type
"""
secret_contents = secret.content
secret_contents[ZENML_SCHEMA_NAME] = secret.TYPE
return json.dumps(secret_contents)
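For illustration only, this is the shape of the string the function produces for a hypothetical secret with two keys; the actual value of the ZENML_SCHEMA_NAME constant may differ from the placeholder key used here.

```python
import json

# "zenml_schema_name" stands in for the ZENML_SCHEMA_NAME constant and
# "example_schema" for secret.TYPE; both are placeholders.
secret_contents = {"username": "admin", "password": "hunter2"}
secret_contents["zenml_schema_name"] = "example_schema"
print(json.dumps(secret_contents))
# {"username": "admin", "password": "hunter2", "zenml_schema_name": "example_schema"}
```

When the secret is read back, get_secret() pops this schema key out of the contents and uses it to look up the matching SecretSchema class in the registry.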
azure
special
The Azure integration submodule provides a way to run ZenML pipelines in a cloud
environment. Specifically, it allows the use of cloud artifact stores and
provides an io module to handle file operations on Azure Blob Storage.
AzureIntegration (Integration)
Definition of Azure integration for ZenML.
Source code in zenml/integrations/azure/__init__.py
class AzureIntegration(Integration):
"""Definition of Azure integration for ZenML."""
NAME = AZURE
REQUIREMENTS = ["adlfs==2021.10.0"]
@classmethod
def activate(cls) -> None:
"""Activates the integration."""
from zenml.integrations.azure import artifact_stores # noqa
activate()
classmethod
Activates the integration.
Source code in zenml/integrations/azure/__init__.py
@classmethod
def activate(cls) -> None:
"""Activates the integration."""
from zenml.integrations.azure import artifact_stores # noqa
artifact_stores
special
azure_artifact_store
AzureArtifactStore (BaseArtifactStore)
pydantic-model
Artifact Store for Microsoft Azure based artifacts.
Source code in zenml/integrations/azure/artifact_stores/azure_artifact_store.py
class AzureArtifactStore(BaseArtifactStore):
"""Artifact Store for Microsoft Azure based artifacts."""
_filesystem: Optional[adlfs.AzureBlobFileSystem] = None
# Class Configuration
FLAVOR: ClassVar[str] = "azure"
SUPPORTED_SCHEMES: ClassVar[Set[str]] = {"abfs://", "az://"}
@property
def filesystem(self) -> adlfs.AzureBlobFileSystem:
"""The adlfs filesystem to access this artifact store."""
if not self._filesystem:
self._filesystem = adlfs.AzureBlobFileSystem(
anon=False,
use_listings_cache=False,
)
return self._filesystem
@classmethod
def _split_path(cls, path: PathType) -> Tuple[str, str]:
"""Splits a path into the filesystem prefix and remainder.
Example:
```python
prefix, remainder = ZenAzure._split_path("az://my_container/test.txt")
print(prefix, remainder) # "az://" "my_container/test.txt"
```
"""
path = convert_to_str(path)
prefix = ""
for potential_prefix in cls.SUPPORTED_SCHEMES:
if path.startswith(potential_prefix):
prefix = potential_prefix
path = path[len(potential_prefix) :]
break
return prefix, path
def open(self, path: PathType, mode: str = "r") -> Any:
"""Open a file at the given path.
Args:
path: Path of the file to open.
mode: Mode in which to open the file. Currently, only
'rb' and 'wb' to read and write binary files are supported.
"""
return self.filesystem.open(path=path, mode=mode)
def copyfile(
self, src: PathType, dst: PathType, overwrite: bool = False
) -> None:
"""Copy a file.
Args:
src: The path to copy from.
dst: The path to copy to.
overwrite: If a file already exists at the destination, this
method will overwrite it if overwrite=`True` and
raise a FileExistsError otherwise.
Raises:
FileNotFoundError: If the source file does not exist.
FileExistsError: If a file already exists at the destination
and overwrite is not set to `True`.
"""
if not overwrite and self.filesystem.exists(dst):
raise FileExistsError(
f"Unable to copy to destination '{convert_to_str(dst)}', "
f"file already exists. Set `overwrite=True` to copy anyway."
)
# TODO [ENG-151]: Check if it works with overwrite=True or if we need to
# manually remove it first
self.filesystem.copy(path1=src, path2=dst)
def exists(self, path: PathType) -> bool:
"""Check whether a path exists."""
return self.filesystem.exists(path=path) # type: ignore[no-any-return]
def glob(self, pattern: PathType) -> List[PathType]:
"""Return all paths that match the given glob pattern.
The glob pattern may include:
- '*' to match any number of characters
- '?' to match a single character
- '[...]' to match one of the characters inside the brackets
- '**' as the full name of a path component to match to search
in subdirectories of any depth (e.g. '/some_dir/**/some_file)
Args:
pattern: The glob pattern to match, see details above.
Returns:
A list of paths that match the given glob pattern.
"""
prefix, _ = self._split_path(pattern)
return [
f"{prefix}{path}" for path in self.filesystem.glob(path=pattern)
]
def isdir(self, path: PathType) -> bool:
"""Check whether a path is a directory."""
return self.filesystem.isdir(path=path) # type: ignore[no-any-return]
def listdir(self, path: PathType) -> List[PathType]:
"""Return a list of files in a directory."""
_, path = self._split_path(path)
def _extract_basename(file_dict: Dict[str, Any]) -> str:
"""Extracts the basename from a file info dict returned by the Azure
filesystem."""
file_path = cast(str, file_dict["name"])
base_name = file_path[len(path) :]
return base_name.lstrip("/")
return [
_extract_basename(dict_)
for dict_ in self.filesystem.listdir(path=path)
]
def makedirs(self, path: PathType) -> None:
"""Create a directory at the given path. If needed also
create missing parent directories."""
self.filesystem.makedirs(path=path, exist_ok=True)
def mkdir(self, path: PathType) -> None:
"""Create a directory at the given path."""
self.filesystem.makedir(path=path)
def remove(self, path: PathType) -> None:
"""Remove the file at the given path."""
self.filesystem.rm_file(path=path)
def rename(
self, src: PathType, dst: PathType, overwrite: bool = False
) -> None:
"""Rename source file to destination file.
Args:
src: The path of the file to rename.
dst: The path to rename the source file to.
overwrite: If a file already exists at the destination, this
method will overwrite it if overwrite=`True` and
raise a FileExistsError otherwise.
Raises:
FileNotFoundError: If the source file does not exist.
FileExistsError: If a file already exists at the destination
and overwrite is not set to `True`.
"""
if not overwrite and self.filesystem.exists(dst):
raise FileExistsError(
f"Unable to rename file to '{convert_to_str(dst)}', "
f"file already exists. Set `overwrite=True` to rename anyway."
)
# TODO [ENG-152]: Check if it works with overwrite=True or if we need
# to manually remove it first
self.filesystem.rename(path1=src, path2=dst)
def rmtree(self, path: PathType) -> None:
"""Remove the given directory."""
self.filesystem.delete(path=path, recursive=True)
def stat(self, path: PathType) -> Dict[str, Any]:
"""Return stat info for the given path."""
return self.filesystem.stat(path=path) # type: ignore[no-any-return]
def walk(
self,
top: PathType,
topdown: bool = True,
onerror: Optional[Callable[..., None]] = None,
) -> Iterable[Tuple[PathType, List[PathType], List[PathType]]]:
"""Return an iterator that walks the contents of the given directory.
Args:
top: Path of directory to walk.
topdown: Unused argument to conform to interface.
onerror: Unused argument to conform to interface.
Returns:
An Iterable of Tuples, each of which contain the path of the current
directory path, a list of directories inside the current directory
and a list of files inside the current directory.
"""
# TODO [ENG-153]: Additional params
prefix, _ = self._split_path(top)
for (
directory,
subdirectories,
files,
) in self.filesystem.walk(path=top):
yield f"{prefix}{directory}", subdirectories, files
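A minimal usage sketch, assuming the name and path fields come from the ZenML base artifact store and stack component classes, and that adlfs can pick up Azure credentials from the environment. The container path is a placeholder.

```python
from zenml.integrations.azure.artifact_stores.azure_artifact_store import (
    AzureArtifactStore,
)

# `name` and `path` are assumed to come from the ZenML base artifact store /
# stack component classes; the container path is a placeholder.
store = AzureArtifactStore(name="azure_store", path="az://my-container/zenml")

if store.exists("az://my-container/zenml/data.csv"):
    with store.open("az://my-container/zenml/data.csv", mode="rb") as f:
        print(len(f.read()))

# Glob with the patterns described in `glob()` below.
print(store.glob("az://my-container/zenml/**"))
```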
filesystem: AzureBlobFileSystem
property
readonly
The adlfs filesystem to access this artifact store.
copyfile(self, src, dst, overwrite=False)
Copy a file.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
src | Union[bytes, str] | The path to copy from. | required |
dst | Union[bytes, str] | The path to copy to. | required |
overwrite | bool | If a file already exists at the destination, this method will overwrite it if overwrite=`True` and raise a FileExistsError otherwise. | False |
Exceptions:
Type | Description |
---|---|
FileNotFoundError | If the source file does not exist. |
FileExistsError | If a file already exists at the destination and overwrite is not set to `True`. |
Source code in zenml/integrations/azure/artifact_stores/azure_artifact_store.py
def copyfile(
self, src: PathType, dst: PathType, overwrite: bool = False
) -> None:
"""Copy a file.
Args:
src: The path to copy from.
dst: The path to copy to.
overwrite: If a file already exists at the destination, this
method will overwrite it if overwrite=`True` and
raise a FileExistsError otherwise.
Raises:
FileNotFoundError: If the source file does not exist.
FileExistsError: If a file already exists at the destination
and overwrite is not set to `True`.
"""
if not overwrite and self.filesystem.exists(dst):
raise FileExistsError(
f"Unable to copy to destination '{convert_to_str(dst)}', "
f"file already exists. Set `overwrite=True` to copy anyway."
)
# TODO [ENG-151]: Check if it works with overwrite=True or if we need to
# manually remove it first
self.filesystem.copy(path1=src, path2=dst)
exists(self, path)
Check whether a path exists.
Source code in zenml/integrations/azure/artifact_stores/azure_artifact_store.py
def exists(self, path: PathType) -> bool:
"""Check whether a path exists."""
return self.filesystem.exists(path=path) # type: ignore[no-any-return]
glob(self, pattern)
Return all paths that match the given glob pattern.
The glob pattern may include:
- `*` to match any number of characters
- `?` to match a single character
- `[...]` to match one of the characters inside the brackets
- `**` as the full name of a path component to match to search in subdirectories of any depth (e.g. `/some_dir/**/some_file`)
Parameters:
Name | Type | Description | Default |
---|---|---|---|
pattern | Union[bytes, str] | The glob pattern to match, see details above. | required |
Returns:
Type | Description |
---|---|
List[Union[bytes, str]] | A list of paths that match the given glob pattern. |
Source code in zenml/integrations/azure/artifact_stores/azure_artifact_store.py
def glob(self, pattern: PathType) -> List[PathType]:
"""Return all paths that match the given glob pattern.
The glob pattern may include:
- '*' to match any number of characters
- '?' to match a single character
- '[...]' to match one of the characters inside the brackets
- '**' as the full name of a path component to match to search
in subdirectories of any depth (e.g. '/some_dir/**/some_file)
Args:
pattern: The glob pattern to match, see details above.
Returns:
A list of paths that match the given glob pattern.
"""
prefix, _ = self._split_path(pattern)
return [
f"{prefix}{path}" for path in self.filesystem.glob(path=pattern)
]
isdir(self, path)
Check whether a path is a directory.
Source code in zenml/integrations/azure/artifact_stores/azure_artifact_store.py
def isdir(self, path: PathType) -> bool:
"""Check whether a path is a directory."""
return self.filesystem.isdir(path=path) # type: ignore[no-any-return]
listdir(self, path)
Return a list of files in a directory.
Source code in zenml/integrations/azure/artifact_stores/azure_artifact_store.py
def listdir(self, path: PathType) -> List[PathType]:
"""Return a list of files in a directory."""
_, path = self._split_path(path)
def _extract_basename(file_dict: Dict[str, Any]) -> str:
"""Extracts the basename from a file info dict returned by the Azure
filesystem."""
file_path = cast(str, file_dict["name"])
base_name = file_path[len(path) :]
return base_name.lstrip("/")
return [
_extract_basename(dict_)
for dict_ in self.filesystem.listdir(path=path)
]
makedirs(self, path)
Create a directory at the given path. If needed also create missing parent directories.
Source code in zenml/integrations/azure/artifact_stores/azure_artifact_store.py
def makedirs(self, path: PathType) -> None:
"""Create a directory at the given path. If needed also
create missing parent directories."""
self.filesystem.makedirs(path=path, exist_ok=True)
mkdir(self, path)
Create a directory at the given path.
Source code in zenml/integrations/azure/artifact_stores/azure_artifact_store.py
def mkdir(self, path: PathType) -> None:
"""Create a directory at the given path."""
self.filesystem.makedir(path=path)
open(self, path, mode='r')
Open a file at the given path.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path | Union[bytes, str] | Path of the file to open. | required |
mode | str | Mode in which to open the file. Currently, only 'rb' and 'wb' to read and write binary files are supported. | 'r' |
Source code in zenml/integrations/azure/artifact_stores/azure_artifact_store.py
def open(self, path: PathType, mode: str = "r") -> Any:
"""Open a file at the given path.
Args:
path: Path of the file to open.
mode: Mode in which to open the file. Currently, only
'rb' and 'wb' to read and write binary files are supported.
"""
return self.filesystem.open(path=path, mode=mode)
remove(self, path)
Remove the file at the given path.
Source code in zenml/integrations/azure/artifact_stores/azure_artifact_store.py
def remove(self, path: PathType) -> None:
"""Remove the file at the given path."""
self.filesystem.rm_file(path=path)
rename(self, src, dst, overwrite=False)
Rename source file to destination file.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
src | Union[bytes, str] | The path of the file to rename. | required |
dst | Union[bytes, str] | The path to rename the source file to. | required |
overwrite | bool | If a file already exists at the destination, this method will overwrite it if overwrite=`True` and raise a FileExistsError otherwise. | False |
Exceptions:
Type | Description |
---|---|
FileNotFoundError | If the source file does not exist. |
FileExistsError | If a file already exists at the destination and overwrite is not set to `True`. |
Source code in zenml/integrations/azure/artifact_stores/azure_artifact_store.py
def rename(
self, src: PathType, dst: PathType, overwrite: bool = False
) -> None:
"""Rename source file to destination file.
Args:
src: The path of the file to rename.
dst: The path to rename the source file to.
overwrite: If a file already exists at the destination, this
method will overwrite it if overwrite=`True` and
raise a FileExistsError otherwise.
Raises:
FileNotFoundError: If the source file does not exist.
FileExistsError: If a file already exists at the destination
and overwrite is not set to `True`.
"""
if not overwrite and self.filesystem.exists(dst):
raise FileExistsError(
f"Unable to rename file to '{convert_to_str(dst)}', "
f"file already exists. Set `overwrite=True` to rename anyway."
)
# TODO [ENG-152]: Check if it works with overwrite=True or if we need
# to manually remove it first
self.filesystem.rename(path1=src, path2=dst)
rmtree(self, path)
Remove the given directory.
Source code in zenml/integrations/azure/artifact_stores/azure_artifact_store.py
def rmtree(self, path: PathType) -> None:
"""Remove the given directory."""
self.filesystem.delete(path=path, recursive=True)
stat(self, path)
Return stat info for the given path.
Source code in zenml/integrations/azure/artifact_stores/azure_artifact_store.py
def stat(self, path: PathType) -> Dict[str, Any]:
"""Return stat info for the given path."""
return self.filesystem.stat(path=path) # type: ignore[no-any-return]
walk(self, top, topdown=True, onerror=None)
Return an iterator that walks the contents of the given directory.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
top | Union[bytes, str] | Path of directory to walk. | required |
topdown | bool | Unused argument to conform to interface. | True |
onerror | Optional[Callable[..., NoneType]] | Unused argument to conform to interface. | None |
Returns:
Type | Description |
---|---|
Iterable[Tuple[Union[bytes, str], List[Union[bytes, str]], List[Union[bytes, str]]]] | An Iterable of Tuples, each of which contain the path of the current directory path, a list of directories inside the current directory and a list of files inside the current directory. |
Source code in zenml/integrations/azure/artifact_stores/azure_artifact_store.py
def walk(
self,
top: PathType,
topdown: bool = True,
onerror: Optional[Callable[..., None]] = None,
) -> Iterable[Tuple[PathType, List[PathType], List[PathType]]]:
"""Return an iterator that walks the contents of the given directory.
Args:
top: Path of directory to walk.
topdown: Unused argument to conform to interface.
onerror: Unused argument to conform to interface.
Returns:
An Iterable of Tuples, each of which contain the path of the current
directory path, a list of directories inside the current directory
and a list of files inside the current directory.
"""
# TODO [ENG-153]: Additional params
prefix, _ = self._split_path(top)
for (
directory,
subdirectories,
files,
) in self.filesystem.walk(path=top):
yield f"{prefix}{directory}", subdirectories, files
azureml
special
The AzureML integration submodule provides a way to run ZenML steps in AzureML.
AzureMLIntegration (Integration)
Definition of AzureML integration for ZenML.
Source code in zenml/integrations/azureml/__init__.py
class AzureMLIntegration(Integration):
"""Definition of AzureML integration for ZenML."""
NAME = AZUREML
REQUIREMENTS = ["azureml-core==1.39.0.post1"]
@classmethod
def activate(cls) -> None:
"""Activates the integration."""
from zenml.integrations.azureml import step_operators # noqa
activate()
classmethod
Activates the integration.
Source code in zenml/integrations/azureml/__init__.py
@classmethod
def activate(cls) -> None:
"""Activates the integration."""
from zenml.integrations.azureml import step_operators # noqa
step_operators
special
azureml_step_operator
AzureMLStepOperator (BaseStepOperator)
pydantic-model
Step operator to run a step on AzureML.
This class defines code that can set up an AzureML environment and run the ZenML entrypoint command in it.
Attributes:
Name | Type | Description |
---|---|---|
subscription_id | str | The Azure account's subscription ID. |
resource_group | str | The resource group to which the AzureML workspace is deployed. |
workspace_name | str | The name of the AzureML Workspace. |
compute_target_name | str | The name of the configured ComputeTarget. An instance of it has to be created on the portal if it doesn't exist already. |
environment_name | Optional[str] | [Optional] The name of the environment if there already exists one. |
docker_base_image | Optional[str] | [Optional] The custom docker base image that the environment should use. |
tenant_id | Optional[str] | The Azure Tenant ID. |
service_principal_id | Optional[str] | The ID for the service principal that is created to allow apps to access secure resources. |
service_principal_password | Optional[str] | Password for the service principal. |
Source code in zenml/integrations/azureml/step_operators/azureml_step_operator.py
class AzureMLStepOperator(BaseStepOperator):
"""Step operator to run a step on AzureML.
This class defines code that can set up an AzureML environment and run the
ZenML entrypoint command in it.
Attributes:
subscription_id: The Azure account's subscription ID
resource_group: The resource group to which the AzureML workspace
is deployed.
workspace_name: The name of the AzureML Workspace.
compute_target_name: The name of the configured ComputeTarget.
An instance of it has to be created on the portal if it doesn't
exist already.
environment_name: [Optional] The name of the environment if there
already exists one.
docker_base_image: [Optional] The custom docker base image that the
environment should use.
tenant_id: The Azure Tenant ID.
service_principal_id: The ID for the service principal that is created
to allow apps to access secure resources.
service_principal_password: Password for the service principal.
"""
subscription_id: str
resource_group: str
workspace_name: str
compute_target_name: str
# Environment
environment_name: Optional[str] = None
docker_base_image: Optional[str] = None
# Service principal authentication
# https://docs.microsoft.com/en-us/azure/machine-learning/how-to-setup-authentication#configure-a-service-principal
tenant_id: Optional[str] = None
service_principal_id: Optional[str] = None
service_principal_password: Optional[str] = None
# Class Configuration
FLAVOR: ClassVar[str] = "azureml"
def _get_authentication(self) -> Optional[AbstractAuthentication]:
if (
self.tenant_id
and self.service_principal_id
and self.service_principal_password
):
return ServicePrincipalAuthentication(
tenant_id=self.tenant_id,
service_principal_id=self.service_principal_id,
service_principal_password=self.service_principal_password,
)
return None
def _prepare_environment(
self, workspace: Workspace, requirements: List[str], run_name: str
) -> Environment:
"""Prepares the environment in which Azure will run all jobs.
Args:
workspace: The AzureML Workspace that has configuration
for a storage account, container registry among other
things.
requirements: The list of requirements to be installed
in the environment.
run_name: The name of the pipeline run that can be used
for naming environments and runs.
"""
if self.environment_name:
environment = Environment.get(
workspace=workspace, name=self.environment_name
)
if not environment.python.conda_dependencies:
environment.python.conda_dependencies = (
CondaDependencies.create(
python_version=ZenMLEnvironment.python_version()
)
)
for requirement in requirements:
environment.python.conda_dependencies.add_pip_package(
requirement
)
else:
environment = Environment(name=f"zenml-{run_name}")
environment.python.conda_dependencies = CondaDependencies.create(
pip_packages=requirements,
python_version=ZenMLEnvironment.python_version(),
)
if self.docker_base_image:
# replace the default azure base image
environment.docker.base_image = self.docker_base_image
environment_variables = {
"ENV_ZENML_PREVENT_PIPELINE_EXECUTION": "True",
}
# set credentials to access azure storage
for key in [
"AZURE_STORAGE_ACCOUNT_KEY",
"AZURE_STORAGE_ACCOUNT_NAME",
"AZURE_STORAGE_CONNECTION_STRING",
"AZURE_STORAGE_SAS_TOKEN",
]:
value = os.getenv(key)
if value:
environment_variables[key] = value
environment_variables[
ENV_ZENML_CONFIG_PATH
] = f"./{CONTAINER_ZENML_CONFIG_DIR}"
environment.environment_variables = environment_variables
return environment
def launch(
self,
pipeline_name: str,
run_name: str,
requirements: List[str],
entrypoint_command: List[str],
) -> None:
"""Launches a step on AzureML.
Args:
pipeline_name: Name of the pipeline which the step to be executed
is part of.
run_name: Name of the pipeline run which the step to be executed
is part of.
entrypoint_command: Command that executes the step.
requirements: List of pip requirements that must be installed
inside the step operator environment.
"""
workspace = Workspace.get(
subscription_id=self.subscription_id,
resource_group=self.resource_group,
name=self.workspace_name,
auth=self._get_authentication(),
)
source_directory = get_source_root_path()
config_path = os.path.join(source_directory, CONTAINER_ZENML_CONFIG_DIR)
try:
# Save a copy of the current global configuration with the
# active profile contents into the build context, to have
# the configured stacks accessible from within the Azure ML
# environment.
GlobalConfiguration().copy_config_with_active_profile(
config_path,
load_config_path=f"./{CONTAINER_ZENML_CONFIG_DIR}",
)
environment = self._prepare_environment(
workspace=workspace,
requirements=requirements,
run_name=run_name,
)
compute_target = ComputeTarget(
workspace=workspace, name=self.compute_target_name
)
run_config = ScriptRunConfig(
source_directory=source_directory,
environment=environment,
compute_target=compute_target,
command=entrypoint_command,
)
experiment = Experiment(workspace=workspace, name=pipeline_name)
run = experiment.submit(config=run_config)
finally:
# Clean up the temporary build files
fileio.rmtree(config_path)
run.display_name = run_name
run.wait_for_completion(show_output=True)
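The snippet below is a minimal configuration sketch based only on the attributes documented above. All resource names are placeholders, and the name argument is assumed to be a field required by the base stack component class rather than something specific to this integration.

```python
from zenml.integrations.azureml.step_operators.azureml_step_operator import (
    AzureMLStepOperator,
)

# Hypothetical configuration sketch; every value below is a placeholder.
step_operator = AzureMLStepOperator(
    name="azureml",                        # assumed base stack-component field
    subscription_id="<subscription-id>",
    resource_group="<resource-group>",
    workspace_name="<workspace-name>",
    compute_target_name="<compute-target>",
    docker_base_image="python:3.8-slim",   # optional: replaces the default Azure base image
)
```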
launch(self, pipeline_name, run_name, requirements, entrypoint_command)
Launches a step on AzureML.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
pipeline_name | str | Name of the pipeline which the step to be executed is part of. | required |
run_name | str | Name of the pipeline run which the step to be executed is part of. | required |
entrypoint_command | List[str] | Command that executes the step. | required |
requirements | List[str] | List of pip requirements that must be installed inside the step operator environment. | required |
Source code in zenml/integrations/azureml/step_operators/azureml_step_operator.py
def launch(
self,
pipeline_name: str,
run_name: str,
requirements: List[str],
entrypoint_command: List[str],
) -> None:
"""Launches a step on AzureML.
Args:
pipeline_name: Name of the pipeline which the step to be executed
is part of.
run_name: Name of the pipeline run which the step to be executed
is part of.
entrypoint_command: Command that executes the step.
requirements: List of pip requirements that must be installed
inside the step operator environment.
"""
workspace = Workspace.get(
subscription_id=self.subscription_id,
resource_group=self.resource_group,
name=self.workspace_name,
auth=self._get_authentication(),
)
source_directory = get_source_root_path()
config_path = os.path.join(source_directory, CONTAINER_ZENML_CONFIG_DIR)
try:
# Save a copy of the current global configuration with the
# active profile contents into the build context, to have
# the configured stacks accessible from within the Azure ML
# environment.
GlobalConfiguration().copy_config_with_active_profile(
config_path,
load_config_path=f"./{CONTAINER_ZENML_CONFIG_DIR}",
)
environment = self._prepare_environment(
workspace=workspace,
requirements=requirements,
run_name=run_name,
)
compute_target = ComputeTarget(
workspace=workspace, name=self.compute_target_name
)
run_config = ScriptRunConfig(
source_directory=source_directory,
environment=environment,
compute_target=compute_target,
command=entrypoint_command,
)
experiment = Experiment(workspace=workspace, name=pipeline_name)
run = experiment.submit(config=run_config)
finally:
# Clean up the temporary build files
fileio.rmtree(config_path)
run.display_name = run_name
run.wait_for_completion(show_output=True)
dash
special
DashIntegration (Integration)
Definition of Dash integration for ZenML.
Source code in zenml/integrations/dash/__init__.py
class DashIntegration(Integration):
"""Definition of Dash integration for ZenML."""
NAME = DASH
REQUIREMENTS = [
"dash>=2.0.0",
"dash-cytoscape>=0.3.0",
"dash-bootstrap-components>=1.0.1",
]
visualizers
special
pipeline_run_lineage_visualizer
PipelineRunLineageVisualizer (BasePipelineRunVisualizer)
Implementation of a lineage diagram via the dash and dash-cytoscape libraries.
Source code in zenml/integrations/dash/visualizers/pipeline_run_lineage_visualizer.py
class PipelineRunLineageVisualizer(BasePipelineRunVisualizer):
"""Implementation of a lineage diagram via the [dash](
https://plotly.com/dash/) and [dash-cyctoscape](
https://dash.plotly.com/cytoscape) library."""
ARTIFACT_PREFIX = "artifact_"
STEP_PREFIX = "step_"
STATUS_CLASS_MAPPING = {
ExecutionStatus.CACHED: "green",
ExecutionStatus.FAILED: "red",
ExecutionStatus.RUNNING: "yellow",
ExecutionStatus.COMPLETED: "blue",
}
def visualize(
self, object: PipelineRunView, *args: Any, **kwargs: Any
) -> dash.Dash:
"""Method to visualize pipeline runs via the Dash library. The layout
puts every layer of the dag in a column.
"""
app = dash.Dash(
__name__,
external_stylesheets=[
dbc.themes.BOOTSTRAP,
dbc.icons.BOOTSTRAP,
],
)
nodes, edges, first_step_id = [], [], None
first_step_id = None
for step in object.steps:
step_output_artifacts = list(step.outputs.values())
execution_id = (
step_output_artifacts[0].producer_step.id
if step_output_artifacts
else step.id
)
step_id = self.STEP_PREFIX + str(step.id)
if first_step_id is None:
first_step_id = step_id
nodes.append(
{
"data": {
"id": step_id,
"execution_id": execution_id,
"label": f"{execution_id} / {step.entrypoint_name}",
"entrypoint_name": step.entrypoint_name, # redundant for consistency
"name": step.name, # redundant for consistency
"type": "step",
"parameters": step.parameters,
"inputs": {k: v.uri for k, v in step.inputs.items()},
"outputs": {k: v.uri for k, v in step.outputs.items()},
},
"classes": self.STATUS_CLASS_MAPPING[step.status],
}
)
for artifact_name, artifact in step.outputs.items():
nodes.append(
{
"data": {
"id": self.ARTIFACT_PREFIX + str(artifact.id),
"execution_id": artifact.id,
"label": f"{artifact.id} / {artifact_name} ("
f"{artifact.data_type})",
"type": "artifact",
"name": artifact_name,
"is_cached": artifact.is_cached,
"artifact_type": artifact.type,
"artifact_data_type": artifact.data_type,
"parent_step_id": artifact.parent_step_id,
"producer_step_id": artifact.producer_step.id,
"uri": artifact.uri,
},
"classes": f"rectangle "
f"{self.STATUS_CLASS_MAPPING[step.status]}",
}
)
edges.append(
{
"data": {
"source": self.STEP_PREFIX + str(step.id),
"target": self.ARTIFACT_PREFIX + str(artifact.id),
},
"classes": f"edge-arrow "
f"{self.STATUS_CLASS_MAPPING[step.status]}"
+ (" dashed" if artifact.is_cached else " solid"),
}
)
for artifact_name, artifact in step.inputs.items():
edges.append(
{
"data": {
"source": self.ARTIFACT_PREFIX + str(artifact.id),
"target": self.STEP_PREFIX + str(step.id),
},
"classes": "edge-arrow "
+ (
f"{self.STATUS_CLASS_MAPPING[ExecutionStatus.CACHED]} dashed"
if artifact.is_cached
else f"{self.STATUS_CLASS_MAPPING[step.status]} solid"
),
}
)
app.layout = dbc.Row(
[
dbc.Container(f"Run: {object.name}", class_name="h1"),
dbc.Row(
[
dbc.Col(
[
dbc.Row(
[
html.Span(
[
html.Span(
[
html.I(
className="bi bi-circle-fill me-1"
),
"Step",
],
className="me-2",
),
html.Span(
[
html.I(
className="bi bi-square-fill me-1"
),
"Artifact",
],
className="me-4",
),
dbc.Badge(
"Completed",
color=COLOR_BLUE,
className="me-1",
),
dbc.Badge(
"Cached",
color=COLOR_GREEN,
className="me-1",
),
dbc.Badge(
"Running",
color=COLOR_YELLOW,
className="me-1",
),
dbc.Badge(
"Failed",
color=COLOR_RED,
className="me-1",
),
]
),
]
),
dbc.Row(
[
cyto.Cytoscape(
id="cytoscape",
layout={
"name": "breadthfirst",
"roots": f'[id = "{first_step_id}"]',
},
elements=edges + nodes,
stylesheet=STYLESHEET,
style={
"width": "100%",
"height": "800px",
},
zoom=1,
)
]
),
dbc.Row(
[
dbc.Button(
"Reset",
id="bt-reset",
color="primary",
className="me-1",
)
]
),
]
),
dbc.Col(
[
dcc.Markdown(id="markdown-selected-node-data"),
]
),
]
),
],
className="p-5",
)
@app.callback( # type: ignore[misc]
Output("markdown-selected-node-data", "children"),
Input("cytoscape", "selectedNodeData"),
)
def display_data(data_list: List[Dict[str, Any]]) -> str:
"""Callback for the text area below the graph"""
if data_list is None:
return "Click on a node in the diagram."
text = ""
for data in data_list:
text += f'## {data["execution_id"]} / {data["name"]}' + "\n\n"
if data["type"] == "artifact":
for item in [
"artifact_data_type",
"is_cached",
"producer_step_id",
"parent_step_id",
"uri",
]:
text += f"**{item}**: {data[item]}" + "\n\n"
elif data["type"] == "step":
text += "### Inputs:" + "\n\n"
for k, v in data["inputs"].items():
text += f"**{k}**: {v}" + "\n\n"
text += "### Outputs:" + "\n\n"
for k, v in data["outputs"].items():
text += f"**{k}**: {v}" + "\n\n"
text += "### Params:"
for k, v in data["parameters"].items():
text += f"**{k}**: {v}" + "\n\n"
return text
@app.callback( # type: ignore[misc]
[Output("cytoscape", "zoom"), Output("cytoscape", "elements")],
[Input("bt-reset", "n_clicks")],
)
def reset_layout(
n_clicks: int,
) -> List[Union[int, List[Dict[str, Collection[str]]]]]:
"""Resets the layout"""
logger.debug(n_clicks, "clicked in reset button.")
return [1, edges + nodes]
app.run_server()
return app
visualize(self, object, *args, **kwargs)
Method to visualize pipeline runs via the Dash library. The layout puts every layer of the DAG in a column.
Source code in zenml/integrations/dash/visualizers/pipeline_run_lineage_visualizer.py
def visualize(
self, object: PipelineRunView, *args: Any, **kwargs: Any
) -> dash.Dash:
"""Method to visualize pipeline runs via the Dash library. The layout
puts every layer of the dag in a column.
"""
app = dash.Dash(
__name__,
external_stylesheets=[
dbc.themes.BOOTSTRAP,
dbc.icons.BOOTSTRAP,
],
)
nodes, edges, first_step_id = [], [], None
first_step_id = None
for step in object.steps:
step_output_artifacts = list(step.outputs.values())
execution_id = (
step_output_artifacts[0].producer_step.id
if step_output_artifacts
else step.id
)
step_id = self.STEP_PREFIX + str(step.id)
if first_step_id is None:
first_step_id = step_id
nodes.append(
{
"data": {
"id": step_id,
"execution_id": execution_id,
"label": f"{execution_id} / {step.entrypoint_name}",
"entrypoint_name": step.entrypoint_name, # redundant for consistency
"name": step.name, # redundant for consistency
"type": "step",
"parameters": step.parameters,
"inputs": {k: v.uri for k, v in step.inputs.items()},
"outputs": {k: v.uri for k, v in step.outputs.items()},
},
"classes": self.STATUS_CLASS_MAPPING[step.status],
}
)
for artifact_name, artifact in step.outputs.items():
nodes.append(
{
"data": {
"id": self.ARTIFACT_PREFIX + str(artifact.id),
"execution_id": artifact.id,
"label": f"{artifact.id} / {artifact_name} ("
f"{artifact.data_type})",
"type": "artifact",
"name": artifact_name,
"is_cached": artifact.is_cached,
"artifact_type": artifact.type,
"artifact_data_type": artifact.data_type,
"parent_step_id": artifact.parent_step_id,
"producer_step_id": artifact.producer_step.id,
"uri": artifact.uri,
},
"classes": f"rectangle "
f"{self.STATUS_CLASS_MAPPING[step.status]}",
}
)
edges.append(
{
"data": {
"source": self.STEP_PREFIX + str(step.id),
"target": self.ARTIFACT_PREFIX + str(artifact.id),
},
"classes": f"edge-arrow "
f"{self.STATUS_CLASS_MAPPING[step.status]}"
+ (" dashed" if artifact.is_cached else " solid"),
}
)
for artifact_name, artifact in step.inputs.items():
edges.append(
{
"data": {
"source": self.ARTIFACT_PREFIX + str(artifact.id),
"target": self.STEP_PREFIX + str(step.id),
},
"classes": "edge-arrow "
+ (
f"{self.STATUS_CLASS_MAPPING[ExecutionStatus.CACHED]} dashed"
if artifact.is_cached
else f"{self.STATUS_CLASS_MAPPING[step.status]} solid"
),
}
)
app.layout = dbc.Row(
[
dbc.Container(f"Run: {object.name}", class_name="h1"),
dbc.Row(
[
dbc.Col(
[
dbc.Row(
[
html.Span(
[
html.Span(
[
html.I(
className="bi bi-circle-fill me-1"
),
"Step",
],
className="me-2",
),
html.Span(
[
html.I(
className="bi bi-square-fill me-1"
),
"Artifact",
],
className="me-4",
),
dbc.Badge(
"Completed",
color=COLOR_BLUE,
className="me-1",
),
dbc.Badge(
"Cached",
color=COLOR_GREEN,
className="me-1",
),
dbc.Badge(
"Running",
color=COLOR_YELLOW,
className="me-1",
),
dbc.Badge(
"Failed",
color=COLOR_RED,
className="me-1",
),
]
),
]
),
dbc.Row(
[
cyto.Cytoscape(
id="cytoscape",
layout={
"name": "breadthfirst",
"roots": f'[id = "{first_step_id}"]',
},
elements=edges + nodes,
stylesheet=STYLESHEET,
style={
"width": "100%",
"height": "800px",
},
zoom=1,
)
]
),
dbc.Row(
[
dbc.Button(
"Reset",
id="bt-reset",
color="primary",
className="me-1",
)
]
),
]
),
dbc.Col(
[
dcc.Markdown(id="markdown-selected-node-data"),
]
),
]
),
],
className="p-5",
)
@app.callback( # type: ignore[misc]
Output("markdown-selected-node-data", "children"),
Input("cytoscape", "selectedNodeData"),
)
def display_data(data_list: List[Dict[str, Any]]) -> str:
"""Callback for the text area below the graph"""
if data_list is None:
return "Click on a node in the diagram."
text = ""
for data in data_list:
text += f'## {data["execution_id"]} / {data["name"]}' + "\n\n"
if data["type"] == "artifact":
for item in [
"artifact_data_type",
"is_cached",
"producer_step_id",
"parent_step_id",
"uri",
]:
text += f"**{item}**: {data[item]}" + "\n\n"
elif data["type"] == "step":
text += "### Inputs:" + "\n\n"
for k, v in data["inputs"].items():
text += f"**{k}**: {v}" + "\n\n"
text += "### Outputs:" + "\n\n"
for k, v in data["outputs"].items():
text += f"**{k}**: {v}" + "\n\n"
text += "### Params:"
for k, v in data["parameters"].items():
text += f"**{k}**: {v}" + "\n\n"
return text
@app.callback( # type: ignore[misc]
[Output("cytoscape", "zoom"), Output("cytoscape", "elements")],
[Input("bt-reset", "n_clicks")],
)
def reset_layout(
n_clicks: int,
) -> List[Union[int, List[Dict[str, Collection[str]]]]]:
"""Resets the layout"""
logger.debug(n_clicks, "clicked in reset button.")
return [1, edges + nodes]
app.run_server()
return app
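A minimal usage sketch: the visualizer only needs a PipelineRunView, which is assumed here to have been fetched via ZenML's post-execution API; calling visualize starts a local Dash server.

```python
from zenml.integrations.dash.visualizers.pipeline_run_lineage_visualizer import (
    PipelineRunLineageVisualizer,
)

# Hypothetical sketch: `run` is assumed to be a PipelineRunView obtained from
# a finished pipeline run via the post-execution API.
PipelineRunLineageVisualizer().visualize(run)  # blocks while the Dash server is running
```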
evidently
special
The Evidently integration provides a way to monitor your models in production. It includes a way to detect data drift and different kinds of model performance issues.
The results of Evidently calculations can either be exported as an interactive dashboard (visualized as an html file or in your Jupyter notebook), or as a JSON file.
EvidentlyIntegration (Integration)
Definition of Evidently integration for ZenML.
Source code in zenml/integrations/evidently/__init__.py
class EvidentlyIntegration(Integration):
"""Definition of [Evidently](https://github.com/evidentlyai/evidently) integration
for ZenML."""
NAME = EVIDENTLY
REQUIREMENTS = ["evidently==v0.1.41.dev0"]
steps
special
evidently_profile
EvidentlyProfileConfig (BaseDriftDetectionConfig)
pydantic-model
Config class for Evidently profile steps.
column_mapping: properties of the dataframe's columns used.
profile_sections: strings that identify the profile sections to be used. The following are valid options supported by Evidently: "datadrift", "categoricaltargetdrift", "numericaltargetdrift", "classificationmodelperformance", "regressionmodelperformance", "probabilisticmodelperformance".
Source code in zenml/integrations/evidently/steps/evidently_profile.py
class EvidentlyProfileConfig(BaseDriftDetectionConfig):
"""Config class for Evidently profile steps.
column_mapping: properties of the dataframe's columns used
profile_section: a string that identifies the profile section to be used.
The following are valid options supported by Evidently:
- "datadrift"
- "categoricaltargetdrift"
- "numericaltargetdrift"
- "classificationmodelperformance"
- "regressionmodelperformance"
- "probabilisticmodelperformance"
"""
def get_profile_sections_and_tabs(
self,
) -> Tuple[List[ProfileSection], List[Tab]]:
try:
return (
[
profile_mapper[profile]()
for profile in self.profile_sections
],
[
dashboard_mapper[profile]()
for profile in self.profile_sections
],
)
except KeyError:
nl = "\n"
raise ValueError(
f"Invalid profile section: {self.profile_sections} \n\n"
f"Valid and supported options are: {nl}- "
f'{f"{nl}- ".join(list(profile_mapper.keys()))}'
)
column_mapping: Optional[ColumnMapping]
profile_sections: Sequence[str]
EvidentlyProfileStep (BaseDriftDetectionStep)
Simple step implementation which implements Evidently's functionality for creating a profile.
Source code in zenml/integrations/evidently/steps/evidently_profile.py
class EvidentlyProfileStep(BaseDriftDetectionStep):
"""Simple step implementation which implements Evidently's functionality for
creating a profile."""
OUTPUT_SPEC = {
"profile": DataAnalysisArtifact,
"dashboard": DataAnalysisArtifact,
}
def entrypoint( # type: ignore[override]
self,
reference_dataset: pd.DataFrame,
comparison_dataset: pd.DataFrame,
config: EvidentlyProfileConfig,
) -> Output( # type:ignore[valid-type]
profile=dict, dashboard=str
):
"""Main entrypoint for the Evidently categorical target drift detection
step.
Args:
reference_dataset: a Pandas dataframe
comparison_dataset: a Pandas dataframe of new data you wish to
compare against the reference data
config: the configuration for the step
Returns:
profile: dictionary report extracted from an Evidently Profile
generated for the data drift
dashboard: HTML report extracted from an Evidently Dashboard
generated for the data drift
"""
sections, tabs = config.get_profile_sections_and_tabs()
data_drift_dashboard = Dashboard(tabs=tabs)
data_drift_dashboard.calculate(
reference_dataset,
comparison_dataset,
column_mapping=config.column_mapping or None,
)
data_drift_profile = Profile(sections=sections)
data_drift_profile.calculate(
reference_dataset,
comparison_dataset,
column_mapping=config.column_mapping or None,
)
return [data_drift_profile.object(), data_drift_dashboard.html()]
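A minimal sketch of wiring the config and step together, assuming the step accepts its config through the standard ZenML step constructor; the chosen profile section is just one of the valid options listed above.

```python
from zenml.integrations.evidently.steps.evidently_profile import (
    EvidentlyProfileConfig,
    EvidentlyProfileStep,
)

# Hypothetical sketch; the way the config is passed to the constructor is assumed.
drift_config = EvidentlyProfileConfig(profile_sections=["datadrift"])
drift_detector = EvidentlyProfileStep(config=drift_config)
# `drift_detector` would then be used as a step inside a ZenML pipeline, receiving
# the reference and comparison dataframes as inputs.
```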
CONFIG_CLASS (BaseDriftDetectionConfig)
pydantic-model
Config class for Evidently profile steps.
column_mapping: properties of the dataframe's columns used.
profile_sections: strings that identify the profile sections to be used. The following are valid options supported by Evidently: "datadrift", "categoricaltargetdrift", "numericaltargetdrift", "classificationmodelperformance", "regressionmodelperformance", "probabilisticmodelperformance".
Source code in zenml/integrations/evidently/steps/evidently_profile.py
class EvidentlyProfileConfig(BaseDriftDetectionConfig):
"""Config class for Evidently profile steps.
column_mapping: properties of the dataframe's columns used
profile_section: a string that identifies the profile section to be used.
The following are valid options supported by Evidently:
- "datadrift"
- "categoricaltargetdrift"
- "numericaltargetdrift"
- "classificationmodelperformance"
- "regressionmodelperformance"
- "probabilisticmodelperformance"
"""
def get_profile_sections_and_tabs(
self,
) -> Tuple[List[ProfileSection], List[Tab]]:
try:
return (
[
profile_mapper[profile]()
for profile in self.profile_sections
],
[
dashboard_mapper[profile]()
for profile in self.profile_sections
],
)
except KeyError:
nl = "\n"
raise ValueError(
f"Invalid profile section: {self.profile_sections} \n\n"
f"Valid and supported options are: {nl}- "
f'{f"{nl}- ".join(list(profile_mapper.keys()))}'
)
column_mapping: Optional[ColumnMapping]
profile_sections: Sequence[str]
entrypoint(self, reference_dataset, comparison_dataset, config)
Main entrypoint for the Evidently categorical target drift detection step.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
reference_dataset | DataFrame | a Pandas dataframe | required |
comparison_dataset | DataFrame | a Pandas dataframe of new data you wish to compare against the reference data | required |
config | EvidentlyProfileConfig | the configuration for the step | required |
Returns:
Type | Description |
---|---|
profile | dictionary report extracted from an Evidently Profile generated for the data drift |
dashboard | HTML report extracted from an Evidently Dashboard generated for the data drift |
Source code in zenml/integrations/evidently/steps/evidently_profile.py
def entrypoint( # type: ignore[override]
self,
reference_dataset: pd.DataFrame,
comparison_dataset: pd.DataFrame,
config: EvidentlyProfileConfig,
) -> Output( # type:ignore[valid-type]
profile=dict, dashboard=str
):
"""Main entrypoint for the Evidently categorical target drift detection
step.
Args:
reference_dataset: a Pandas dataframe
comparison_dataset: a Pandas dataframe of new data you wish to
compare against the reference data
config: the configuration for the step
Returns:
profile: dictionary report extracted from an Evidently Profile
generated for the data drift
dashboard: HTML report extracted from an Evidently Dashboard
generated for the data drift
"""
sections, tabs = config.get_profile_sections_and_tabs()
data_drift_dashboard = Dashboard(tabs=tabs)
data_drift_dashboard.calculate(
reference_dataset,
comparison_dataset,
column_mapping=config.column_mapping or None,
)
data_drift_profile = Profile(sections=sections)
data_drift_profile.calculate(
reference_dataset,
comparison_dataset,
column_mapping=config.column_mapping or None,
)
return [data_drift_profile.object(), data_drift_dashboard.html()]
visualizers
special
evidently_visualizer
EvidentlyVisualizer (BaseStepVisualizer)
The implementation of an Evidently Visualizer.
Source code in zenml/integrations/evidently/visualizers/evidently_visualizer.py
class EvidentlyVisualizer(BaseStepVisualizer):
"""The implementation of an Evidently Visualizer."""
@abstractmethod
def visualize(self, object: StepView, *args: Any, **kwargs: Any) -> None:
"""Method to visualize components
Args:
object: StepView fetched from run.get_step().
"""
for artifact_view in object.outputs.values():
# filter out anything but data analysis artifacts
if (
artifact_view.type == DataAnalysisArtifact.__name__
and artifact_view.data_type == "builtins.str"
):
artifact = artifact_view.read()
self.generate_facet(artifact)
def generate_facet(self, html_: str) -> None:
"""Generate a Facet Overview
Args:
html_: HTML represented as a string.
"""
if Environment.in_notebook():
from IPython.core.display import HTML, display
display(HTML(html_))
else:
logger.warning(
"The magic functions are only usable in a Jupyter notebook."
)
with tempfile.NamedTemporaryFile(
mode="w", delete=False, suffix=".html", encoding="utf-8"
) as f:
f.write(html_)
url = f"file:///{f.name}"
logger.info("Opening %s in a new browser.." % f.name)
webbrowser.open(url, new=2)
generate_facet(self, html_)
Generate a Facet Overview
Parameters:
Name | Type | Description | Default |
---|---|---|---|
html_ | str | HTML represented as a string. | required |
Source code in zenml/integrations/evidently/visualizers/evidently_visualizer.py
def generate_facet(self, html_: str) -> None:
"""Generate a Facet Overview
Args:
html_: HTML represented as a string.
"""
if Environment.in_notebook():
from IPython.core.display import HTML, display
display(HTML(html_))
else:
logger.warning(
"The magic functions are only usable in a Jupyter notebook."
)
with tempfile.NamedTemporaryFile(
mode="w", delete=False, suffix=".html", encoding="utf-8"
) as f:
f.write(html_)
url = f"file:///{f.name}"
logger.info("Opening %s in a new browser.." % f.name)
webbrowser.open(url, new=2)
visualize(self, object, *args, **kwargs)
Method to visualize components
Parameters:
Name | Type | Description | Default |
---|---|---|---|
object | StepView | StepView fetched from run.get_step(). | required |
Source code in zenml/integrations/evidently/visualizers/evidently_visualizer.py
@abstractmethod
def visualize(self, object: StepView, *args: Any, **kwargs: Any) -> None:
"""Method to visualize components
Args:
object: StepView fetched from run.get_step().
"""
for artifact_view in object.outputs.values():
# filter out anything but data analysis artifacts
if (
artifact_view.type == DataAnalysisArtifact.__name__
and artifact_view.data_type == "builtins.str"
):
artifact = artifact_view.read()
self.generate_facet(artifact)
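A minimal usage sketch: visualize expects a StepView, assumed here to come from the drift-detection step of a finished run; string outputs typed as DataAnalysisArtifact are rendered as HTML.

```python
from zenml.integrations.evidently.visualizers.evidently_visualizer import (
    EvidentlyVisualizer,
)

# Hypothetical sketch: `step_view` is assumed to be the StepView of an Evidently
# drift-detection step, fetched via run.get_step(), and the class is assumed to
# be directly instantiable.
EvidentlyVisualizer().visualize(step_view)
```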
facets
special
The Facets integration provides a simple way to visualize post-execution objects like PipelineView, PipelineRunView and StepView. These objects can be extended using the BaseVisualization class. This integration requires that facets-overview be installed in your Python environment.
FacetsIntegration (Integration)
Definition of Facet integration for ZenML.
Source code in zenml/integrations/facets/__init__.py
class FacetsIntegration(Integration):
"""Definition of [Facet](https://pair-code.github.io/facets/) integration
for ZenML."""
NAME = FACETS
REQUIREMENTS = ["facets-overview>=1.0.0", "IPython"]
visualizers
special
facet_statistics_visualizer
FacetStatisticsVisualizer (BaseStepVisualizer)
The base implementation of a ZenML Visualizer.
Source code in zenml/integrations/facets/visualizers/facet_statistics_visualizer.py
class FacetStatisticsVisualizer(BaseStepVisualizer):
"""The base implementation of a ZenML Visualizer."""
@abstractmethod
def visualize(
self, object: StepView, magic: bool = False, *args: Any, **kwargs: Any
) -> None:
"""Method to visualize components
Args:
object: StepView fetched from run.get_step().
magic: Whether to render in a Jupyter notebook or not.
"""
datasets = []
for output_name, artifact_view in object.outputs.items():
df = artifact_view.read()
if type(df) is not pd.DataFrame:
logger.warning(
"`%s` is not a pd.DataFrame. You can only visualize "
"statistics of steps that output pandas dataframes. "
"Skipping this output.." % output_name
)
else:
datasets.append({"name": output_name, "table": df})
h = self.generate_html(datasets)
self.generate_facet(h, magic)
def generate_html(self, datasets: List[Dict[Text, pd.DataFrame]]) -> str:
"""Generates html for facet.
Args:
datasets: List of dicts of dataframes to be visualized as stats.
Returns:
HTML template with proto string embedded.
"""
proto = GenericFeatureStatisticsGenerator().ProtoFromDataFrames(
datasets
)
protostr = base64.b64encode(proto.SerializeToString()).decode("utf-8")
template = os.path.join(
os.path.abspath(os.path.dirname(__file__)),
"stats.html",
)
html_template = zenml.io.utils.read_file_contents_as_string(template)
html_ = html_template.replace("protostr", protostr)
return html_
def generate_facet(self, html_: str, magic: bool = False) -> None:
"""Generate a Facet Overview
Args:
html_: HTML represented as a string.
magic: Whether to magically materialize facet in a notebook.
"""
if magic:
if not Environment.in_notebook():
raise EnvironmentError(
"The magic functions are only usable in a Jupyter notebook."
)
display(HTML(html_))
else:
with tempfile.NamedTemporaryFile(delete=False, suffix=".html") as f:
zenml.io.utils.write_file_contents_as_string(f.name, html_)
url = f"file:///{f.name}"
logger.info("Opening %s in a new browser.." % f.name)
webbrowser.open(url, new=2)
generate_facet(self, html_, magic=False)
Generate a Facet Overview
Parameters:
Name | Type | Description | Default |
---|---|---|---|
html_ | str | HTML represented as a string. | required |
magic | bool | Whether to magically materialize facet in a notebook. | False |
Source code in zenml/integrations/facets/visualizers/facet_statistics_visualizer.py
def generate_facet(self, html_: str, magic: bool = False) -> None:
"""Generate a Facet Overview
Args:
html_: HTML represented as a string.
magic: Whether to magically materialize facet in a notebook.
"""
if magic:
if not Environment.in_notebook():
raise EnvironmentError(
"The magic functions are only usable in a Jupyter notebook."
)
display(HTML(html_))
else:
with tempfile.NamedTemporaryFile(delete=False, suffix=".html") as f:
zenml.io.utils.write_file_contents_as_string(f.name, html_)
url = f"file:///{f.name}"
logger.info("Opening %s in a new browser.." % f.name)
webbrowser.open(url, new=2)
generate_html(self, datasets)
Generates html for facet.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
datasets | List[Dict[str, pandas.core.frame.DataFrame]] | List of dicts of dataframes to be visualized as stats. | required |
Returns:
Type | Description |
---|---|
str | HTML template with proto string embedded. |
Source code in zenml/integrations/facets/visualizers/facet_statistics_visualizer.py
def generate_html(self, datasets: List[Dict[Text, pd.DataFrame]]) -> str:
"""Generates html for facet.
Args:
datasets: List of dicts of dataframes to be visualized as stats.
Returns:
HTML template with proto string embedded.
"""
proto = GenericFeatureStatisticsGenerator().ProtoFromDataFrames(
datasets
)
protostr = base64.b64encode(proto.SerializeToString()).decode("utf-8")
template = os.path.join(
os.path.abspath(os.path.dirname(__file__)),
"stats.html",
)
html_template = zenml.io.utils.read_file_contents_as_string(template)
html_ = html_template.replace("protostr", protostr)
return html_
visualize(self, object, magic=False, *args, **kwargs)
Method to visualize components
Parameters:
Name | Type | Description | Default |
---|---|---|---|
object | StepView | StepView fetched from run.get_step(). | required |
magic | bool | Whether to render in a Jupyter notebook or not. | False |
Source code in zenml/integrations/facets/visualizers/facet_statistics_visualizer.py
@abstractmethod
def visualize(
self, object: StepView, magic: bool = False, *args: Any, **kwargs: Any
) -> None:
"""Method to visualize components
Args:
object: StepView fetched from run.get_step().
magic: Whether to render in a Jupyter notebook or not.
"""
datasets = []
for output_name, artifact_view in object.outputs.items():
df = artifact_view.read()
if type(df) is not pd.DataFrame:
logger.warning(
"`%s` is not a pd.DataFrame. You can only visualize "
"statistics of steps that output pandas dataframes. "
"Skipping this output.." % output_name
)
else:
datasets.append({"name": output_name, "table": df})
h = self.generate_html(datasets)
self.generate_facet(h, magic)
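A minimal usage sketch: the visualizer is given a StepView, assumed here to have been fetched via the post-execution API, and only outputs that are pandas DataFrames are included in the statistics overview.

```python
from zenml.integrations.facets.visualizers.facet_statistics_visualizer import (
    FacetStatisticsVisualizer,
)

# Hypothetical sketch: `step_view` is assumed to be a StepView whose outputs are
# pandas DataFrames; with magic=True the overview renders inside a Jupyter notebook.
FacetStatisticsVisualizer().visualize(step_view, magic=False)
```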
feast
special
The Feast integration offers a way to connect to a Feast Feature Store. ZenML implements a dedicated stack component that you can access as part of your ZenML steps in the usual ways.
FeastIntegration (Integration)
Definition of Feast integration for ZenML.
Source code in zenml/integrations/feast/__init__.py
class FeastIntegration(Integration):
"""Definition of Feast integration for ZenML."""
NAME = FEAST
REQUIREMENTS = ["feast[redis]>=0.19.4", "redis-server"]
@classmethod
def activate(cls) -> None:
"""Activates the integration."""
from zenml.integrations.feast import feature_stores # noqa
activate()
classmethod
Activates the integration.
Source code in zenml/integrations/feast/__init__.py
@classmethod
def activate(cls) -> None:
"""Activates the integration."""
from zenml.integrations.feast import feature_stores # noqa
feature_stores
special
Feast Feature Store
Feature stores allow data teams to serve data via an offline store and an online low-latency store where data is kept in sync between the two. They also offer a centralized registry where features (and feature schemas) are stored for use within a team or wider organization. Feature stores are a relatively recent addition to commonly-used machine learning stacks. Feast is a leading open-source feature store, first developed by Gojek in collaboration with Google.
feast_feature_store
FeastFeatureStore (BaseFeatureStore)
pydantic-model
Class to interact with the Feast feature store.
Source code in zenml/integrations/feast/feature_stores/feast_feature_store.py
class FeastFeatureStore(BaseFeatureStore):
"""Class to interact with the Feast feature store."""
FLAVOR: ClassVar[str] = "feast"
online_host: str = "localhost"
online_port: int = 6379
feast_repo: str
def _validate_connection(self) -> None:
"""Validates the connection to the feature store.
Raises:
RuntimeError: If the online component (Redis) is not available.
"""
client = redis.Redis(host=self.online_host, port=self.online_port)
try:
client.ping()
except redis.exceptions.ConnectionError as e:
raise redis.exceptions.ConnectionError(
"Could not connect to feature store's online component. "
"Please make sure that Redis is running."
) from e
def get_historical_features(
self,
entity_df: Union[pd.DataFrame, str],
features: List[str],
full_feature_names: bool = False,
) -> pd.DataFrame:
"""Returns the historical features for training or batch scoring.
Args:
entity_df: The entity dataframe or entity name.
features: The features to retrieve.
full_feature_names: Whether to return the full feature names.
Raise:
ConnectionError: If the online component (Redis) is not available.
Returns:
The historical features as a Pandas DataFrame.
"""
fs = FeatureStore(repo_path=self.feast_repo)
return fs.get_historical_features(
entity_df=entity_df,
features=features,
full_feature_names=full_feature_names,
).to_df()
def get_online_features(
self,
entity_rows: List[Dict[str, Any]],
features: List[str],
full_feature_names: bool = False,
) -> Dict[str, Any]:
"""Returns the latest online feature data.
Args:
entity_rows: The entity rows to retrieve.
features: The features to retrieve.
full_feature_names: Whether to return the full feature names.
Raise:
ConnectionError: If the online component (Redis) is not available.
Returns:
The latest online feature data as a dictionary.
"""
self._validate_connection()
fs = FeatureStore(repo_path=self.feast_repo)
return fs.get_online_features( # type: ignore[no-any-return]
entity_rows=entity_rows,
features=features,
full_feature_names=full_feature_names,
).to_dict()
def get_data_sources(self) -> List[str]:
"""Returns the data sources' names.
Raise:
ConnectionError: If the online component (Redis) is not available.
Returns:
The data sources' names.
"""
self._validate_connection()
fs = FeatureStore(repo_path=self.feast_repo)
return [ds.name for ds in fs.list_data_sources()]
def get_entities(self) -> List[str]:
"""Returns the entity names.
Raise:
ConnectionError: If the online component (Redis) is not available.
Returns:
The entity names.
"""
self._validate_connection()
fs = FeatureStore(repo_path=self.feast_repo)
return [ds.name for ds in fs.list_entities()]
def get_feature_services(self) -> List[str]:
"""Returns the feature service names.
Raise:
ConnectionError: If the online component (Redis) is not available.
Returns:
The feature service names.
"""
self._validate_connection()
fs = FeatureStore(repo_path=self.feast_repo)
return [ds.name for ds in fs.list_feature_services()]
def get_feature_views(self) -> List[str]:
"""Returns the feature view names.
Raise:
ConnectionError: If the online component (Redis) is not available.
Returns:
The feature view names.
"""
self._validate_connection()
fs = FeatureStore(repo_path=self.feast_repo)
return [ds.name for ds in fs.list_feature_views()]
def get_project(self) -> str:
"""Returns the project name.
Raise:
ConnectionError: If the online component (Redis) is not available.
Returns:
The project name.
"""
fs = FeatureStore(repo_path=self.feast_repo)
return str(fs.project)
def get_registry(self) -> Registry:
"""Returns the feature store registry.
Raise:
ConnectionError: If the online component (Redis) is not available.
Returns:
The registry.
"""
fs = FeatureStore(repo_path=self.feast_repo)
return fs.registry
def get_feast_version(self) -> str:
"""Returns the version of Feast used.
Raise:
ConnectionError: If the online component (Redis) is not available.
Returns:
The version of Feast currently being used.
"""
fs = FeatureStore(repo_path=self.feast_repo)
return str(fs.version())
get_data_sources(self)
Returns the data sources' names.
Exceptions:
Type | Description |
---|---|
ConnectionError | If the online component (Redis) is not available. |
Returns:
Type | Description |
---|---|
List[str] | The data sources' names. |
Source code in zenml/integrations/feast/feature_stores/feast_feature_store.py
def get_data_sources(self) -> List[str]:
"""Returns the data sources' names.
Raise:
ConnectionError: If the online component (Redis) is not available.
Returns:
The data sources' names.
"""
self._validate_connection()
fs = FeatureStore(repo_path=self.feast_repo)
return [ds.name for ds in fs.list_data_sources()]
get_entities(self)
Returns the entity names.
Exceptions:
Type | Description |
---|---|
ConnectionError | If the online component (Redis) is not available. |
Returns:
Type | Description |
---|---|
List[str] | The entity names. |
Source code in zenml/integrations/feast/feature_stores/feast_feature_store.py
def get_entities(self) -> List[str]:
"""Returns the entity names.
Raise:
ConnectionError: If the online component (Redis) is not available.
Returns:
The entity names.
"""
self._validate_connection()
fs = FeatureStore(repo_path=self.feast_repo)
return [ds.name for ds in fs.list_entities()]
get_feast_version(self)
Returns the version of Feast used.
Exceptions:
Type | Description |
---|---|
ConnectionError | If the online component (Redis) is not available. |
Returns:
Type | Description |
---|---|
str | The version of Feast currently being used. |
Source code in zenml/integrations/feast/feature_stores/feast_feature_store.py
def get_feast_version(self) -> str:
"""Returns the version of Feast used.
Raise:
ConnectionError: If the online component (Redis) is not available.
Returns:
The version of Feast currently being used.
"""
fs = FeatureStore(repo_path=self.feast_repo)
return str(fs.version())
get_feature_services(self)
Returns the feature service names.
Exceptions:
Type | Description |
---|---|
ConnectionError | If the online component (Redis) is not available. |
Returns:
Type | Description |
---|---|
List[str] | The feature service names. |
Source code in zenml/integrations/feast/feature_stores/feast_feature_store.py
def get_feature_services(self) -> List[str]:
"""Returns the feature service names.
Raise:
ConnectionError: If the online component (Redis) is not available.
Returns:
The feature service names.
"""
self._validate_connection()
fs = FeatureStore(repo_path=self.feast_repo)
return [ds.name for ds in fs.list_feature_services()]
get_feature_views(self)
Returns the feature view names.
Exceptions:
Type | Description |
---|---|
ConnectionError | If the online component (Redis) is not available. |
Returns:
Type | Description |
---|---|
List[str] | The feature view names. |
Source code in zenml/integrations/feast/feature_stores/feast_feature_store.py
def get_feature_views(self) -> List[str]:
"""Returns the feature view names.
Raise:
ConnectionError: If the online component (Redis) is not available.
Returns:
The feature view names.
"""
self._validate_connection()
fs = FeatureStore(repo_path=self.feast_repo)
return [ds.name for ds in fs.list_feature_views()]
get_historical_features(self, entity_df, features, full_feature_names=False)
Returns the historical features for training or batch scoring.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
entity_df | Union[pandas.core.frame.DataFrame, str] | The entity dataframe or entity name. | required |
features | List[str] | The features to retrieve. | required |
full_feature_names | bool | Whether to return the full feature names. | False |
Exceptions:
Type | Description |
---|---|
ConnectionError | If the online component (Redis) is not available. |
Returns:
Type | Description |
---|---|
DataFrame | The historical features as a Pandas DataFrame. |
Source code in zenml/integrations/feast/feature_stores/feast_feature_store.py
def get_historical_features(
self,
entity_df: Union[pd.DataFrame, str],
features: List[str],
full_feature_names: bool = False,
) -> pd.DataFrame:
"""Returns the historical features for training or batch scoring.
Args:
entity_df: The entity dataframe or entity name.
features: The features to retrieve.
full_feature_names: Whether to return the full feature names.
Raise:
ConnectionError: If the online component (Redis) is not available.
Returns:
The historical features as a Pandas DataFrame.
"""
fs = FeatureStore(repo_path=self.feast_repo)
return fs.get_historical_features(
entity_df=entity_df,
features=features,
full_feature_names=full_feature_names,
).to_df()
get_online_features(self, entity_rows, features, full_feature_names=False)
Returns the latest online feature data.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
entity_rows | List[Dict[str, Any]] | The entity rows to retrieve. | required |
features | List[str] | The features to retrieve. | required |
full_feature_names | bool | Whether to return the full feature names. | False |
Exceptions:
Type | Description |
---|---|
ConnectionError | If the online component (Redis) is not available. |
Returns:
Type | Description |
---|---|
Dict[str, Any] | The latest online feature data as a dictionary. |
Source code in zenml/integrations/feast/feature_stores/feast_feature_store.py
def get_online_features(
self,
entity_rows: List[Dict[str, Any]],
features: List[str],
full_feature_names: bool = False,
) -> Dict[str, Any]:
"""Returns the latest online feature data.
Args:
entity_rows: The entity rows to retrieve.
features: The features to retrieve.
full_feature_names: Whether to return the full feature names.
Raise:
ConnectionError: If the online component (Redis) is not available.
Returns:
The latest online feature data as a dictionary.
"""
self._validate_connection()
fs = FeatureStore(repo_path=self.feast_repo)
return fs.get_online_features( # type: ignore[no-any-return]
entity_rows=entity_rows,
features=features,
full_feature_names=full_feature_names,
).to_dict()
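A minimal sketch built only from the fields and methods documented here; the repo path, entity keys and feature references are placeholders, and the name argument is assumed to be required by the base stack component class.

```python
from zenml.integrations.feast.feature_stores.feast_feature_store import (
    FeastFeatureStore,
)

# Hypothetical sketch; every value below is a placeholder.
feature_store = FeastFeatureStore(
    name="feast",               # assumed base stack-component field
    feast_repo="./feast_repo",  # path to an initialized Feast repository
    online_host="localhost",
    online_port=6379,
)
online = feature_store.get_online_features(
    entity_rows=[{"driver_id": 1001}],
    features=["driver_hourly_stats:conv_rate"],
)
```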
get_project(self)
Returns the project name.
Exceptions:
Type | Description |
---|---|
ConnectionError | If the online component (Redis) is not available. |
Returns:
Type | Description |
---|---|
str | The project name. |
Source code in zenml/integrations/feast/feature_stores/feast_feature_store.py
def get_project(self) -> str:
"""Returns the project name.
Raise:
ConnectionError: If the online component (Redis) is not available.
Returns:
The project name.
"""
fs = FeatureStore(repo_path=self.feast_repo)
return str(fs.project)
get_registry(self)
Returns the feature store registry.
Exceptions:
Type | Description |
---|---|
ConnectionError | If the online component (Redis) is not available. |
Returns:
Type | Description |
---|---|
Registry | The registry. |
Source code in zenml/integrations/feast/feature_stores/feast_feature_store.py
def get_registry(self) -> Registry:
"""Returns the feature store registry.
Raise:
ConnectionError: If the online component (Redis) is not available.
Returns:
The registry.
"""
fs = FeatureStore(repo_path=self.feast_repo)
return fs.registry
gcp
special
The GCP integration submodule provides a way to run ZenML pipelines in a cloud
environment. Specifically, it allows the use of cloud artifact stores, metadata
stores, and an io
module to handle file operations on Google Cloud Storage (GCS).
GcpIntegration (Integration)
Definition of Google Cloud Platform integration for ZenML.
Source code in zenml/integrations/gcp/__init__.py
class GcpIntegration(Integration):
"""Definition of Google Cloud Platform integration for ZenML."""
NAME = GCP
REQUIREMENTS = ["gcsfs"]
@classmethod
def activate(cls) -> None:
"""Activates the integration."""
from zenml.integrations.gcp import artifact_stores # noqa
activate()
classmethod
Activates the integration.
Source code in zenml/integrations/gcp/__init__.py
@classmethod
def activate(cls) -> None:
"""Activates the integration."""
from zenml.integrations.gcp import artifact_stores # noqa
artifact_stores
special
gcp_artifact_store
GCPArtifactStore (BaseArtifactStore)
pydantic-model
Artifact Store for Google Cloud Storage based artifacts.
Source code in zenml/integrations/gcp/artifact_stores/gcp_artifact_store.py
class GCPArtifactStore(BaseArtifactStore):
"""Artifact Store for Google Cloud Storage based artifacts."""
_filesystem: Optional[gcsfs.GCSFileSystem] = None
# Class Configuration
FLAVOR: ClassVar[str] = "gcp"
SUPPORTED_SCHEMES: ClassVar[Set[str]] = {"gs://"}
@property
def filesystem(self) -> gcsfs.GCSFileSystem:
"""The gcsfs filesystem to access this artifact store."""
if not self._filesystem:
self._filesystem = gcsfs.GCSFileSystem()
return self._filesystem
def open(self, path: PathType, mode: str = "r") -> Any:
"""Open a file at the given path.
Args:
path: Path of the file to open.
mode: Mode in which to open the file. Currently, only
'rb' and 'wb' to read and write binary files are supported.
"""
return self.filesystem.open(path=path, mode=mode)
def copyfile(
self, src: PathType, dst: PathType, overwrite: bool = False
) -> None:
"""Copy a file.
Args:
src: The path to copy from.
dst: The path to copy to.
overwrite: If a file already exists at the destination, this
method will overwrite it if overwrite=`True` and
raise a FileExistsError otherwise.
Raises:
FileNotFoundError: If the source file does not exist.
FileExistsError: If a file already exists at the destination
and overwrite is not set to `True`.
"""
if not overwrite and self.filesystem.exists(dst):
raise FileExistsError(
f"Unable to copy to destination '{convert_to_str(dst)}', "
f"file already exists. Set `overwrite=True` to copy anyway."
)
# TODO [ENG-151]: Check if it works with overwrite=True or if we need to
# manually remove it first
self.filesystem.copy(path1=src, path2=dst)
def exists(self, path: PathType) -> bool:
"""Check whether a path exists."""
return self.filesystem.exists(path=path) # type: ignore[no-any-return]
def glob(self, pattern: PathType) -> List[PathType]:
"""Return all paths that match the given glob pattern.
The glob pattern may include:
- '*' to match any number of characters
- '?' to match a single character
- '[...]' to match one of the characters inside the brackets
- '**' as the full name of a path component to match to search
in subdirectories of any depth (e.g. '/some_dir/**/some_file)
Args:
pattern: The glob pattern to match, see details above.
Returns:
A list of paths that match the given glob pattern.
"""
return self.filesystem.glob(path=pattern) # type: ignore[no-any-return]
def isdir(self, path: PathType) -> bool:
"""Check whether a path is a directory."""
return self.filesystem.isdir(path=path) # type: ignore[no-any-return]
def listdir(self, path: PathType) -> List[PathType]:
"""Return a list of files in a directory."""
return self.filesystem.listdir(path=path) # type: ignore[no-any-return]
def makedirs(self, path: PathType) -> None:
"""Create a directory at the given path. If needed also
create missing parent directories."""
self.filesystem.makedirs(path=path, exist_ok=True)
def mkdir(self, path: PathType) -> None:
"""Create a directory at the given path."""
self.filesystem.makedir(path=path)
def remove(self, path: PathType) -> None:
"""Remove the file at the given path."""
self.filesystem.rm_file(path=path)
def rename(
self, src: PathType, dst: PathType, overwrite: bool = False
) -> None:
"""Rename source file to destination file.
Args:
src: The path of the file to rename.
dst: The path to rename the source file to.
overwrite: If a file already exists at the destination, this
method will overwrite it if overwrite=`True` and
raise a FileExistsError otherwise.
Raises:
FileNotFoundError: If the source file does not exist.
FileExistsError: If a file already exists at the destination
and overwrite is not set to `True`.
"""
if not overwrite and self.filesystem.exists(dst):
raise FileExistsError(
f"Unable to rename file to '{convert_to_str(dst)}', "
f"file already exists. Set `overwrite=True` to rename anyway."
)
# TODO [ENG-152]: Check if it works with overwrite=True or if we need
# to manually remove it first
self.filesystem.rename(path1=src, path2=dst)
def rmtree(self, path: PathType) -> None:
"""Remove the given directory."""
self.filesystem.delete(path=path, recursive=True)
def stat(self, path: PathType) -> Dict[str, Any]:
"""Return stat info for the given path."""
return self.filesystem.stat(path=path) # type: ignore[no-any-return]
def walk(
self,
top: PathType,
topdown: bool = True,
onerror: Optional[Callable[..., None]] = None,
) -> Iterable[Tuple[PathType, List[PathType], List[PathType]]]:
"""Return an iterator that walks the contents of the given directory.
Args:
top: Path of directory to walk.
topdown: Unused argument to conform to interface.
onerror: Unused argument to conform to interface.
Returns:
An Iterable of Tuples, each of which contain the path of the current
directory path, a list of directories inside the current directory
and a list of files inside the current directory.
"""
# TODO [ENG-153]: Additional params
return self.filesystem.walk(path=top) # type: ignore[no-any-return]
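A minimal sketch exercising a few of the methods above; the name and path arguments are assumed to be base artifact-store fields (with path holding the gs:// root), and the bucket name is a placeholder.

```python
from zenml.integrations.gcp.artifact_stores.gcp_artifact_store import GCPArtifactStore

# Hypothetical sketch; `name` and `path` are assumed base artifact-store fields,
# and the bucket is a placeholder.
store = GCPArtifactStore(name="gcs_store", path="gs://my-bucket/zenml")

with store.open("gs://my-bucket/zenml/hello.txt", mode="wb") as f:
    f.write(b"hello")  # only 'rb' and 'wb' modes are supported

print(store.exists("gs://my-bucket/zenml/hello.txt"))
```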
filesystem: GCSFileSystem
property
readonly
The gcsfs filesystem to access this artifact store.
copyfile(self, src, dst, overwrite=False)
Copy a file.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
src | Union[bytes, str] | The path to copy from. | required |
dst | Union[bytes, str] | The path to copy to. | required |
overwrite | bool | If a file already exists at the destination, this method will overwrite it if overwrite=True and raise a FileExistsError otherwise. | False |
Exceptions:
Type | Description |
---|---|
FileNotFoundError | If the source file does not exist. |
FileExistsError | If a file already exists at the destination and overwrite is not set to True. |
Source code in zenml/integrations/gcp/artifact_stores/gcp_artifact_store.py
def copyfile(
self, src: PathType, dst: PathType, overwrite: bool = False
) -> None:
"""Copy a file.
Args:
src: The path to copy from.
dst: The path to copy to.
overwrite: If a file already exists at the destination, this
method will overwrite it if overwrite=`True` and
raise a FileExistsError otherwise.
Raises:
FileNotFoundError: If the source file does not exist.
FileExistsError: If a file already exists at the destination
and overwrite is not set to `True`.
"""
if not overwrite and self.filesystem.exists(dst):
raise FileExistsError(
f"Unable to copy to destination '{convert_to_str(dst)}', "
f"file already exists. Set `overwrite=True` to copy anyway."
)
# TODO [ENG-151]: Check if it works with overwrite=True or if we need to
# manually remove it first
self.filesystem.copy(path1=src, path2=dst)
exists(self, path)
Check whether a path exists.
Source code in zenml/integrations/gcp/artifact_stores/gcp_artifact_store.py
def exists(self, path: PathType) -> bool:
"""Check whether a path exists."""
return self.filesystem.exists(path=path) # type: ignore[no-any-return]
glob(self, pattern)
Return all paths that match the given glob pattern. The glob pattern may include: '*' to match any number of characters, '?' to match a single character, '[...]' to match one of the characters inside the brackets, and '**' as the full name of a path component to match to search in subdirectories of any depth (e.g. '/some_dir/**/some_file').
Parameters:
Name | Type | Description | Default |
---|---|---|---|
pattern | Union[bytes, str] | The glob pattern to match, see details above. | required |
Returns:
Type | Description |
---|---|
List[Union[bytes, str]] | A list of paths that match the given glob pattern. |
Source code in zenml/integrations/gcp/artifact_stores/gcp_artifact_store.py
def glob(self, pattern: PathType) -> List[PathType]:
"""Return all paths that match the given glob pattern.
The glob pattern may include:
- '*' to match any number of characters
- '?' to match a single character
- '[...]' to match one of the characters inside the brackets
- '**' as the full name of a path component to match to search
in subdirectories of any depth (e.g. '/some_dir/**/some_file)
Args:
pattern: The glob pattern to match, see details above.
Returns:
A list of paths that match the given glob pattern.
"""
return self.filesystem.glob(path=pattern) # type: ignore[no-any-return]
isdir(self, path)
Check whether a path is a directory.
Source code in zenml/integrations/gcp/artifact_stores/gcp_artifact_store.py
def isdir(self, path: PathType) -> bool:
"""Check whether a path is a directory."""
return self.filesystem.isdir(path=path) # type: ignore[no-any-return]
listdir(self, path)
Return a list of files in a directory.
Source code in zenml/integrations/gcp/artifact_stores/gcp_artifact_store.py
def listdir(self, path: PathType) -> List[PathType]:
"""Return a list of files in a directory."""
return self.filesystem.listdir(path=path) # type: ignore[no-any-return]
makedirs(self, path)
Create a directory at the given path. If needed also create missing parent directories.
Source code in zenml/integrations/gcp/artifact_stores/gcp_artifact_store.py
def makedirs(self, path: PathType) -> None:
"""Create a directory at the given path. If needed also
create missing parent directories."""
self.filesystem.makedirs(path=path, exist_ok=True)
mkdir(self, path)
Create a directory at the given path.
Source code in zenml/integrations/gcp/artifact_stores/gcp_artifact_store.py
def mkdir(self, path: PathType) -> None:
"""Create a directory at the given path."""
self.filesystem.makedir(path=path)
open(self, path, mode='r')
Open a file at the given path.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| path | Union[bytes, str] | Path of the file to open. | required |
| mode | str | Mode in which to open the file. Currently, only 'rb' and 'wb' to read and write binary files are supported. | 'r' |
Source code in zenml/integrations/gcp/artifact_stores/gcp_artifact_store.py
def open(self, path: PathType, mode: str = "r") -> Any:
"""Open a file at the given path.
Args:
path: Path of the file to open.
mode: Mode in which to open the file. Currently, only
'rb' and 'wb' to read and write binary files are supported.
"""
return self.filesystem.open(path=path, mode=mode)
remove(self, path)
Remove the file at the given path.
Source code in zenml/integrations/gcp/artifact_stores/gcp_artifact_store.py
def remove(self, path: PathType) -> None:
"""Remove the file at the given path."""
self.filesystem.rm_file(path=path)
rename(self, src, dst, overwrite=False)
Rename source file to destination file.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| src | Union[bytes, str] | The path of the file to rename. | required |
| dst | Union[bytes, str] | The path to rename the source file to. | required |
| overwrite | bool | If a file already exists at the destination, this method will overwrite it if overwrite=`True` and raise a FileExistsError otherwise. | False |

Exceptions:

| Type | Description |
|---|---|
| FileNotFoundError | If the source file does not exist. |
| FileExistsError | If a file already exists at the destination and overwrite is not set to `True`. |
Source code in zenml/integrations/gcp/artifact_stores/gcp_artifact_store.py
def rename(
self, src: PathType, dst: PathType, overwrite: bool = False
) -> None:
"""Rename source file to destination file.
Args:
src: The path of the file to rename.
dst: The path to rename the source file to.
overwrite: If a file already exists at the destination, this
method will overwrite it if overwrite=`True` and
raise a FileExistsError otherwise.
Raises:
FileNotFoundError: If the source file does not exist.
FileExistsError: If a file already exists at the destination
and overwrite is not set to `True`.
"""
if not overwrite and self.filesystem.exists(dst):
raise FileExistsError(
f"Unable to rename file to '{convert_to_str(dst)}', "
f"file already exists. Set `overwrite=True` to rename anyway."
)
# TODO [ENG-152]: Check if it works with overwrite=True or if we need
# to manually remove it first
self.filesystem.rename(path1=src, path2=dst)
rmtree(self, path)
Remove the given directory.
Source code in zenml/integrations/gcp/artifact_stores/gcp_artifact_store.py
def rmtree(self, path: PathType) -> None:
"""Remove the given directory."""
self.filesystem.delete(path=path, recursive=True)
stat(self, path)
Return stat info for the given path.
Source code in zenml/integrations/gcp/artifact_stores/gcp_artifact_store.py
def stat(self, path: PathType) -> Dict[str, Any]:
"""Return stat info for the given path."""
return self.filesystem.stat(path=path) # type: ignore[no-any-return]
walk(self, top, topdown=True, onerror=None)
Return an iterator that walks the contents of the given directory.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| top | Union[bytes, str] | Path of directory to walk. | required |
| topdown | bool | Unused argument to conform to interface. | True |
| onerror | Optional[Callable[..., NoneType]] | Unused argument to conform to interface. | None |

Returns:

| Type | Description |
|---|---|
| Iterable[Tuple[Union[bytes, str], List[Union[bytes, str]], List[Union[bytes, str]]]] | An Iterable of Tuples, each of which contains the path of the current directory, a list of directories inside the current directory and a list of files inside the current directory. |
Source code in zenml/integrations/gcp/artifact_stores/gcp_artifact_store.py
def walk(
self,
top: PathType,
topdown: bool = True,
onerror: Optional[Callable[..., None]] = None,
) -> Iterable[Tuple[PathType, List[PathType], List[PathType]]]:
"""Return an iterator that walks the contents of the given directory.
Args:
top: Path of directory to walk.
topdown: Unused argument to conform to interface.
onerror: Unused argument to conform to interface.
Returns:
An Iterable of Tuples, each of which contain the path of the current
directory path, a list of directories inside the current directory
and a list of files inside the current directory.
"""
# TODO [ENG-153]: Additional params
return self.filesystem.walk(path=top) # type: ignore[no-any-return]
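The methods above mirror Python's standard file utilities, so the artifact store can be used much like a remote filesystem. Below is a minimal usage sketch; it assumes `store` is an already-configured GCPArtifactStore instance and the `gs://my-bucket/...` paths are purely illustrative.

```python
# Minimal sketch, assuming `store` is a configured GCPArtifactStore instance
# (for example, the artifact store of the active stack). All paths are
# hypothetical; only methods documented above are used.
src = "gs://my-bucket/data/train.csv"
dst = "gs://my-bucket/backup/train.csv"

if store.exists(src):
    store.makedirs("gs://my-bucket/backup")   # also creates missing parents
    store.copyfile(src, dst, overwrite=True)  # overwrite to avoid FileExistsError

# Match files at any depth with a glob pattern.
for path in store.glob("gs://my-bucket/**/*.csv"):
    print(path, store.stat(path))

# Walk the bucket similarly to os.walk.
for directory, subdirs, files in store.walk("gs://my-bucket"):
    print(directory, len(subdirs), len(files))

# Only binary modes ('rb'/'wb') are supported by open().
with store.open(dst, mode="rb") as f:
    contents = f.read()
```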
graphviz
special
GraphvizIntegration (Integration)
Definition of Graphviz integration for ZenML.
Source code in zenml/integrations/graphviz/__init__.py
class GraphvizIntegration(Integration):
"""Definition of Graphviz integration for ZenML."""
NAME = GRAPHVIZ
REQUIREMENTS = ["graphviz>=0.17"]
SYSTEM_REQUIREMENTS = {"graphviz": "dot"}
visualizers
special
pipeline_run_dag_visualizer
PipelineRunDagVisualizer (BasePipelineRunVisualizer)
Visualize the lineage of runs in a pipeline.
Source code in zenml/integrations/graphviz/visualizers/pipeline_run_dag_visualizer.py
class PipelineRunDagVisualizer(BasePipelineRunVisualizer):
"""Visualize the lineage of runs in a pipeline."""
ARTIFACT_DEFAULT_COLOR = "blue"
ARTIFACT_CACHED_COLOR = "green"
ARTIFACT_SHAPE = "box"
ARTIFACT_PREFIX = "artifact_"
STEP_COLOR = "#431D93"
STEP_SHAPE = "ellipse"
STEP_PREFIX = "step_"
FONT = "Roboto"
@abstractmethod
def visualize(
self, object: PipelineRunView, *args: Any, **kwargs: Any
) -> graphviz.Digraph:
"""Creates a pipeline lineage diagram using graphviz."""
logger.warning(
"This integration is not completed yet. Results might be unexpected."
)
dot = graphviz.Digraph(comment=object.name)
# link the steps together
for step in object.steps:
# add each step as a node
dot.node(
self.STEP_PREFIX + str(step.id),
step.entrypoint_name,
shape=self.STEP_SHAPE,
)
# for each parent of a step, add an edge
for artifact_name, artifact in step.outputs.items():
dot.node(
self.ARTIFACT_PREFIX + str(artifact.id),
f"{artifact_name} \n" f"({artifact._data_type})",
shape=self.ARTIFACT_SHAPE,
)
dot.edge(
self.STEP_PREFIX + str(step.id),
self.ARTIFACT_PREFIX + str(artifact.id),
)
for artifact_name, artifact in step.inputs.items():
dot.edge(
self.ARTIFACT_PREFIX + str(artifact.id),
self.STEP_PREFIX + str(step.id),
)
with tempfile.NamedTemporaryFile(delete=False, suffix=".html") as f:
dot.render(filename=f.name, format="png", view=True, cleanup=True)
return dot
visualize(self, object, *args, **kwargs)
Creates a pipeline lineage diagram using graphviz.
Source code in zenml/integrations/graphviz/visualizers/pipeline_run_dag_visualizer.py
@abstractmethod
def visualize(
self, object: PipelineRunView, *args: Any, **kwargs: Any
) -> graphviz.Digraph:
"""Creates a pipeline lineage diagram using graphviz."""
logger.warning(
"This integration is not completed yet. Results might be unexpected."
)
dot = graphviz.Digraph(comment=object.name)
# link the steps together
for step in object.steps:
# add each step as a node
dot.node(
self.STEP_PREFIX + str(step.id),
step.entrypoint_name,
shape=self.STEP_SHAPE,
)
# for each parent of a step, add an edge
for artifact_name, artifact in step.outputs.items():
dot.node(
self.ARTIFACT_PREFIX + str(artifact.id),
f"{artifact_name} \n" f"({artifact._data_type})",
shape=self.ARTIFACT_SHAPE,
)
dot.edge(
self.STEP_PREFIX + str(step.id),
self.ARTIFACT_PREFIX + str(artifact.id),
)
for artifact_name, artifact in step.inputs.items():
dot.edge(
self.ARTIFACT_PREFIX + str(artifact.id),
self.STEP_PREFIX + str(step.id),
)
with tempfile.NamedTemporaryFile(delete=False, suffix=".html") as f:
dot.render(filename=f.name, format="png", view=True, cleanup=True)
return dot
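The visualizer follows the usual graphviz pattern of one node per step, one node per artifact, and one edge per producer/consumer relationship. A generic sketch of that pattern (independent of ZenML, using only the graphviz library) is shown below.

```python
# Generic graphviz sketch of the node/edge pattern used by the visualizer
# above; the step and artifact names are made up for illustration.
import graphviz

dot = graphviz.Digraph(comment="example pipeline run")
dot.node("step_1", "importer", shape="ellipse")
dot.node("artifact_1", "dataset\n(DataFrame)", shape="box")
dot.node("step_2", "trainer", shape="ellipse")

dot.edge("step_1", "artifact_1")  # step produces artifact
dot.edge("artifact_1", "step_2")  # artifact feeds the next step

print(dot.source)  # DOT text; dot.render(...) would write and open a file
```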
huggingface
special
HuggingfaceIntegration (Integration)
Definition of Huggingface integration for ZenML.
Source code in zenml/integrations/huggingface/__init__.py
class HuggingfaceIntegration(Integration):
"""Definition of Huggingface integration for ZenML."""
NAME = HUGGINGFACE
REQUIREMENTS = ["transformers", "datasets"]
@classmethod
def activate(cls) -> None:
"""Activates the integration."""
from zenml.integrations.huggingface import materializers # noqa
activate()
classmethod
Activates the integration.
Source code in zenml/integrations/huggingface/__init__.py
@classmethod
def activate(cls) -> None:
"""Activates the integration."""
from zenml.integrations.huggingface import materializers # noqa
materializers
special
huggingface_datasets_materializer
HFDatasetMaterializer (BaseMaterializer)
Materializer to read data to and from huggingface datasets.
Source code in zenml/integrations/huggingface/materializers/huggingface_datasets_materializer.py
class HFDatasetMaterializer(BaseMaterializer):
"""Materializer to read data to and from huggingface datasets."""
ASSOCIATED_TYPES = (Dataset, DatasetDict)
ASSOCIATED_ARTIFACT_TYPES = (DataArtifact,)
def handle_input(self, data_type: Type[Any]) -> Dataset:
"""Reads Dataset"""
super().handle_input(data_type)
return load_from_disk(
os.path.join(self.artifact.uri, DEFAULT_DATASET_DIR)
)
def handle_return(self, ds: Type[Any]) -> None:
"""Writes a Dataset to the specified dir.
Args:
Dataset: The Dataset to write.
"""
super().handle_return(ds)
temp_dir = TemporaryDirectory()
ds.save_to_disk(temp_dir.name)
fileio_utils.copy_dir(
temp_dir.name, os.path.join(self.artifact.uri, DEFAULT_DATASET_DIR)
)
handle_input(self, data_type)
Reads Dataset
Source code in zenml/integrations/huggingface/materializers/huggingface_datasets_materializer.py
def handle_input(self, data_type: Type[Any]) -> Dataset:
"""Reads Dataset"""
super().handle_input(data_type)
return load_from_disk(
os.path.join(self.artifact.uri, DEFAULT_DATASET_DIR)
)
handle_return(self, ds)
Writes a Dataset to the specified dir.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| ds | Dataset | The Dataset to write. | required |
Source code in zenml/integrations/huggingface/materializers/huggingface_datasets_materializer.py
def handle_return(self, ds: Type[Any]) -> None:
"""Writes a Dataset to the specified dir.
Args:
Dataset: The Dataset to write.
"""
super().handle_return(ds)
temp_dir = TemporaryDirectory()
ds.save_to_disk(temp_dir.name)
fileio_utils.copy_dir(
temp_dir.name, os.path.join(self.artifact.uri, DEFAULT_DATASET_DIR)
)
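Because `Dataset` and `DatasetDict` are listed under ASSOCIATED_TYPES, a step that returns a Hugging Face dataset is persisted and re-loaded through this materializer automatically. A minimal sketch is shown below; it assumes the huggingface integration is activated, and the `zenml.steps.step` import path may differ between ZenML versions.

```python
# Hedged sketch: assumes the huggingface integration is activated so that
# HFDatasetMaterializer is registered for Dataset outputs. The `step` import
# reflects the ZenML version this reference targets and may differ elsewhere.
from datasets import Dataset
from zenml.steps import step


@step
def make_dataset() -> Dataset:
    """Returns a tiny in-memory dataset; ZenML stores it via
    HFDatasetMaterializer (save_to_disk / load_from_disk under the hood)."""
    return Dataset.from_dict({"text": ["hello", "world"], "label": [0, 1]})
```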
huggingface_pt_model_materializer
HFPTModelMaterializer (BaseMaterializer)
Materializer to read torch model to and from huggingface pretrained model.
Source code in zenml/integrations/huggingface/materializers/huggingface_pt_model_materializer.py
class HFPTModelMaterializer(BaseMaterializer):
"""Materializer to read torch model to and from huggingface pretrained model."""
ASSOCIATED_TYPES = (PreTrainedModel,)
ASSOCIATED_ARTIFACT_TYPES = (ModelArtifact,)
def handle_input(self, data_type: Type[Any]) -> PreTrainedModel:
"""Reads HFModel"""
super().handle_input(data_type)
config = AutoConfig.from_pretrained(
os.path.join(self.artifact.uri, DEFAULT_PT_MODEL_DIR)
)
architecture = config.architectures[0]
model_cls = getattr(
importlib.import_module("transformers"), architecture
)
return model_cls.from_pretrained(
os.path.join(self.artifact.uri, DEFAULT_PT_MODEL_DIR)
)
def handle_return(self, model: Type[Any]) -> None:
"""Writes a Model to the specified dir.
Args:
PreTrainedModel: The Torch Model to write.
"""
super().handle_return(model)
temp_dir = TemporaryDirectory()
model.save_pretrained(temp_dir.name)
fileio_utils.copy_dir(
temp_dir.name,
os.path.join(self.artifact.uri, DEFAULT_PT_MODEL_DIR),
)
handle_input(self, data_type)
Reads HFModel
Source code in zenml/integrations/huggingface/materializers/huggingface_pt_model_materializer.py
def handle_input(self, data_type: Type[Any]) -> PreTrainedModel:
"""Reads HFModel"""
super().handle_input(data_type)
config = AutoConfig.from_pretrained(
os.path.join(self.artifact.uri, DEFAULT_PT_MODEL_DIR)
)
architecture = config.architectures[0]
model_cls = getattr(
importlib.import_module("transformers"), architecture
)
return model_cls.from_pretrained(
os.path.join(self.artifact.uri, DEFAULT_PT_MODEL_DIR)
)
handle_return(self, model)
Writes a Model to the specified dir.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| model | PreTrainedModel | The Torch Model to write. | required |
Source code in zenml/integrations/huggingface/materializers/huggingface_pt_model_materializer.py
def handle_return(self, model: Type[Any]) -> None:
"""Writes a Model to the specified dir.
Args:
PreTrainedModel: The Torch Model to write.
"""
super().handle_return(model)
temp_dir = TemporaryDirectory()
model.save_pretrained(temp_dir.name)
fileio_utils.copy_dir(
temp_dir.name,
os.path.join(self.artifact.uri, DEFAULT_PT_MODEL_DIR),
)
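The model materializers do not pin a concrete model class; instead they read `config.architectures[0]` from the saved config and resolve the matching class from transformers at load time. The snippet below illustrates that lookup in isolation; the checkpoint name is only a stand-in for the directory the materializer would normally read from the artifact store.

```python
# Illustration of the dynamic class lookup used above. The checkpoint name
# "distilbert-base-uncased" is a stand-in for the saved model directory.
import importlib

from transformers import AutoConfig

config = AutoConfig.from_pretrained("distilbert-base-uncased")
architecture = config.architectures[0]  # e.g. "DistilBertForMaskedLM"
model_cls = getattr(importlib.import_module("transformers"), architecture)
model = model_cls.from_pretrained("distilbert-base-uncased")
```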
huggingface_tf_model_materializer
HFTFModelMaterializer (BaseMaterializer)
Materializer to read tensorflow model to and from huggingface pretrained model.
Source code in zenml/integrations/huggingface/materializers/huggingface_tf_model_materializer.py
class HFTFModelMaterializer(BaseMaterializer):
"""Materializer to read tensorflow model to and from huggingface pretrained model."""
ASSOCIATED_TYPES = (TFPreTrainedModel,)
ASSOCIATED_ARTIFACT_TYPES = (ModelArtifact,)
def handle_input(self, data_type: Type[Any]) -> TFPreTrainedModel:
"""Reads HFModel"""
super().handle_input(data_type)
config = AutoConfig.from_pretrained(
os.path.join(self.artifact.uri, DEFAULT_TF_MODEL_DIR)
)
architecture = "TF" + config.architectures[0]
model_cls = getattr(
importlib.import_module("transformers"), architecture
)
return model_cls.from_pretrained(
os.path.join(self.artifact.uri, DEFAULT_TF_MODEL_DIR)
)
def handle_return(self, model: Type[Any]) -> None:
"""Writes a Model to the specified dir.
Args:
TFPreTrainedModel: The TF Model to write.
"""
super().handle_return(model)
temp_dir = TemporaryDirectory()
model.save_pretrained(temp_dir.name)
fileio_utils.copy_dir(
temp_dir.name,
os.path.join(self.artifact.uri, DEFAULT_TF_MODEL_DIR),
)
handle_input(self, data_type)
Reads HFModel
Source code in zenml/integrations/huggingface/materializers/huggingface_tf_model_materializer.py
def handle_input(self, data_type: Type[Any]) -> TFPreTrainedModel:
"""Reads HFModel"""
super().handle_input(data_type)
config = AutoConfig.from_pretrained(
os.path.join(self.artifact.uri, DEFAULT_TF_MODEL_DIR)
)
architecture = "TF" + config.architectures[0]
model_cls = getattr(
importlib.import_module("transformers"), architecture
)
return model_cls.from_pretrained(
os.path.join(self.artifact.uri, DEFAULT_TF_MODEL_DIR)
)
handle_return(self, model)
Writes a Model to the specified dir.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| model | TFPreTrainedModel | The TF Model to write. | required |
Source code in zenml/integrations/huggingface/materializers/huggingface_tf_model_materializer.py
def handle_return(self, model: Type[Any]) -> None:
"""Writes a Model to the specified dir.
Args:
TFPreTrainedModel: The TF Model to write.
"""
super().handle_return(model)
temp_dir = TemporaryDirectory()
model.save_pretrained(temp_dir.name)
fileio_utils.copy_dir(
temp_dir.name,
os.path.join(self.artifact.uri, DEFAULT_TF_MODEL_DIR),
)
huggingface_tokenizer_materializer
HFTokenizerMaterializer (BaseMaterializer)
Materializer to read tokenizer to and from huggingface tokenizer.
Source code in zenml/integrations/huggingface/materializers/huggingface_tokenizer_materializer.py
class HFTokenizerMaterializer(BaseMaterializer):
"""Materializer to read tokenizer to and from huggingface tokenizer."""
ASSOCIATED_TYPES = (PreTrainedTokenizerBase,)
ASSOCIATED_ARTIFACT_TYPES = (ModelArtifact,)
def handle_input(self, data_type: Type[Any]) -> PreTrainedTokenizerBase:
"""Reads Tokenizer"""
super().handle_input(data_type)
return AutoTokenizer.from_pretrained(
os.path.join(self.artifact.uri, DEFAULT_TOKENIZER_DIR)
)
def handle_return(self, tokenizer: Type[Any]) -> None:
"""Writes a Tokenizer to the specified dir.
Args:
PreTrainedTokenizerBase: The HFTokenizer to write.
"""
super().handle_return(tokenizer)
temp_dir = TemporaryDirectory()
tokenizer.save_pretrained(temp_dir.name)
fileio_utils.copy_dir(
temp_dir.name,
os.path.join(self.artifact.uri, DEFAULT_TOKENIZER_DIR),
)
handle_input(self, data_type)
Reads Tokenizer
Source code in zenml/integrations/huggingface/materializers/huggingface_tokenizer_materializer.py
def handle_input(self, data_type: Type[Any]) -> PreTrainedTokenizerBase:
"""Reads Tokenizer"""
super().handle_input(data_type)
return AutoTokenizer.from_pretrained(
os.path.join(self.artifact.uri, DEFAULT_TOKENIZER_DIR)
)
handle_return(self, tokenizer)
Writes a Tokenizer to the specified dir.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| tokenizer | PreTrainedTokenizerBase | The HFTokenizer to write. | required |
Source code in zenml/integrations/huggingface/materializers/huggingface_tokenizer_materializer.py
def handle_return(self, tokenizer: Type[Any]) -> None:
"""Writes a Tokenizer to the specified dir.
Args:
PreTrainedTokenizerBase: The HFTokenizer to write.
"""
super().handle_return(tokenizer)
temp_dir = TemporaryDirectory()
tokenizer.save_pretrained(temp_dir.name)
fileio_utils.copy_dir(
temp_dir.name,
os.path.join(self.artifact.uri, DEFAULT_TOKENIZER_DIR),
)
integration
Integration
Base class for integration in ZenML
Source code in zenml/integrations/integration.py
class Integration(metaclass=IntegrationMeta):
"""Base class for integration in ZenML"""
NAME = "base_integration"
REQUIREMENTS: List[str] = []
SYSTEM_REQUIREMENTS: Dict[str, str] = {}
@classmethod
def check_installation(cls) -> bool:
"""Method to check whether the required packages are installed"""
try:
for requirement, command in cls.SYSTEM_REQUIREMENTS.items():
result = shutil.which(command)
if result is None:
logger.debug(
"Unable to find the required packages for %s on your "
"system. Please install the packages on your system "
"and try again.",
requirement,
)
return False
for r in cls.REQUIREMENTS:
pkg_resources.get_distribution(r)
logger.debug(
f"Integration {cls.NAME} is installed correctly with "
f"requirements {cls.REQUIREMENTS}."
)
return True
except pkg_resources.DistributionNotFound as e:
logger.debug(
f"Unable to find required package '{e.req}' for "
f"integration {cls.NAME}."
)
return False
except pkg_resources.VersionConflict as e:
logger.debug(
f"VersionConflict error when loading installation {cls.NAME}: "
f"{str(e)}"
)
return False
@staticmethod
def activate() -> None:
"""Abstract method to activate the integration"""
activate()
staticmethod
Abstract method to activate the integration
Source code in zenml/integrations/integration.py
@staticmethod
def activate() -> None:
"""Abstract method to activate the integration"""
check_installation()
classmethod
Method to check whether the required packages are installed
Source code in zenml/integrations/integration.py
@classmethod
def check_installation(cls) -> bool:
"""Method to check whether the required packages are installed"""
try:
for requirement, command in cls.SYSTEM_REQUIREMENTS.items():
result = shutil.which(command)
if result is None:
logger.debug(
"Unable to find the required packages for %s on your "
"system. Please install the packages on your system "
"and try again.",
requirement,
)
return False
for r in cls.REQUIREMENTS:
pkg_resources.get_distribution(r)
logger.debug(
f"Integration {cls.NAME} is installed correctly with "
f"requirements {cls.REQUIREMENTS}."
)
return True
except pkg_resources.DistributionNotFound as e:
logger.debug(
f"Unable to find required package '{e.req}' for "
f"integration {cls.NAME}."
)
return False
except pkg_resources.VersionConflict as e:
logger.debug(
f"VersionConflict error when loading installation {cls.NAME}: "
f"{str(e)}"
)
return False
IntegrationMeta (type)
Metaclass responsible for registering different Integration subclasses
Source code in zenml/integrations/integration.py
class IntegrationMeta(type):
"""Metaclass responsible for registering different Integration
subclasses"""
def __new__(
mcs, name: str, bases: Tuple[Type[Any], ...], dct: Dict[str, Any]
) -> "IntegrationMeta":
"""Hook into creation of an Integration class."""
cls = cast(Type["Integration"], super().__new__(mcs, name, bases, dct))
if name != "Integration":
integration_registry.register_integration(cls.NAME, cls)
return cls
__new__(mcs, name, bases, dct)
special
staticmethod
Hook into creation of an Integration class.
Source code in zenml/integrations/integration.py
def __new__(
mcs, name: str, bases: Tuple[Type[Any], ...], dct: Dict[str, Any]
) -> "IntegrationMeta":
"""Hook into creation of an Integration class."""
cls = cast(Type["Integration"], super().__new__(mcs, name, bases, dct))
if name != "Integration":
integration_registry.register_integration(cls.NAME, cls)
return cls
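Since IntegrationMeta registers every non-base subclass with the integration registry at class-creation time, adding an integration only requires declaring a subclass. The example below is a hypothetical integration used purely for illustration; the package names and the activate body are assumptions.

```python
# Hypothetical integration, shown only to illustrate the metaclass-based
# registration: defining the subclass is enough for IntegrationMeta.__new__
# to register it with the integration registry.
from zenml.integrations.integration import Integration


class ExampleIntegration(Integration):
    """Illustrative integration definition (not part of ZenML)."""

    NAME = "example"
    REQUIREMENTS = ["requests>=2.0"]      # pip requirements checked via pkg_resources
    SYSTEM_REQUIREMENTS = {"git": "git"}  # package name -> CLI command checked via shutil.which

    @classmethod
    def activate(cls) -> None:
        """Would import materializers/orchestrators here, as the real
        integrations above do."""


print(ExampleIntegration.check_installation())
```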
kubeflow
special
The Kubeflow integration sub-module powers an alternative to the local orchestrator. You can enable it by registering the Kubeflow orchestrator with the CLI tool.
KubeflowIntegration (Integration)
Definition of Kubeflow Integration for ZenML.
Source code in zenml/integrations/kubeflow/__init__.py
class KubeflowIntegration(Integration):
"""Definition of Kubeflow Integration for ZenML."""
NAME = KUBEFLOW
REQUIREMENTS = ["kfp==1.8.9"]
@classmethod
def activate(cls) -> None:
"""Activates all classes required for the airflow integration."""
from zenml.integrations.kubeflow import metadata_stores # noqa
from zenml.integrations.kubeflow import orchestrators # noqa
activate()
classmethod
Activates all classes required for the Kubeflow integration.
Source code in zenml/integrations/kubeflow/__init__.py
@classmethod
def activate(cls) -> None:
"""Activates all classes required for the airflow integration."""
from zenml.integrations.kubeflow import metadata_stores # noqa
from zenml.integrations.kubeflow import orchestrators # noqa
container_entrypoint
Main entrypoint for containers with Kubeflow TFX component executors.
main()
Runs a single step defined by the command line arguments.
Source code in zenml/integrations/kubeflow/container_entrypoint.py
def main() -> None:
"""Runs a single step defined by the command line arguments."""
# Log to the container's stdout so Kubeflow Pipelines UI can display logs to
# the user.
logging.basicConfig(stream=sys.stdout, level=logging.INFO)
logging.getLogger().setLevel(logging.INFO)
args = _parse_command_line_arguments()
tfx_pipeline = pipeline_pb2.Pipeline()
json_format.Parse(args.tfx_ir, tfx_pipeline)
run_name = _get_run_name()
_resolve_runtime_parameters(tfx_pipeline, run_name, args.runtime_parameter)
node_id = args.node_id
pipeline_node = _get_pipeline_node(tfx_pipeline, node_id)
deployment_config = runner_utils.extract_local_deployment_config(
tfx_pipeline
)
executor_spec = runner_utils.extract_executor_spec(
deployment_config, node_id
)
custom_driver_spec = runner_utils.extract_custom_driver_spec(
deployment_config, node_id
)
# make sure all integrations are activated so all materializers etc. are
# available
integration_registry.activate_integrations()
repo = Repository()
metadata_store = repo.active_stack.metadata_store
metadata_connection = metadata.Metadata(
metadata_store.get_tfx_metadata_config()
)
# import the user main module to register all the materializers
importlib.import_module(args.main_module)
zenml.constants.USER_MAIN_MODULE = args.main_module
step_module = importlib.import_module(args.step_module)
step_class = getattr(step_module, args.step_function_name)
step_instance = cast(BaseStep, step_class())
if hasattr(executor_spec, "class_path"):
executor_module_parts = getattr(executor_spec, "class_path").split(".")
executor_class_target_module_name = ".".join(executor_module_parts[:-1])
_create_executor_class(
step=step_instance,
executor_class_target_module_name=executor_class_target_module_name,
input_artifact_type_mapping=json.loads(args.input_artifact_types),
)
else:
raise RuntimeError(
f"No class path found inside executor spec: {executor_spec}."
)
custom_executor_operators = {
executable_spec_pb2.PythonClassExecutableSpec: step_instance.executor_operator
}
component_launcher = launcher.Launcher(
pipeline_node=pipeline_node,
mlmd_connection=metadata_connection,
pipeline_info=tfx_pipeline.pipeline_info,
pipeline_runtime_spec=tfx_pipeline.runtime_spec,
executor_spec=executor_spec,
custom_driver_spec=custom_driver_spec,
custom_executor_operators=custom_executor_operators,
)
repo.active_stack.prepare_step_run()
execution_info = execute_step(component_launcher)
repo.active_stack.prepare_step_run()
if execution_info:
_dump_ui_metadata(pipeline_node, execution_info, args.metadata_ui_path)
metadata_stores
special
kubeflow_metadata_store
KubeflowMetadataStore (BaseMetadataStore)
pydantic-model
Kubeflow GRPC backend for ZenML metadata store.
Source code in zenml/integrations/kubeflow/metadata_stores/kubeflow_metadata_store.py
class KubeflowMetadataStore(BaseMetadataStore):
"""Kubeflow GRPC backend for ZenML metadata store."""
upgrade_migration_enabled: bool = False
host: str = "127.0.0.1"
port: int = DEFAULT_KFP_METADATA_GRPC_PORT
# Class Configuration
FLAVOR: ClassVar[str] = KUBEFLOW
@property
def validator(self) -> Optional[StackValidator]:
"""Validates that the stack contains a KFP orchestrator."""
def _ensure_kfp_orchestrator(stack: Stack) -> Tuple[bool, str]:
return (
stack.orchestrator.FLAVOR == KUBEFLOW,
"The Kubeflow metadata store can only be used with a Kubeflow "
"orchestrator.",
)
return StackValidator(
custom_validation_function=_ensure_kfp_orchestrator
)
def get_tfx_metadata_config(
self,
) -> Union[
metadata_store_pb2.ConnectionConfig,
metadata_store_pb2.MetadataStoreClientConfig,
]:
"""Return tfx metadata config for the kubeflow metadata store."""
connection_config = metadata_store_pb2.MetadataStoreClientConfig()
if inside_kfp_pod():
connection_config.host = os.environ["METADATA_GRPC_SERVICE_HOST"]
connection_config.port = int(
os.environ["METADATA_GRPC_SERVICE_PORT"]
)
else:
if not self.is_running:
raise RuntimeError(
"The KFP metadata daemon is not running. Please run the "
"following command to start it first:\n\n"
" 'zenml metadata-store up'\n"
)
connection_config.host = self.host
connection_config.port = self.port
return connection_config
@property
def kfp_orchestrator(self) -> KubeflowOrchestrator:
"""Returns the Kubeflow orchestrator in the active stack."""
repo = Repository(skip_repository_check=True) # type: ignore[call-arg]
return cast(KubeflowOrchestrator, repo.active_stack.orchestrator)
@property
def kubernetes_context(self) -> str:
"""Returns the kubernetes context to the cluster where the Kubeflow
Pipelines services are running."""
kubernetes_context = self.kfp_orchestrator.kubernetes_context
# will never happen, but mypy doesn't know that
assert kubernetes_context is not None
return kubernetes_context
@property
def root_directory(self) -> str:
"""Returns path to the root directory for all files concerning
this KFP metadata store.
Note: the root directory for the KFP metadata store is relative to the
root directory of the KFP orchestrator, because it is a sub-component
of it.
"""
return os.path.join(
self.kfp_orchestrator.root_directory,
"metadata-store",
str(self.uuid),
)
@property
def _pid_file_path(self) -> str:
"""Returns path to the daemon PID file."""
return os.path.join(self.root_directory, "kubeflow_daemon.pid")
@property
def _log_file(self) -> str:
"""Path of the daemon log file."""
return os.path.join(self.root_directory, "kubeflow_daemon.log")
@property
def is_provisioned(self) -> bool:
"""If the component provisioned resources to run locally."""
return fileio.exists(self.root_directory)
@property
def is_running(self) -> bool:
"""If the component is running locally."""
if sys.platform != "win32":
from zenml.utils.daemon import check_if_daemon_is_running
if not check_if_daemon_is_running(self._pid_file_path):
return False
else:
# Daemon functionality is not supported on Windows, so the PID
# file won't exist. This if clause exists just for mypy to not
# complain about missing functions
pass
return True
def provision(self) -> None:
"""Provisions resources to run the component locally."""
logger.info("Provisioning local Kubeflow Pipelines deployment...")
fileio.makedirs(self.root_directory)
def deprovision(self) -> None:
"""Deprovisions all local resources of the component."""
if fileio.exists(self._log_file):
fileio.remove(self._log_file)
logger.info("Local kubeflow pipelines deployment deprovisioned.")
def resume(self) -> None:
"""Resumes the local k3d cluster."""
if self.is_running:
logger.info("Local kubeflow pipelines deployment already running.")
return
self.start_kfp_metadata_daemon()
self.wait_until_metadata_store_ready()
def suspend(self) -> None:
"""Suspends the local k3d cluster."""
if not self.is_running:
logger.info("Local kubeflow pipelines deployment not running.")
return
self.stop_kfp_metadata_daemon()
def start_kfp_metadata_daemon(self) -> None:
"""Starts a daemon process that forwards ports so the Kubeflow Pipelines
Metadata MySQL database is accessible on the localhost."""
command = [
"kubectl",
"--context",
self.kubernetes_context,
"--namespace",
"kubeflow",
"port-forward",
"svc/metadata-grpc-service",
f"{self.port}:8080",
]
if sys.platform == "win32":
logger.warning(
"Daemon functionality not supported on Windows. "
"In order to access the Kubeflow Pipelines Metadata locally, "
"please run '%s' in a separate command line shell.",
self.port,
" ".join(command),
)
elif not networking_utils.port_available(self.port):
raise ProvisioningError(
f"Unable to port-forward Kubeflow Pipelines Metadata to local "
f"port {self.port} because the port is occupied. In order to "
f"access the Kubeflow Pipelines Metadata locally, please "
f"change the metadata store configuration to use an available "
f"port or stop the other process currently using the port."
)
else:
from zenml.utils import daemon
def _daemon_function() -> None:
"""Forwards the port of the Kubeflow Pipelines Metadata pod ."""
subprocess.check_call(command)
daemon.run_as_daemon(
_daemon_function,
pid_file=self._pid_file_path,
log_file=self._log_file,
)
logger.info(
"Started Kubeflow Pipelines Metadata daemon (check the daemon"
"logs at %s in case you're not able to access the pipeline"
"metadata).",
self._log_file,
)
def stop_kfp_metadata_daemon(self) -> None:
"""Stops the KFP Metadata daemon process if it is running."""
if fileio.exists(self._pid_file_path):
if sys.platform == "win32":
# Daemon functionality is not supported on Windows, so the PID
# file won't exist. This if clause exists just for mypy to not
# complain about missing functions
pass
else:
from zenml.utils import daemon
daemon.stop_daemon(self._pid_file_path)
fileio.remove(self._pid_file_path)
def wait_until_metadata_store_ready(
self, timeout: int = DEFAULT_KFP_METADATA_DAEMON_TIMEOUT
) -> None:
"""Waits until the metadata store connection is ready, an irrecoverable
error occurs or the timeout expires."""
logger.info(
"Waiting for the Kubeflow metadata store to be ready (this might "
"take a few minutes)."
)
while True:
try:
# it doesn't matter what we call here as long as it exercises
# the MLMD connection
self.get_pipelines()
break
except Exception as e:
logger.info(
"The Kubeflow metadata store is not ready yet. Waiting for "
"10 seconds..."
)
if timeout <= 0:
raise RuntimeError(
f"An unexpected error was encountered while waiting for the "
f"Kubeflow metadata store to be functional: {str(e)}"
) from e
timeout -= 10
time.sleep(10)
logger.info("The Kubeflow metadata store is functional.")
is_provisioned: bool
property
readonly
If the component provisioned resources to run locally.
is_running: bool
property
readonly
If the component is running locally.
kfp_orchestrator: KubeflowOrchestrator
property
readonly
Returns the Kubeflow orchestrator in the active stack.
kubernetes_context: str
property
readonly
Returns the kubernetes context to the cluster where the Kubeflow Pipelines services are running.
root_directory: str
property
readonly
Returns path to the root directory for all files concerning this KFP metadata store.
Note: the root directory for the KFP metadata store is relative to the root directory of the KFP orchestrator, because it is a sub-component of it.
validator: Optional[zenml.stack.stack_validator.StackValidator]
property
readonly
Validates that the stack contains a KFP orchestrator.
deprovision(self)
Deprovisions all local resources of the component.
Source code in zenml/integrations/kubeflow/metadata_stores/kubeflow_metadata_store.py
def deprovision(self) -> None:
"""Deprovisions all local resources of the component."""
if fileio.exists(self._log_file):
fileio.remove(self._log_file)
logger.info("Local kubeflow pipelines deployment deprovisioned.")
get_tfx_metadata_config(self)
Return tfx metadata config for the kubeflow metadata store.
Source code in zenml/integrations/kubeflow/metadata_stores/kubeflow_metadata_store.py
def get_tfx_metadata_config(
self,
) -> Union[
metadata_store_pb2.ConnectionConfig,
metadata_store_pb2.MetadataStoreClientConfig,
]:
"""Return tfx metadata config for the kubeflow metadata store."""
connection_config = metadata_store_pb2.MetadataStoreClientConfig()
if inside_kfp_pod():
connection_config.host = os.environ["METADATA_GRPC_SERVICE_HOST"]
connection_config.port = int(
os.environ["METADATA_GRPC_SERVICE_PORT"]
)
else:
if not self.is_running:
raise RuntimeError(
"The KFP metadata daemon is not running. Please run the "
"following command to start it first:\n\n"
" 'zenml metadata-store up'\n"
)
connection_config.host = self.host
connection_config.port = self.port
return connection_config
provision(self)
Provisions resources to run the component locally.
Source code in zenml/integrations/kubeflow/metadata_stores/kubeflow_metadata_store.py
def provision(self) -> None:
"""Provisions resources to run the component locally."""
logger.info("Provisioning local Kubeflow Pipelines deployment...")
fileio.makedirs(self.root_directory)
resume(self)
Resumes the local k3d cluster.
Source code in zenml/integrations/kubeflow/metadata_stores/kubeflow_metadata_store.py
def resume(self) -> None:
"""Resumes the local k3d cluster."""
if self.is_running:
logger.info("Local kubeflow pipelines deployment already running.")
return
self.start_kfp_metadata_daemon()
self.wait_until_metadata_store_ready()
start_kfp_metadata_daemon(self)
Starts a daemon process that forwards ports so the Kubeflow Pipelines Metadata MySQL database is accessible on the localhost.
Source code in zenml/integrations/kubeflow/metadata_stores/kubeflow_metadata_store.py
def start_kfp_metadata_daemon(self) -> None:
"""Starts a daemon process that forwards ports so the Kubeflow Pipelines
Metadata MySQL database is accessible on the localhost."""
command = [
"kubectl",
"--context",
self.kubernetes_context,
"--namespace",
"kubeflow",
"port-forward",
"svc/metadata-grpc-service",
f"{self.port}:8080",
]
if sys.platform == "win32":
logger.warning(
"Daemon functionality not supported on Windows. "
"In order to access the Kubeflow Pipelines Metadata locally, "
"please run '%s' in a separate command line shell.",
self.port,
" ".join(command),
)
elif not networking_utils.port_available(self.port):
raise ProvisioningError(
f"Unable to port-forward Kubeflow Pipelines Metadata to local "
f"port {self.port} because the port is occupied. In order to "
f"access the Kubeflow Pipelines Metadata locally, please "
f"change the metadata store configuration to use an available "
f"port or stop the other process currently using the port."
)
else:
from zenml.utils import daemon
def _daemon_function() -> None:
"""Forwards the port of the Kubeflow Pipelines Metadata pod ."""
subprocess.check_call(command)
daemon.run_as_daemon(
_daemon_function,
pid_file=self._pid_file_path,
log_file=self._log_file,
)
logger.info(
"Started Kubeflow Pipelines Metadata daemon (check the daemon"
"logs at %s in case you're not able to access the pipeline"
"metadata).",
self._log_file,
)
stop_kfp_metadata_daemon(self)
Stops the KFP Metadata daemon process if it is running.
Source code in zenml/integrations/kubeflow/metadata_stores/kubeflow_metadata_store.py
def stop_kfp_metadata_daemon(self) -> None:
"""Stops the KFP Metadata daemon process if it is running."""
if fileio.exists(self._pid_file_path):
if sys.platform == "win32":
# Daemon functionality is not supported on Windows, so the PID
# file won't exist. This if clause exists just for mypy to not
# complain about missing functions
pass
else:
from zenml.utils import daemon
daemon.stop_daemon(self._pid_file_path)
fileio.remove(self._pid_file_path)
suspend(self)
Suspends the local k3d cluster.
Source code in zenml/integrations/kubeflow/metadata_stores/kubeflow_metadata_store.py
def suspend(self) -> None:
"""Suspends the local k3d cluster."""
if not self.is_running:
logger.info("Local kubeflow pipelines deployment not running.")
return
self.stop_kfp_metadata_daemon()
wait_until_metadata_store_ready(self, timeout=60)
Waits until the metadata store connection is ready, an irrecoverable error occurs or the timeout expires.
Source code in zenml/integrations/kubeflow/metadata_stores/kubeflow_metadata_store.py
def wait_until_metadata_store_ready(
self, timeout: int = DEFAULT_KFP_METADATA_DAEMON_TIMEOUT
) -> None:
"""Waits until the metadata store connection is ready, an irrecoverable
error occurs or the timeout expires."""
logger.info(
"Waiting for the Kubeflow metadata store to be ready (this might "
"take a few minutes)."
)
while True:
try:
# it doesn't matter what we call here as long as it exercises
# the MLMD connection
self.get_pipelines()
break
except Exception as e:
logger.info(
"The Kubeflow metadata store is not ready yet. Waiting for "
"10 seconds..."
)
if timeout <= 0:
raise RuntimeError(
f"An unexpected error was encountered while waiting for the "
f"Kubeflow metadata store to be functional: {str(e)}"
) from e
timeout -= 10
time.sleep(10)
logger.info("The Kubeflow metadata store is functional.")
inside_kfp_pod()
Returns if the current python process is running inside a KFP Pod.
Source code in zenml/integrations/kubeflow/metadata_stores/kubeflow_metadata_store.py
def inside_kfp_pod() -> bool:
"""Returns if the current python process is running inside a KFP Pod."""
if "KFP_POD_NAME" not in os.environ:
return False
try:
k8s_config.load_incluster_config()
return True
except k8s_config.ConfigException:
return False
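The connection config thus depends on where the code runs: inside a KFP pod the gRPC host and port are read from the environment, while locally the port-forwarding daemon must be running (started via 'zenml metadata-store up'). A hedged sketch of inspecting the active store is shown below; it assumes the active stack uses the Kubeflow metadata store, and the Repository import path may differ between ZenML versions.

```python
# Hedged sketch: assumes the active ZenML stack uses the Kubeflow metadata
# store (and, per the validator above, a Kubeflow orchestrator). The import
# path reflects the ZenML version this reference targets.
from zenml.repository import Repository

metadata_store = Repository().active_stack.metadata_store

if not metadata_store.is_running:
    # Locally, `zenml metadata-store up` must be run first to start the
    # port-forwarding daemon, as the error message above explains.
    print("Kubeflow metadata store daemon is not running.")
else:
    config = metadata_store.get_tfx_metadata_config()
    print(config.host, config.port)  # MetadataStoreClientConfig fields
```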
orchestrators
special
kubeflow_component
Kubeflow Pipelines based implementation of TFX components. These components are lightweight wrappers around the KFP DSL's ContainerOp and ensure that the container gets called with the right set of input arguments. They also ensure that each component exports named output attributes that are consistent with those provided by the native TFX components, so that both types of pipeline definitions stay compatible. Note: This requires the Kubeflow Pipelines SDK to be installed.
KubeflowComponent
Base component for all Kubeflow pipelines TFX components. Returns a wrapper around a KFP DSL ContainerOp class, and adds named output attributes that match the output names for the corresponding native TFX components.
Source code in zenml/integrations/kubeflow/orchestrators/kubeflow_component.py
class KubeflowComponent:
"""Base component for all Kubeflow pipelines TFX components.
Returns a wrapper around a KFP DSL ContainerOp class, and adds named output
attributes that match the output names for the corresponding native TFX
components.
"""
def __init__(
self,
component: tfx_base_component.BaseComponent,
depends_on: Set[dsl.ContainerOp],
image: str,
tfx_ir: pipeline_pb2.Pipeline,
pod_labels_to_attach: Dict[str, str],
main_module: str,
step_module: str,
step_function_name: str,
runtime_parameters: List[data_types.RuntimeParameter],
):
"""Creates a new Kubeflow-based component.
This class essentially wraps a dsl.ContainerOp construct in Kubeflow
Pipelines.
Args:
component: The logical TFX component to wrap.
depends_on: The set of upstream KFP ContainerOp components that this
component will depend on.
image: The container image to use for this component.
tfx_ir: The TFX intermedia representation of the pipeline.
pod_labels_to_attach: Dict of pod labels to attach to the GKE pod.
runtime_parameters: Runtime parameters of the pipeline.
"""
# Path to a metadata file that will be displayed in the KFP UI
# This metadata file needs to be in a mounted emptyDir to avoid
# sporadic failures with the (not mature) PNS executor
# See these links for more information about limitations of PNS +
# security context:
# https://www.kubeflow.org/docs/components/pipelines/installation/localcluster-deployment/#deploying-kubeflow-pipelines
# https://argoproj.github.io/argo-workflows/empty-dir/
# KFP will switch to the Emissary executor (soon), when this emptyDir
# mount will not be necessary anymore, but for now it's still in alpha
# status (https://www.kubeflow.org/docs/components/pipelines/installation/choose-executor/#emissary-executor)
metadata_ui_path = "/outputs/mlpipeline-ui-metadata.json"
volumes: Dict[str, k8s_client.V1Volume] = {
"/outputs": k8s_client.V1Volume(
name="outputs", empty_dir=k8s_client.V1EmptyDirVolumeSource()
),
}
utils.replace_placeholder(component)
input_artifact_type_mapping = _get_input_artifact_type_mapping(
component
)
arguments = [
"--node_id",
component.id,
"--tfx_ir",
json_format.MessageToJson(tfx_ir),
"--metadata_ui_path",
metadata_ui_path,
"--main_module",
main_module,
"--step_module",
step_module,
"--step_function_name",
step_function_name,
"--input_artifact_types",
json.dumps(input_artifact_type_mapping),
]
for param in runtime_parameters:
arguments.append("--runtime_parameter")
arguments.append(_encode_runtime_parameter(param))
stack = Repository().active_stack
global_cfg_dir = get_global_config_directory()
# go through all stack components and identify those that advertise
# a local path where they persist information that they need to be
# available when running pipelines. For those that do, mount them
# into the Kubeflow container.
has_local_repos = False
for stack_comp in stack.components.values():
local_path = stack_comp.local_path
if not local_path:
continue
# double-check this convention, just in case it wasn't respected
# as documented in `StackComponent.local_path`
if not local_path.startswith(global_cfg_dir):
raise ValueError(
f"Local path {local_path} for component {stack_comp.name} "
f"is not in the global config directory ({global_cfg_dir})."
)
has_local_repos = True
host_path = k8s_client.V1HostPathVolumeSource(
path=local_path, type="Directory"
)
volume_name = f"{stack_comp.TYPE.value}-{stack_comp.name}"
volumes[local_path] = k8s_client.V1Volume(
name=re.sub(r"[^0-9a-zA-Z-]+", "-", volume_name)
.strip("-")
.lower(),
host_path=host_path,
)
logger.debug(
"Adding host path volume for %s %s (path: %s) "
"in kubeflow pipelines container.",
stack_comp.TYPE.value,
stack_comp.name,
local_path,
)
self.container_op = dsl.ContainerOp(
name=component.id,
command=CONTAINER_ENTRYPOINT_COMMAND,
image=image,
arguments=arguments,
output_artifact_paths={
"mlpipeline-ui-metadata": metadata_ui_path,
},
pvolumes=volumes,
)
if has_local_repos:
if sys.platform == "win32":
# File permissions are not checked on Windows. This if clause
# prevents mypy from complaining about unused 'type: ignore'
# statements
pass
else:
# Run KFP containers in the context of the local UID/GID
# to ensure that the artifact and metadata stores can be shared
# with the local pipeline runs.
self.container_op.container.security_context = (
k8s_client.V1SecurityContext(
run_as_user=os.getuid(),
run_as_group=os.getgid(),
)
)
logger.debug(
"Setting security context UID and GID to local user/group "
"in kubeflow pipelines container."
)
for op in depends_on:
self.container_op.after(op)
self.container_op.container.add_env_variable(
k8s_client.V1EnvVar(
name=ENV_ZENML_PREVENT_PIPELINE_EXECUTION, value="True"
)
)
# Add environment variables for Azure Blob Storage to pod in case they
# are set locally
# TODO [ENG-699]: remove this as soon as we implement credential handling
for key in [
"AZURE_STORAGE_ACCOUNT_KEY",
"AZURE_STORAGE_ACCOUNT_NAME",
"AZURE_STORAGE_CONNECTION_STRING",
"AZURE_STORAGE_SAS_TOKEN",
]:
value = os.getenv(key)
if value:
self.container_op.container.add_env_variable(
k8s_client.V1EnvVar(name=key, value=value)
)
for k, v in pod_labels_to_attach.items():
self.container_op.add_pod_label(k, v)
__init__(self, component, depends_on, image, tfx_ir, pod_labels_to_attach, main_module, step_module, step_function_name, runtime_parameters)
special
Creates a new Kubeflow-based component. This class essentially wraps a dsl.ContainerOp construct in Kubeflow Pipelines.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| component | BaseComponent | The logical TFX component to wrap. | required |
| depends_on | Set[kfp.dsl._container_op.ContainerOp] | The set of upstream KFP ContainerOp components that this component will depend on. | required |
| image | str | The container image to use for this component. | required |
| tfx_ir | Pipeline | The TFX intermediate representation of the pipeline. | required |
| pod_labels_to_attach | Dict[str, str] | Dict of pod labels to attach to the GKE pod. | required |
| runtime_parameters | List[tfx.orchestration.data_types.RuntimeParameter] | Runtime parameters of the pipeline. | required |
Source code in zenml/integrations/kubeflow/orchestrators/kubeflow_component.py
def __init__(
self,
component: tfx_base_component.BaseComponent,
depends_on: Set[dsl.ContainerOp],
image: str,
tfx_ir: pipeline_pb2.Pipeline,
pod_labels_to_attach: Dict[str, str],
main_module: str,
step_module: str,
step_function_name: str,
runtime_parameters: List[data_types.RuntimeParameter],
):
"""Creates a new Kubeflow-based component.
This class essentially wraps a dsl.ContainerOp construct in Kubeflow
Pipelines.
Args:
component: The logical TFX component to wrap.
depends_on: The set of upstream KFP ContainerOp components that this
component will depend on.
image: The container image to use for this component.
tfx_ir: The TFX intermedia representation of the pipeline.
pod_labels_to_attach: Dict of pod labels to attach to the GKE pod.
runtime_parameters: Runtime parameters of the pipeline.
"""
# Path to a metadata file that will be displayed in the KFP UI
# This metadata file needs to be in a mounted emptyDir to avoid
# sporadic failures with the (not mature) PNS executor
# See these links for more information about limitations of PNS +
# security context:
# https://www.kubeflow.org/docs/components/pipelines/installation/localcluster-deployment/#deploying-kubeflow-pipelines
# https://argoproj.github.io/argo-workflows/empty-dir/
# KFP will switch to the Emissary executor (soon), when this emptyDir
# mount will not be necessary anymore, but for now it's still in alpha
# status (https://www.kubeflow.org/docs/components/pipelines/installation/choose-executor/#emissary-executor)
metadata_ui_path = "/outputs/mlpipeline-ui-metadata.json"
volumes: Dict[str, k8s_client.V1Volume] = {
"/outputs": k8s_client.V1Volume(
name="outputs", empty_dir=k8s_client.V1EmptyDirVolumeSource()
),
}
utils.replace_placeholder(component)
input_artifact_type_mapping = _get_input_artifact_type_mapping(
component
)
arguments = [
"--node_id",
component.id,
"--tfx_ir",
json_format.MessageToJson(tfx_ir),
"--metadata_ui_path",
metadata_ui_path,
"--main_module",
main_module,
"--step_module",
step_module,
"--step_function_name",
step_function_name,
"--input_artifact_types",
json.dumps(input_artifact_type_mapping),
]
for param in runtime_parameters:
arguments.append("--runtime_parameter")
arguments.append(_encode_runtime_parameter(param))
stack = Repository().active_stack
global_cfg_dir = get_global_config_directory()
# go through all stack components and identify those that advertise
# a local path where they persist information that they need to be
# available when running pipelines. For those that do, mount them
# into the Kubeflow container.
has_local_repos = False
for stack_comp in stack.components.values():
local_path = stack_comp.local_path
if not local_path:
continue
# double-check this convention, just in case it wasn't respected
# as documented in `StackComponent.local_path`
if not local_path.startswith(global_cfg_dir):
raise ValueError(
f"Local path {local_path} for component {stack_comp.name} "
f"is not in the global config directory ({global_cfg_dir})."
)
has_local_repos = True
host_path = k8s_client.V1HostPathVolumeSource(
path=local_path, type="Directory"
)
volume_name = f"{stack_comp.TYPE.value}-{stack_comp.name}"
volumes[local_path] = k8s_client.V1Volume(
name=re.sub(r"[^0-9a-zA-Z-]+", "-", volume_name)
.strip("-")
.lower(),
host_path=host_path,
)
logger.debug(
"Adding host path volume for %s %s (path: %s) "
"in kubeflow pipelines container.",
stack_comp.TYPE.value,
stack_comp.name,
local_path,
)
self.container_op = dsl.ContainerOp(
name=component.id,
command=CONTAINER_ENTRYPOINT_COMMAND,
image=image,
arguments=arguments,
output_artifact_paths={
"mlpipeline-ui-metadata": metadata_ui_path,
},
pvolumes=volumes,
)
if has_local_repos:
if sys.platform == "win32":
# File permissions are not checked on Windows. This if clause
# prevents mypy from complaining about unused 'type: ignore'
# statements
pass
else:
# Run KFP containers in the context of the local UID/GID
# to ensure that the artifact and metadata stores can be shared
# with the local pipeline runs.
self.container_op.container.security_context = (
k8s_client.V1SecurityContext(
run_as_user=os.getuid(),
run_as_group=os.getgid(),
)
)
logger.debug(
"Setting security context UID and GID to local user/group "
"in kubeflow pipelines container."
)
for op in depends_on:
self.container_op.after(op)
self.container_op.container.add_env_variable(
k8s_client.V1EnvVar(
name=ENV_ZENML_PREVENT_PIPELINE_EXECUTION, value="True"
)
)
# Add environment variables for Azure Blob Storage to pod in case they
# are set locally
# TODO [ENG-699]: remove this as soon as we implement credential handling
for key in [
"AZURE_STORAGE_ACCOUNT_KEY",
"AZURE_STORAGE_ACCOUNT_NAME",
"AZURE_STORAGE_CONNECTION_STRING",
"AZURE_STORAGE_SAS_TOKEN",
]:
value = os.getenv(key)
if value:
self.container_op.container.add_env_variable(
k8s_client.V1EnvVar(name=key, value=value)
)
for k, v in pod_labels_to_attach.items():
self.container_op.add_pod_label(k, v)
kubeflow_dag_runner
The code below is copied from the TFX source repo with minor changes. All credit goes to the TFX team for the core implementation.
KubeflowDagRunner
Kubeflow Pipelines runner. Constructs a pipeline definition YAML file based on the TFX logical pipeline.
Source code in zenml/integrations/kubeflow/orchestrators/kubeflow_dag_runner.py
class KubeflowDagRunner:
"""Kubeflow Pipelines runner. Constructs a pipeline definition YAML file
based on the TFX logical pipeline.
"""
def __init__(
self,
config: KubeflowDagRunnerConfig,
output_path: str,
pod_labels_to_attach: Optional[Dict[str, str]] = None,
):
"""Initializes KubeflowDagRunner for compiling a Kubeflow Pipeline.
Args:
config: A KubeflowDagRunnerConfig object to specify runtime
configuration when running the pipeline under Kubeflow.
output_path: Path where the pipeline definition file will be stored.
pod_labels_to_attach: Optional set of pod labels to attach to GKE pod
spun up for this pipeline. Default to the 3 labels:
1. add-pod-env: true,
2. pipeline SDK type,
3. pipeline unique ID,
where 2 and 3 are instrumentation of usage tracking.
"""
self._config = config or pipeline_config.PipelineConfig()
self._kubeflow_config = config
self._output_path = output_path
self._compiler = compiler.Compiler()
self._tfx_compiler = tfx_compiler.Compiler()
self._params: List[dsl.PipelineParam] = []
self._params_by_component_id: Dict[
str, List[data_types.RuntimeParameter]
] = collections.defaultdict(list)
self._deduped_parameter_names: Set[str] = set()
self._pod_labels_to_attach = (
pod_labels_to_attach or get_default_pod_labels()
)
@property
def config(self) -> pipeline_config.PipelineConfig:
"""The config property"""
return self._config
def _parse_parameter_from_component(
self, component: tfx_base_component.BaseComponent
) -> None:
"""Extract embedded RuntimeParameter placeholders from a component.
Extract embedded RuntimeParameter placeholders from a component, then
append the corresponding dsl.PipelineParam to KubeflowDagRunner.
Args:
component: a TFX component.
"""
deduped_parameter_names_for_component = set()
for parameter in component.exec_properties.values():
if not isinstance(parameter, data_types.RuntimeParameter):
continue
# Ignore pipeline root because it will be added later.
if parameter.name == TFX_PIPELINE_ROOT_PARAMETER.name:
continue
if parameter.name in deduped_parameter_names_for_component:
continue
deduped_parameter_names_for_component.add(parameter.name)
self._params_by_component_id[component.id].append(parameter)
if parameter.name not in self._deduped_parameter_names:
self._deduped_parameter_names.add(parameter.name)
dsl_parameter = dsl.PipelineParam(
name=parameter.name, value=str(parameter.default)
)
self._params.append(dsl_parameter)
def _parse_parameter_from_pipeline(self, pipeline: TfxPipeline) -> None:
"""Extract all the RuntimeParameter placeholders from the pipeline."""
for component in pipeline.components:
self._parse_parameter_from_component(component)
def _construct_pipeline_graph(
self,
pipeline: "BasePipeline",
tfx_pipeline: TfxPipeline,
stack: "Stack",
runtime_configuration: "RuntimeConfiguration",
) -> None:
"""Constructs a Kubeflow Pipeline graph.
Args:
pipeline: ZenML pipeline instance.
tfx_pipeline: The logical TFX pipeline to base the construction on.
stack: The ZenML stack that the pipeline is running on
runtime_configuration: The runtime configuration
"""
component_to_kfp_op: Dict[base_node.BaseNode, dsl.ContainerOp] = {}
tfx_ir: Pb2Pipeline = self._generate_tfx_ir(tfx_pipeline)
for node in tfx_ir.nodes:
pipeline_node: PipelineNode = node.pipeline_node
# Add the stack as context to each pipeline node:
context_utils.add_context_to_node(
pipeline_node,
type_=MetadataContextTypes.STACK.value,
name=str(hash(json.dumps(stack.dict(), sort_keys=True))),
properties=stack.dict(),
)
# Add all pydantic objects from runtime_configuration to the
# context
context_utils.add_runtime_configuration_to_node(
pipeline_node, runtime_configuration
)
# Add pipeline requirements as a context
requirements = " ".join(sorted(pipeline.requirements))
context_utils.add_context_to_node(
pipeline_node,
type_=MetadataContextTypes.PIPELINE_REQUIREMENTS.value,
name=str(hash(requirements)),
properties={"pipeline_requirements": requirements},
)
# Assumption: There is a partial ordering of components in the list,
# i.e. if component A depends on component B and C, then A appears
# after B and C in the list.
for component in tfx_pipeline.components:
# Keep track of the set of upstream dsl.ContainerOps for this
# component.
depends_on = set()
for upstream_component in component.upstream_nodes:
depends_on.add(component_to_kfp_op[upstream_component])
# remove the extra pipeline node information
tfx_node_ir = self._dehydrate_tfx_ir(tfx_ir, component.id)
main_module = get_module_source_from_module(sys.modules["__main__"])
step_module = component.component_type.split(".")[:-1]
if step_module[0] == "__main__":
step_module = main_module
else:
step_module = ".".join(step_module)
kfp_component = KubeflowComponent(
main_module=main_module,
step_module=step_module,
step_function_name=component.id,
component=component,
depends_on=depends_on,
image=self._kubeflow_config.image,
pod_labels_to_attach=self._pod_labels_to_attach,
tfx_ir=tfx_node_ir,
runtime_parameters=self._params_by_component_id[component.id],
)
for operator in self._kubeflow_config.pipeline_operator_funcs:
kfp_component.container_op.apply(operator)
component_to_kfp_op[component] = kfp_component.container_op
def _del_unused_field(
self, node_id: str, message_dict: MutableMapping[str, Any]
) -> None:
"""Remove fields that are not used by the pipeline."""
for item in list(message_dict.keys()):
if item != node_id:
del message_dict[item]
def _dehydrate_tfx_ir(
self, original_pipeline: Pb2Pipeline, node_id: str
) -> Pb2Pipeline:
"""Dehydrate the TFX IR to remove unused fields."""
pipeline = copy.deepcopy(original_pipeline)
for node in pipeline.nodes:
if (
node.WhichOneof("node") == "pipeline_node"
and node.pipeline_node.node_info.id == node_id
):
del pipeline.nodes[:]
pipeline.nodes.extend([node])
break
deployment_config = IntermediateDeploymentConfig()
pipeline.deployment_config.Unpack(deployment_config)
self._del_unused_field(node_id, deployment_config.executor_specs)
self._del_unused_field(node_id, deployment_config.custom_driver_specs)
self._del_unused_field(
node_id, deployment_config.node_level_platform_configs
)
pipeline.deployment_config.Pack(deployment_config)
return pipeline
def _generate_tfx_ir(self, pipeline: TfxPipeline) -> Pb2Pipeline:
"""Generate the TFX IR from the logical TFX pipeline."""
result = self._tfx_compiler.compile(pipeline)
return result
def run(
self,
pipeline: "BasePipeline",
stack: "Stack",
runtime_configuration: "RuntimeConfiguration",
) -> None:
"""Compiles and outputs a Kubeflow Pipeline YAML definition file.
Args:
pipeline: The logical TFX pipeline to use when building the Kubeflow
pipeline.
stack: The ZenML stack that the pipeline is running on.
runtime_configuration: The runtime configuration.
"""
tfx_pipeline = create_tfx_pipeline(pipeline, stack=stack)
pipeline_root = tfx_pipeline.pipeline_info.pipeline_root
if not isinstance(pipeline_root, str):
raise TypeError(
"TFX Pipeline root may not be a Placeholder, "
"but must be a specific string."
)
for component in tfx_pipeline.components:
# TODO(b/187122662): Pass through pip dependencies as a first-class
# component flag.
if isinstance(component, tfx_base_component.BaseComponent):
component._resolve_pip_dependencies(pipeline_root)
def _construct_pipeline() -> None:
"""Creates Kubeflow ContainerOps for each TFX component
encountered in the pipeline definition."""
self._construct_pipeline_graph(
pipeline, tfx_pipeline, stack, runtime_configuration
)
# Need to run this first to get self._params populated. Then KFP
# compiler can correctly match default value with PipelineParam.
self._parse_parameter_from_pipeline(tfx_pipeline)
# Create workflow spec and write out to package.
self._compiler._create_and_write_workflow(
# pylint: disable=protected-access
pipeline_func=_construct_pipeline,
pipeline_name=tfx_pipeline.pipeline_info.pipeline_name,
params_list=self._params,
package_path=self._output_path,
)
logger.info(
"Finished writing kubeflow pipeline definition file '%s'.",
self._output_path,
)
config: PipelineConfig
property
readonly
The config property
__init__(self, config, output_path, pod_labels_to_attach=None)
special
Initializes KubeflowDagRunner for compiling a Kubeflow Pipeline.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
config | KubeflowDagRunnerConfig | A KubeflowDagRunnerConfig object to specify runtime configuration when running the pipeline under Kubeflow. | required |
output_path | str | Path where the pipeline definition file will be stored. | required |
pod_labels_to_attach | Optional[Dict[str, str]] | Optional set of pod labels to attach to GKE pod spun up for this pipeline. Default to the 3 labels: 1. add-pod-env: true, 2. pipeline SDK type, 3. pipeline unique ID, where 2 and 3 are instrumentation of usage tracking. | None |
Source code in zenml/integrations/kubeflow/orchestrators/kubeflow_dag_runner.py
def __init__(
self,
config: KubeflowDagRunnerConfig,
output_path: str,
pod_labels_to_attach: Optional[Dict[str, str]] = None,
):
"""Initializes KubeflowDagRunner for compiling a Kubeflow Pipeline.
Args:
config: A KubeflowDagRunnerConfig object to specify runtime
configuration when running the pipeline under Kubeflow.
output_path: Path where the pipeline definition file will be stored.
pod_labels_to_attach: Optional set of pod labels to attach to GKE pod
spun up for this pipeline. Default to the 3 labels:
1. add-pod-env: true,
2. pipeline SDK type,
3. pipeline unique ID,
where 2 and 3 are instrumentation of usage tracking.
"""
self._config = config or pipeline_config.PipelineConfig()
self._kubeflow_config = config
self._output_path = output_path
self._compiler = compiler.Compiler()
self._tfx_compiler = tfx_compiler.Compiler()
self._params: List[dsl.PipelineParam] = []
self._params_by_component_id: Dict[
str, List[data_types.RuntimeParameter]
] = collections.defaultdict(list)
self._deduped_parameter_names: Set[str] = set()
self._pod_labels_to_attach = (
pod_labels_to_attach or get_default_pod_labels()
)
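A minimal construction sketch follows; the image name, output path and extra pod label are arbitrary examples, and pod_labels_to_attach falls back to get_default_pod_labels() when omitted:

from zenml.integrations.kubeflow.orchestrators.kubeflow_dag_runner import (
    KubeflowDagRunner,
    KubeflowDagRunnerConfig,
    get_default_pod_labels,
)

config = KubeflowDagRunnerConfig(
    image="localhost:5000/zenml-kubeflow:my_pipeline",  # example image name
)
runner = KubeflowDagRunner(
    config=config,
    output_path="my_pipeline.yaml",  # where the compiled definition will be written
    # Optional: merge a custom label into the defaults instead of replacing them.
    pod_labels_to_attach={**get_default_pod_labels(), "owner": "data-team"},
)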
run(self, pipeline, stack, runtime_configuration)
Compiles and outputs a Kubeflow Pipeline YAML definition file.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
pipeline | BasePipeline | The logical TFX pipeline to use when building the Kubeflow pipeline. | required |
stack | Stack | The ZenML stack that the pipeline is running on. | required |
runtime_configuration | RuntimeConfiguration | The runtime configuration. | required |
Source code in zenml/integrations/kubeflow/orchestrators/kubeflow_dag_runner.py
def run(
self,
pipeline: "BasePipeline",
stack: "Stack",
runtime_configuration: "RuntimeConfiguration",
) -> None:
"""Compiles and outputs a Kubeflow Pipeline YAML definition file.
Args:
pipeline: The logical TFX pipeline to use when building the Kubeflow
pipeline.
stack: The ZenML stack that the pipeline is running on.
runtime_configuration: The runtime configuration.
"""
tfx_pipeline = create_tfx_pipeline(pipeline, stack=stack)
pipeline_root = tfx_pipeline.pipeline_info.pipeline_root
if not isinstance(pipeline_root, str):
raise TypeError(
"TFX Pipeline root may not be a Placeholder, "
"but must be a specific string."
)
for component in tfx_pipeline.components:
# TODO(b/187122662): Pass through pip dependencies as a first-class
# component flag.
if isinstance(component, tfx_base_component.BaseComponent):
component._resolve_pip_dependencies(pipeline_root)
def _construct_pipeline() -> None:
"""Creates Kubeflow ContainerOps for each TFX component
encountered in the pipeline definition."""
self._construct_pipeline_graph(
pipeline, tfx_pipeline, stack, runtime_configuration
)
# Need to run this first to get self._params populated. Then KFP
# compiler can correctly match default value with PipelineParam.
self._parse_parameter_from_pipeline(tfx_pipeline)
# Create workflow spec and write out to package.
self._compiler._create_and_write_workflow(
# pylint: disable=protected-access
pipeline_func=_construct_pipeline,
pipeline_name=tfx_pipeline.pipeline_info.pipeline_name,
params_list=self._params,
package_path=self._output_path,
)
logger.info(
"Finished writing kubeflow pipeline definition file '%s'.",
self._output_path,
)
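Putting the pieces together, here is a hedged end-to-end sketch of the compile step. The names my_pipeline, my_stack and my_runtime_configuration are placeholders for objects assumed to exist in the calling code; the call only writes the YAML definition and does not upload anything to Kubeflow (uploading is handled by the KubeflowOrchestrator documented further below):

from zenml.integrations.kubeflow.orchestrators.kubeflow_dag_runner import (
    KubeflowDagRunner,
    KubeflowDagRunnerConfig,
)

runner = KubeflowDagRunner(
    config=KubeflowDagRunnerConfig(
        image="localhost:5000/zenml-kubeflow:my_pipeline",  # example image name
    ),
    output_path="my_pipeline.yaml",
)

# Compiles the ZenML pipeline into a Kubeflow Pipelines workflow spec and
# writes it to `output_path`.
runner.run(
    pipeline=my_pipeline,  # placeholder BasePipeline instance
    stack=my_stack,  # placeholder Stack
    runtime_configuration=my_runtime_configuration,  # placeholder RuntimeConfiguration
)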
KubeflowDagRunnerConfig (PipelineConfig)
Runtime configuration parameters specific to execution on Kubeflow.
Source code in zenml/integrations/kubeflow/orchestrators/kubeflow_dag_runner.py
class KubeflowDagRunnerConfig(pipeline_config.PipelineConfig):
"""Runtime configuration parameters specific to execution on Kubeflow."""
def __init__(
self,
image: str,
pipeline_operator_funcs: Optional[List[OpFunc]] = None,
supported_launcher_classes: Optional[
List[Type[base_component_launcher.BaseComponentLauncher]]
] = None,
**kwargs: Any
):
"""Creates a KubeflowDagRunnerConfig object.
The user can use pipeline_operator_funcs to apply modifications to
ContainerOps used in the pipeline. For example, to ensure the pipeline
steps mount a GCP secret and a Persistent Volume, one can create a config
object like so:
from kfp import gcp, onprem
mount_secret_op = gcp.use_secret('my-secret-name')
mount_volume_op = onprem.mount_pvc(
"my-persistent-volume-claim",
"my-volume-name",
"/mnt/volume-mount-path")
config = KubeflowDagRunnerConfig(
pipeline_operator_funcs=[mount_secret_op, mount_volume_op]
)
Args:
image: The docker image to use in the pipeline.
pipeline_operator_funcs: A list of ContainerOp modifying functions
that will be applied to every container step in the pipeline.
supported_launcher_classes: A list of component launcher classes that
are supported by the current pipeline. List sequence determines the
order in which launchers are chosen for each component being run.
**kwargs: keyword args for PipelineConfig.
"""
supported_launcher_classes = supported_launcher_classes or [
in_process_component_launcher.InProcessComponentLauncher,
kubernetes_component_launcher.KubernetesComponentLauncher,
]
super().__init__(
supported_launcher_classes=supported_launcher_classes, **kwargs
)
self.pipeline_operator_funcs = (
pipeline_operator_funcs or get_default_pipeline_operator_funcs()
)
self.image = image
__init__(self, image, pipeline_operator_funcs=None, supported_launcher_classes=None, **kwargs)
special
Creates a KubeflowDagRunnerConfig object.
The user can use pipeline_operator_funcs to apply modifications to ContainerOps used in the pipeline. For example, to ensure the pipeline steps mount a GCP secret and a Persistent Volume, one can create a config object like so:
from kfp import gcp, onprem
mount_secret_op = gcp.use_secret('my-secret-name')
mount_volume_op = onprem.mount_pvc(
"my-persistent-volume-claim",
"my-volume-name",
"/mnt/volume-mount-path")
config = KubeflowDagRunnerConfig(
pipeline_operator_funcs=[mount_secret_op, mount_volume_op]
)
Parameters:
Name | Type | Description | Default |
---|---|---|---|
image | str | The docker image to use in the pipeline. | required |
pipeline_operator_funcs | Optional[List[Callable[[kfp.dsl._container_op.ContainerOp], Union[kfp.dsl._container_op.ContainerOp, NoneType]]]] | A list of ContainerOp modifying functions that will be applied to every container step in the pipeline. | None |
supported_launcher_classes | Optional[List[Type[tfx.orchestration.launcher.base_component_launcher.BaseComponentLauncher]]] | A list of component launcher classes that are supported by the current pipeline. List sequence determines the order in which launchers are chosen for each component being run. | None |
**kwargs | Any | keyword args for PipelineConfig. | {} |
Source code in zenml/integrations/kubeflow/orchestrators/kubeflow_dag_runner.py
def __init__(
self,
image: str,
pipeline_operator_funcs: Optional[List[OpFunc]] = None,
supported_launcher_classes: Optional[
List[Type[base_component_launcher.BaseComponentLauncher]]
] = None,
**kwargs: Any
):
"""Creates a KubeflowDagRunnerConfig object.
The user can use pipeline_operator_funcs to apply modifications to
ContainerOps used in the pipeline. For example, to ensure the pipeline
steps mount a GCP secret and a Persistent Volume, one can create a config
object like so:
from kfp import gcp, onprem
mount_secret_op = gcp.use_secret('my-secret-name')
mount_volume_op = onprem.mount_pvc(
"my-persistent-volume-claim",
"my-volume-name",
"/mnt/volume-mount-path")
config = KubeflowDagRunnerConfig(
pipeline_operator_funcs=[mount_secret_op, mount_volume_op]
)
Args:
image: The docker image to use in the pipeline.
pipeline_operator_funcs: A list of ContainerOp modifying functions
that will be applied to every container step in the pipeline.
supported_launcher_classes: A list of component launcher classes that
are supported by the current pipeline. List sequence determines the
order in which launchers are chosen for each component being run.
**kwargs: keyword args for PipelineConfig.
"""
supported_launcher_classes = supported_launcher_classes or [
in_process_component_launcher.InProcessComponentLauncher,
kubernetes_component_launcher.KubernetesComponentLauncher,
]
super().__init__(
supported_launcher_classes=supported_launcher_classes, **kwargs
)
self.pipeline_operator_funcs = (
pipeline_operator_funcs or get_default_pipeline_operator_funcs()
)
self.image = image
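As a complement to the secret/volume example in the docstring, any callable that accepts a ContainerOp qualifies as an OpFunc. A minimal sketch, assuming an arbitrary label key and value; note that passing pipeline_operator_funcs explicitly replaces the defaults returned by get_default_pipeline_operator_funcs, so include those as well if you still need them:

from typing import Optional

from kfp import dsl

from zenml.integrations.kubeflow.orchestrators.kubeflow_dag_runner import (
    KubeflowDagRunnerConfig,
)


def add_team_label(op: dsl.ContainerOp) -> Optional[dsl.ContainerOp]:
    """Example OpFunc that attaches a pod label to every container step."""
    op.add_pod_label("team", "ml-platform")  # arbitrary example label
    return op


config = KubeflowDagRunnerConfig(
    image="localhost:5000/zenml-kubeflow:my_pipeline",  # example image name
    pipeline_operator_funcs=[add_team_label],
)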
get_default_pipeline_operator_funcs(use_gcp_sa=False)
Returns a default list of pipeline operator functions.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
use_gcp_sa | bool | If true, mount a GCP service account secret to each pod, with the name _KUBEFLOW_GCP_SECRET_NAME. | False |
Returns:
Type | Description |
---|---|
List[Callable[[kfp.dsl._container_op.ContainerOp], Optional[kfp.dsl._container_op.ContainerOp]]] | A list of functions with type OpFunc. |
Source code in zenml/integrations/kubeflow/orchestrators/kubeflow_dag_runner.py
def get_default_pipeline_operator_funcs(
use_gcp_sa: bool = False,
) -> List[OpFunc]:
"""Returns a default list of pipeline operator functions.
Args:
use_gcp_sa: If true, mount a GCP service account secret to each pod, with
the name _KUBEFLOW_GCP_SECRET_NAME.
Returns:
A list of functions with type OpFunc.
"""
# Enables authentication for GCP services if needed.
gcp_secret_op = gcp.use_gcp_secret(_KUBEFLOW_GCP_SECRET_NAME)
# Mounts configmap containing Metadata gRPC server configuration.
mount_config_map_op = _mount_config_map_op("metadata-grpc-configmap")
if use_gcp_sa:
return [gcp_secret_op, mount_config_map_op]
else:
return [mount_config_map_op]
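If you override pipeline_operator_funcs, you can still start from this default list and extend it rather than dropping the metadata configmap mount; a short sketch:

from zenml.integrations.kubeflow.orchestrators.kubeflow_dag_runner import (
    get_default_pipeline_operator_funcs,
)

# With use_gcp_sa=True the list additionally mounts the GCP service account
# secret (_KUBEFLOW_GCP_SECRET_NAME); with the default False it only mounts
# the metadata-grpc configmap.
operator_funcs = get_default_pipeline_operator_funcs(use_gcp_sa=True)

# Append any custom OpFuncs before passing the list to KubeflowDagRunnerConfig,
# e.g.:
# operator_funcs.append(add_team_label)  # hypothetical helper from the sketch above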
get_default_pod_labels()
Returns the default pod label dict for Kubeflow.
Source code in zenml/integrations/kubeflow/orchestrators/kubeflow_dag_runner.py
def get_default_pod_labels() -> Dict[str, str]:
"""Returns the default pod label dict for Kubeflow."""
# KFP default transformers add pod env:
# https://github.com/kubeflow/pipelines/blob/0.1.32/sdk/python/kfp/compiler/_default_transformers.py
result = {"add-pod-env": "true", telemetry_utils.LABEL_KFP_SDK_ENV: "tfx"}
return result
kubeflow_orchestrator
KubeflowOrchestrator (BaseOrchestrator)
pydantic-model
Orchestrator responsible for running pipelines using Kubeflow.
Attributes:
Name | Type | Description |
---|---|---|
custom_docker_base_image_name | Optional[str] | Name of a docker image that should be used as the base for the image that will be run on KFP pods. If no custom image is given, a basic image of the active ZenML version will be used. Note: This image needs to have ZenML installed, otherwise the pipeline execution will fail. For that reason, you might want to extend the ZenML docker images found here: https://hub.docker.com/r/zenmldocker/zenml/ |
kubeflow_pipelines_ui_port | int | A local port to which the KFP UI will be forwarded. |
kubernetes_context | Optional[str] | Optional name of a kubernetes context to run pipelines in. If not set, the current active context will be used. You can find the active context by running `kubectl config current-context`. |
synchronous | bool | If `True`, running a pipeline using this orchestrator will block until all steps finished running on KFP. |
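A minimal sketch of how these attributes map onto a Python instantiation; typically the orchestrator is registered through the ZenML CLI instead, all values below are arbitrary examples, and it is assumed the component can be constructed directly with just a name:

from zenml.integrations.kubeflow.orchestrators.kubeflow_orchestrator import (
    KubeflowOrchestrator,
)

orchestrator = KubeflowOrchestrator(
    name="kubeflow_orchestrator",  # assumed required by the base stack component model
    custom_docker_base_image_name="zenmldocker/zenml:latest",  # base image must have ZenML installed
    kubeflow_pipelines_ui_port=8080,
    kubernetes_context="my-remote-context",  # omit to fall back to the locally managed k3d context
    synchronous=True,  # block until all steps have finished running on KFP
)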
Source code in zenml/integrations/kubeflow/orchestrators/kubeflow_orchestrator.py
class KubeflowOrchestrator(BaseOrchestrator):
"""Orchestrator responsible for running pipelines using Kubeflow.
Attributes:
custom_docker_base_image_name: Name of a docker image that should be
used as the base for the image that will be run on KFP pods. If no
custom image is given, a basic image of the active ZenML version
will be used. **Note**: This image needs to have ZenML installed,
otherwise the pipeline execution will fail. For that reason, you
might want to extend the ZenML docker images found here:
https://hub.docker.com/r/zenmldocker/zenml/
kubeflow_pipelines_ui_port: A local port to which the KFP UI will be
forwarded.
kubernetes_context: Optional name of a kubernetes context to run
pipelines in. If not set, the current active context will be used.
You can find the active context by running `kubectl config
current-context`.
synchronous: If `True`, running a pipeline using this orchestrator will
block until all steps finished running on KFP.
"""
custom_docker_base_image_name: Optional[str] = None
kubeflow_pipelines_ui_port: int = DEFAULT_KFP_UI_PORT
kubernetes_context: Optional[str] = None
synchronous = False
# Class Configuration
FLAVOR: ClassVar[str] = KUBEFLOW
@staticmethod
def _get_k3d_cluster_name(uuid: UUID) -> str:
"""Returns the k3d cluster name corresponding to the orchestrator
UUID."""
# k3d only allows cluster names with up to 32 characters; use the
# first 8 chars of the orchestrator UUID as identifier
return f"zenml-kubeflow-{str(uuid)[:8]}"
@staticmethod
def _get_k3d_kubernetes_context(uuid: UUID) -> str:
"""Returns the name of the kubernetes context associated with the k3d
cluster managed locally by ZenML corresponding to the orchestrator
UUID."""
return f"k3d-{KubeflowOrchestrator._get_k3d_cluster_name(uuid)}"
@root_validator
def set_default_kubernetes_context(
cls, values: Dict[str, Any]
) -> Dict[str, Any]:
"""Pydantic root_validator that sets the default `kubernetes_context`
value to the value that is used to create the locally managed k3d
cluster, if not explicitly set.
Args:
values: Values passed to the object constructor
Returns:
Values passed to the Pydantic constructor
"""
if values.get("kubernetes_context"):
return values
# not likely, due to Pydantic validation, but mypy complains
assert "uuid" in values
values["kubernetes_context"] = cls._get_k3d_kubernetes_context(
values["uuid"]
)
return values
@property
def validator(self) -> Optional[StackValidator]:
"""Validates that the stack contains a container registry and that
requirements are met for local components."""
def _validate_local_requirements(stack: Stack) -> Tuple[bool, str]:
container_registry = stack.container_registry
# should not happen, because the stack validation takes care of
# this, but just in case
assert container_registry is not None
if not self.is_local:
# if the orchestrator is not running in a local k3d cluster,
# we cannot have any other local components in our stack, because
# we cannot mount the local path into the container. This
# may result in problems when running the pipeline, because
# the local components will not be available inside the
# Kubeflow containers.
# go through all stack components and identify those that
# advertise a local path where they persist information that
# they need to be available when running pipelines.
for stack_comp in stack.components.values():
local_path = stack_comp.local_path
if not local_path:
continue
return False, (
f"The Kubeflow orchestrator is not running in a local "
f"k3d cluster. The '{stack_comp.name}' "
f"{stack_comp.TYPE.value} is a local stack component "
f"and will not be available in the Kubeflow pipeline "
f"step. Please ensure that you always use non-local "
f"stack components with a remote Kubeflow orchestrator, "
f"otherwise you may run into pipeline execution "
f"problems."
)
# if the orchestrator is remote, the container registry must
# also be remote.
if container_registry.is_local:
return False, (
f"The Kubeflow orchestrator is not running in a local "
f"k3d cluster but the {container_registry.name} "
f"container registry URI '{container_registry.uri}' "
f"points to a local container registry. Please ensure "
f"that you always use non-local stack components with "
f"a remote Kubeflow orchestrator, otherwise you will "
f"run into problems."
)
else:
# if the orchestrator is local, the container registry must
# also be local.
if not container_registry.is_local:
return False, (
f"The container registry URI '{container_registry.uri}' "
f"doesn't match the expected format 'localhost:$PORT'. "
f"The local Kubeflow orchestrator only works with a "
f"local container registry because it cannot "
f"authenticate to external container registries."
)
return True, ""
return StackValidator(
required_components={StackComponentType.CONTAINER_REGISTRY},
custom_validation_function=_validate_local_requirements,
)
def get_docker_image_name(self, pipeline_name: str) -> str:
"""Returns the full docker image name including registry and tag."""
base_image_name = f"zenml-kubeflow:{pipeline_name}"
container_registry = Repository().active_stack.container_registry
if container_registry:
registry_uri = container_registry.uri.rstrip("/")
return f"{registry_uri}/{base_image_name}"
else:
return base_image_name
@property
def is_local(self) -> bool:
"""Returns `True` if the KFP orchestrator is running locally (i.e. in
the local k3d cluster managed by ZenML).
"""
return self.kubernetes_context == self._get_k3d_kubernetes_context(
self.uuid
)
@property
def root_directory(self) -> str:
"""Returns path to the root directory for all files concerning
this orchestrator."""
return os.path.join(
zenml.io.utils.get_global_config_directory(),
"kubeflow",
str(self.uuid),
)
@property
def pipeline_directory(self) -> str:
"""Returns path to a directory in which the kubeflow pipeline files
are stored."""
return os.path.join(self.root_directory, "pipelines")
def prepare_pipeline_deployment(
self,
pipeline: "BasePipeline",
stack: "Stack",
runtime_configuration: "RuntimeConfiguration",
) -> None:
"""Builds a docker image for the current environment and uploads it to
a container registry if configured.
"""
from zenml.utils.docker_utils import (
build_docker_image,
push_docker_image,
)
# if the orchestrator is not running in a local k3d cluster,
# we cannot mount the local path into the container. This
# may result in problems when running the pipeline, because
# the local components will not be available inside the
# Kubeflow containers.
if self.kubernetes_context:
# go through all stack components and identify those that advertise
# a local path where they persist information that they need to be
# available when running pipelines.
for stack_comp in stack.components.values():
local_path = stack_comp.local_path
if not local_path:
continue
logger.warning(
"The Kubeflow orchestrator is not running in a local k3d "
"cluster. The '%s' %s is a local stack component and will "
"not be available in the Kubeflow pipeline step. Please "
"ensure that you never combine non-local stack components "
"with a remote orchestrator, otherwise you may run into "
"pipeline execution problems.",
stack_comp.name,
stack_comp.TYPE.value,
)
image_name = self.get_docker_image_name(pipeline.name)
requirements = {*stack.requirements(), *pipeline.requirements}
logger.debug("Kubeflow docker container requirements: %s", requirements)
build_docker_image(
build_context_path=get_source_root_path(),
image_name=image_name,
dockerignore_path=pipeline.dockerignore_file,
requirements=requirements,
base_image=self.custom_docker_base_image_name,
environment_vars=self._get_environment_vars_from_secrets(
pipeline.secrets
),
)
if stack.container_registry:
push_docker_image(image_name)
def run_pipeline(
self,
pipeline: "BasePipeline",
stack: "Stack",
runtime_configuration: "RuntimeConfiguration",
) -> Any:
"""Runs a pipeline on Kubeflow Pipelines."""
# First check whether it's running in a notebook
from zenml.environment import Environment
if Environment.in_notebook():
raise RuntimeError(
"The Kubeflow orchestrator cannot run pipelines in a notebook "
"environment. The reason is that it is non-trivial to create "
"a Docker image of a notebook. Please consider refactoring "
"your notebook cells into separate scripts in a Python module "
"and run the code outside of a notebook when using this "
"orchestrator."
)
from zenml.utils.docker_utils import get_image_digest
image_name = self.get_docker_image_name(pipeline.name)
image_name = get_image_digest(image_name) or image_name
fileio.makedirs(self.pipeline_directory)
pipeline_file_path = os.path.join(
self.pipeline_directory, f"{pipeline.name}.yaml"
)
runner_config = KubeflowDagRunnerConfig(image=image_name)
runner = KubeflowDagRunner(
config=runner_config, output_path=pipeline_file_path
)
runner.run(
pipeline=pipeline,
stack=stack,
runtime_configuration=runtime_configuration,
)
self._upload_and_run_pipeline(
pipeline_name=pipeline.name,
pipeline_file_path=pipeline_file_path,
runtime_configuration=runtime_configuration,
enable_cache=pipeline.enable_cache,
)
def _upload_and_run_pipeline(
self,
pipeline_name: str,
pipeline_file_path: str,
runtime_configuration: "RuntimeConfiguration",
enable_cache: bool,
) -> None:
"""Tries to upload and run a KFP pipeline.
Args:
pipeline_name: Name of the pipeline.
pipeline_file_path: Path to the pipeline definition file.
runtime_configuration: Runtime configuration of the pipeline run.
enable_cache: Whether caching is enabled for this pipeline run.
"""
try:
logger.info(
"Running in kubernetes context '%s'.",
self.kubernetes_context,
)
# upload the pipeline to Kubeflow and start it
client = kfp.Client(kube_context=self.kubernetes_context)
if runtime_configuration.schedule:
try:
experiment = client.get_experiment(pipeline_name)
logger.info(
"A recurring run has already been created with this "
"pipeline. Creating new recurring run now.."
)
except (ValueError, ApiException):
experiment = client.create_experiment(pipeline_name)
logger.info(
"Creating a new recurring run for pipeline '%s'.. ",
pipeline_name,
)
logger.info(
"You can see all recurring runs under the '%s' experiment.'",
pipeline_name,
)
schedule = runtime_configuration.schedule
result = client.create_recurring_run(
experiment_id=experiment.id,
job_name=runtime_configuration.run_name,
pipeline_package_path=pipeline_file_path,
enable_caching=enable_cache,
start_time=schedule.utc_start_time,
end_time=schedule.utc_end_time,
interval_second=schedule.interval_second,
no_catchup=not schedule.catchup,
)
logger.info("Started recurring run with ID '%s'.", result.id)
else:
logger.info(
"No schedule detected. Creating a one-off pipeline run.."
)
result = client.create_run_from_pipeline_package(
pipeline_file_path,
arguments={},
run_name=runtime_configuration.run_name,
enable_caching=enable_cache,
)
logger.info(
"Started one-off pipeline run with ID '%s'.", result.run_id
)
if self.synchronous:
# TODO [ENG-698]: Allow configuration of the timeout as a
# runtime option
client.wait_for_run_completion(
run_id=result.run_id, timeout=1200
)
except urllib3.exceptions.HTTPError as error:
logger.warning(
"Failed to upload Kubeflow pipeline: %s. "
"Please make sure your kube config is configured and the "
"current context is set correctly.",
error,
)
@property
def _pid_file_path(self) -> str:
"""Returns path to the daemon PID file."""
return os.path.