Airflow
zenml.integrations.airflow
special
Airflow integration for ZenML.
The Airflow integration sub-module powers an alternative to the local orchestrator. You can enable it by registering the Airflow orchestrator with the CLI tool and then bootstrapping it with the `zenml orchestrator up` command.
AirflowIntegration (Integration)
Definition of Airflow Integration for ZenML.
Source code in zenml/integrations/airflow/__init__.py
class AirflowIntegration(Integration):
"""Definition of Airflow Integration for ZenML."""
NAME = AIRFLOW
REQUIREMENTS = ["apache-airflow~=2.4.0"]
@classmethod
def flavors(cls) -> List[Type[Flavor]]:
"""Declare the stack component flavors for the Airflow integration.
Returns:
List of stack component flavors for this integration.
"""
from zenml.integrations.airflow.flavors import AirflowOrchestratorFlavor
return [AirflowOrchestratorFlavor]
flavors()
classmethod
Declare the stack component flavors for the Airflow integration.
Returns:
Type | Description |
---|---|
List[Type[zenml.stack.flavor.Flavor]] | List of stack component flavors for this integration. |
Source code in zenml/integrations/airflow/__init__.py
@classmethod
def flavors(cls) -> List[Type[Flavor]]:
"""Declare the stack component flavors for the Airflow integration.
Returns:
List of stack component flavors for this integration.
"""
from zenml.integrations.airflow.flavors import AirflowOrchestratorFlavor
return [AirflowOrchestratorFlavor]
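As a quick usage sketch based only on the API documented on this page, the flavor declared by the integration can be inspected directly:

```python
from zenml.integrations.airflow import AirflowIntegration

# The integration contributes a single orchestrator flavor.
for flavor_class in AirflowIntegration.flavors():
    flavor = flavor_class()
    print(flavor.name)  # the flavor name, i.e. the value of AIRFLOW_ORCHESTRATOR_FLAVOR
```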
flavors
special
Airflow integration flavors.
airflow_orchestrator_flavor
Airflow orchestrator flavor.
AirflowOrchestratorConfig (BaseOrchestratorConfig, AirflowOrchestratorSettings)
pydantic-model
Configuration for the Airflow orchestrator.
Attributes:
Name | Type | Description |
---|---|---|
local | bool | If the orchestrator is local or not. If this is True, will spin up a local Airflow server to run pipelines. |
Source code in zenml/integrations/airflow/flavors/airflow_orchestrator_flavor.py
class AirflowOrchestratorConfig( # type: ignore[misc] # https://github.com/pydantic/pydantic/issues/4173
BaseOrchestratorConfig, AirflowOrchestratorSettings
):
"""Configuration for the Airflow orchestrator.
Attributes:
local: If the orchestrator is local or not. If this is True, will spin
up a local Airflow server to run pipelines.
"""
local: bool = True
AirflowOrchestratorFlavor (BaseOrchestratorFlavor)
Flavor for the Airflow orchestrator.
Source code in zenml/integrations/airflow/flavors/airflow_orchestrator_flavor.py
class AirflowOrchestratorFlavor(BaseOrchestratorFlavor):
"""Flavor for the Airflow orchestrator."""
@property
def name(self) -> str:
"""Name of the flavor.
Returns:
The name of the flavor.
"""
return AIRFLOW_ORCHESTRATOR_FLAVOR
@property
def config_class(self) -> Type[AirflowOrchestratorConfig]:
"""Returns `AirflowOrchestratorConfig` config class.
Returns:
The config class.
"""
return AirflowOrchestratorConfig
@property
def implementation_class(self) -> Type["AirflowOrchestrator"]:
"""Implementation class.
Returns:
The implementation class.
"""
from zenml.integrations.airflow.orchestrators import AirflowOrchestrator
return AirflowOrchestrator
config_class: Type[zenml.integrations.airflow.flavors.airflow_orchestrator_flavor.AirflowOrchestratorConfig]
property
readonly
Returns `AirflowOrchestratorConfig` config class.
Returns:
Type | Description |
---|---|
Type[zenml.integrations.airflow.flavors.airflow_orchestrator_flavor.AirflowOrchestratorConfig] | The config class. |
implementation_class: Type[AirflowOrchestrator]
property
readonly
Implementation class.
Returns:
Type | Description |
---|---|
Type[AirflowOrchestrator] | The implementation class. |
name: str
property
readonly
Name of the flavor.
Returns:
Type | Description |
---|---|
str | The name of the flavor. |
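The three properties above wire the flavor to its config and implementation classes. A minimal sketch of how they relate (constructing the config directly is for illustration only; ZenML normally creates it when the stack component is registered):

```python
from zenml.integrations.airflow.flavors import AirflowOrchestratorFlavor

flavor = AirflowOrchestratorFlavor()

config_cls = flavor.config_class                 # AirflowOrchestratorConfig
orchestrator_cls = flavor.implementation_class   # AirflowOrchestrator, imported lazily

config = config_cls(local=True)  # local=True spins up a local Airflow server
print(flavor.name, config.local)
```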
AirflowOrchestratorSettings (BaseSettings)
pydantic-model
Settings for the Airflow orchestrator.
Attributes:
Name | Type | Description |
---|---|---|
dag_output_dir | Optional[str] | Output directory in which to write the Airflow DAG. |
dag_id | Optional[str] | Optional ID of the Airflow DAG to create. This value is only applied if the settings are defined on a ZenML pipeline and ignored if defined on a step. |
dag_tags | List[str] | Tags to add to the Airflow DAG. This value is only applied if the settings are defined on a ZenML pipeline and ignored if defined on a step. |
dag_args | Dict[str, Any] | Arguments for initializing the Airflow DAG. This value is only applied if the settings are defined on a ZenML pipeline and ignored if defined on a step. |
operator | str | The operator to use for one or all steps. This can either be a `zenml.integrations.airflow.flavors.airflow_orchestrator_flavor.OperatorType` or a string representing the source of the operator class to use (e.g. `airflow.providers.docker.operators.docker.DockerOperator`). |
operator_args | Dict[str, Any] | Arguments for initializing the Airflow operator. |
custom_dag_generator | Optional[str] | Source string of a module to use for generating Airflow DAGs. This module must contain the same classes and constants as the `zenml.integrations.airflow.orchestrators.dag_generator` module. This value is only applied if the settings are defined on a ZenML pipeline and ignored if defined on a step. |
Source code in zenml/integrations/airflow/flavors/airflow_orchestrator_flavor.py
class AirflowOrchestratorSettings(BaseSettings):
"""Settings for the Airflow orchestrator.
Attributes:
dag_output_dir: Output directory in which to write the Airflow DAG.
dag_id: Optional ID of the Airflow DAG to create. This value is only
applied if the settings are defined on a ZenML pipeline and
ignored if defined on a step.
dag_tags: Tags to add to the Airflow DAG. This value is only
applied if the settings are defined on a ZenML pipeline and
ignored if defined on a step.
dag_args: Arguments for initializing the Airflow DAG. This
value is only applied if the settings are defined on a ZenML
pipeline and ignored if defined on a step.
operator: The operator to use for one or all steps. This can either be
a `zenml.integrations.airflow.flavors.airflow_orchestrator_flavor.OperatorType`
or a string representing the source of the operator class to use
(e.g. `airflow.providers.docker.operators.docker.DockerOperator`)
operator_args: Arguments for initializing the Airflow
operator.
custom_dag_generator: Source string of a module to use for generating
Airflow DAGs. This module must contain the same classes and
constants as the
`zenml.integrations.airflow.orchestrators.dag_generator` module.
This value is only applied if the settings are defined on a ZenML
pipeline and ignored if defined on a step.
"""
dag_output_dir: Optional[str] = None
dag_id: Optional[str] = None
dag_tags: List[str] = []
dag_args: Dict[str, Any] = {}
operator: str = OperatorType.DOCKER.source
operator_args: Dict[str, Any] = {}
custom_dag_generator: Optional[str] = None
@validator("operator", always=True)
def _convert_operator(
cls, value: Optional[Union[str, OperatorType]]
) -> Optional[str]:
"""Converts operator types to source strings.
Args:
value: The operator type value.
Returns:
The operator source.
"""
if isinstance(value, OperatorType):
return value.source
try:
return OperatorType(value).source
except ValueError:
return value
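As a usage sketch (not taken from this module's source), these settings are typically attached to a ZenML pipeline under the `orchestrator.<flavor>` settings key. The `pipeline` decorator import path and the `"orchestrator.airflow"` key are assumptions that may vary with your ZenML version:

```python
from zenml.integrations.airflow.flavors.airflow_orchestrator_flavor import (
    AirflowOrchestratorSettings,
)
from zenml.pipelines import pipeline  # import path assumed; may differ by ZenML version

airflow_settings = AirflowOrchestratorSettings(
    operator="airflow.providers.docker.operators.docker.DockerOperator",
    dag_tags=["zenml"],
    operator_args={"docker_url": "unix://var/run/docker.sock"},  # illustrative only
)

# "orchestrator.airflow" assumes the flavor name is `airflow`. Settings defined on a
# pipeline apply to all steps unless overridden on an individual step.
@pipeline(settings={"orchestrator.airflow": airflow_settings})
def my_pipeline() -> None:
    ...
```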
OperatorType (Enum)
Airflow operator types.
Source code in zenml/integrations/airflow/flavors/airflow_orchestrator_flavor.py
class OperatorType(Enum):
"""Airflow operator types."""
DOCKER = "docker"
KUBERNETES_POD = "kubernetes_pod"
GKE_START_POD = "gke_start_pod"
@property
def source(self) -> str:
"""Operator source.
Returns:
The operator source.
"""
return {
OperatorType.DOCKER: "airflow.providers.docker.operators.docker.DockerOperator",
OperatorType.KUBERNETES_POD: "airflow.providers.cncf.kubernetes.operators.kubernetes_pod.KubernetesPodOperator",
OperatorType.GKE_START_POD: "airflow.providers.google.cloud.operators.kubernetes_engine.GKEStartPodOperator",
}[self]
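Each enum member maps to the fully qualified operator class shown in the `source` property above, and the settings validator accepts either the enum value or its string form:

```python
from zenml.integrations.airflow.flavors.airflow_orchestrator_flavor import OperatorType

print(OperatorType.DOCKER.source)
# airflow.providers.docker.operators.docker.DockerOperator

print(OperatorType("kubernetes_pod").source)
# airflow.providers.cncf.kubernetes.operators.kubernetes_pod.KubernetesPodOperator
```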
orchestrators
special
The Airflow integration enables the use of Airflow as a pipeline orchestrator.
airflow_orchestrator
Implementation of Airflow orchestrator integration.
AirflowOrchestrator (BaseOrchestrator)
Orchestrator responsible for running pipelines using Airflow.
Source code in zenml/integrations/airflow/orchestrators/airflow_orchestrator.py
class AirflowOrchestrator(BaseOrchestrator):
"""Orchestrator responsible for running pipelines using Airflow."""
def __init__(self, **values: Any):
"""Sets environment variables to configure airflow.
Args:
**values: Values to set in the orchestrator.
"""
super().__init__(**values)
self.airflow_home = os.path.join(
io_utils.get_global_config_directory(),
"airflow",
str(self.id),
)
self._set_env()
@property
def config(self) -> AirflowOrchestratorConfig:
"""Returns the orchestrator config.
Returns:
The configuration.
"""
return cast(AirflowOrchestratorConfig, self._config)
@property
def settings_class(self) -> Optional[Type["BaseSettings"]]:
"""Settings class for the Kubeflow orchestrator.
Returns:
The settings class.
"""
return AirflowOrchestratorSettings
@property
def dags_directory(self) -> str:
"""Returns path to the airflow dags directory.
Returns:
Path to the airflow dags directory.
"""
return os.path.join(self.airflow_home, "dags")
def _set_env(self) -> None:
"""Sets environment variables to configure airflow."""
os.environ["AIRFLOW_HOME"] = self.airflow_home
if self.config.local:
os.environ["AIRFLOW__CORE__DAGS_FOLDER"] = self.dags_directory
os.environ["AIRFLOW__CORE__LOAD_EXAMPLES"] = "false"
# check the DAG folder every 10 seconds for new files
os.environ["AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL"] = "10"
@property
def validator(self) -> Optional["StackValidator"]:
"""Validates the stack.
In the remote case, checks that the stack contains a container registry
and only remote components.
Returns:
A `StackValidator` instance.
"""
if self.config.local:
# No container registry required if just running locally.
return None
else:
def _validate_remote_components(stack: "Stack") -> Tuple[bool, str]:
for component in stack.components.values():
if not component.config.is_local:
continue
return False, (
f"The Airflow orchestrator is configured to run "
f"pipelines remotely, but the '{component.name}' "
f"{component.type.value} is a local stack component "
f"and will not be available in the Airflow "
f"task.\nPlease ensure that you always use non-local "
f"stack components with a remote Airflow orchestrator, "
f"otherwise you may run into pipeline execution "
f"problems."
)
return True, ""
return StackValidator(
required_components={StackComponentType.CONTAINER_REGISTRY},
custom_validation_function=_validate_remote_components,
)
def prepare_pipeline_deployment(
self,
deployment: "PipelineDeployment",
stack: "Stack",
) -> None:
"""Builds a Docker image to run pipeline steps.
Args:
deployment: The pipeline deployment configuration.
stack: The stack on which the pipeline will be deployed.
"""
if self.config.local:
stack.check_local_paths()
docker_image_builder = PipelineDockerImageBuilder()
if stack.container_registry:
repo_digest = docker_image_builder.build_and_push_docker_image(
deployment=deployment, stack=stack
)
deployment.add_extra(ORCHESTRATOR_DOCKER_IMAGE_KEY, repo_digest)
else:
# If there is no container registry, we only build the image
target_image_name = docker_image_builder.get_target_image_name(
deployment=deployment
)
docker_image_builder.build_docker_image(
target_image_name=target_image_name,
deployment=deployment,
stack=stack,
)
deployment.add_extra(
ORCHESTRATOR_DOCKER_IMAGE_KEY, target_image_name
)
def prepare_or_run_pipeline(
self,
deployment: "PipelineDeployment",
stack: "Stack",
) -> Any:
"""Creates and writes an Airflow DAG zip file.
Args:
deployment: The pipeline deployment to prepare or run.
stack: The stack the pipeline will run on.
"""
pipeline_settings = cast(
AirflowOrchestratorSettings, self.get_settings(deployment)
)
dag_generator_values = get_dag_generator_values(
custom_dag_generator_source=pipeline_settings.custom_dag_generator
)
command = StepEntrypointConfiguration.get_entrypoint_command()
tasks = []
for step_name, step in deployment.steps.items():
settings = cast(
AirflowOrchestratorSettings, self.get_settings(step)
)
arguments = StepEntrypointConfiguration.get_entrypoint_arguments(
step_name=step_name
)
task = dag_generator_values.task_configuration_class(
id=step_name,
zenml_step_name=step.config.name,
upstream_steps=step.spec.upstream_steps,
command=command,
arguments=arguments,
operator_source=settings.operator,
operator_args=settings.operator_args,
)
tasks.append(task)
local_stores_path = (
os.path.expanduser(GlobalConfiguration().local_stores_path)
if self.config.local
else None
)
docker_image = deployment.pipeline.extra[ORCHESTRATOR_DOCKER_IMAGE_KEY]
dag_id = pipeline_settings.dag_id or get_orchestrator_run_name(
pipeline_name=deployment.pipeline.name
)
dag_config = dag_generator_values.dag_configuration_class(
id=dag_id,
docker_image=docker_image,
local_stores_path=local_stores_path,
tasks=tasks,
tags=pipeline_settings.dag_tags,
dag_args=pipeline_settings.dag_args,
**self._translate_schedule(deployment.schedule),
)
self._write_dag(
dag_config,
dag_generator_values=dag_generator_values,
output_dir=pipeline_settings.dag_output_dir or self.dags_directory,
)
def _write_dag(
self,
dag_config: "DagConfiguration",
dag_generator_values: DagGeneratorValues,
output_dir: str,
) -> None:
"""Writes an Airflow DAG to disk.
Args:
dag_config: Configuration of the DAG to write.
dag_generator_values: Values of the DAG generator to use.
output_dir: The directory in which to write the DAG.
"""
io_utils.create_dir_recursive_if_not_exists(output_dir)
if self.config.local and output_dir != self.dags_directory:
logger.warning(
"You're using a local Airflow orchestrator but specified a "
"custom DAG output directory `%s`. This DAG will not be found "
"by the local Airflow server until you copy it in the DAGs "
"directory `%s`.",
output_dir,
self.dags_directory,
)
def _write_zip(path: str) -> None:
with zipfile.ZipFile(path, mode="w") as z:
z.write(dag_generator_values.file, arcname="dag.py")
z.writestr(
dag_generator_values.config_file_name, dag_config.json()
)
logger.info("Writing DAG definition to `%s`.", path)
dag_filename = f"{dag_config.id}.zip"
if io_utils.is_remote(output_dir):
io_utils.create_dir_recursive_if_not_exists(self.dags_directory)
local_zip_path = os.path.join(self.dags_directory, dag_filename)
remote_zip_path = os.path.join(output_dir, dag_filename)
_write_zip(local_zip_path)
try:
fileio.copy(local_zip_path, remote_zip_path)
logger.info("Copied DAG definition to `%s`.", remote_zip_path)
except Exception as e:
logger.exception(e)
logger.error(
"Failed to upload DAG to remote path `%s`. To run the "
"pipeline in Airflow, please manually copy the file `%s` "
"to your Airflow DAG directory.",
remote_zip_path,
local_zip_path,
)
else:
zip_path = os.path.join(output_dir, dag_filename)
_write_zip(zip_path)
def get_orchestrator_run_id(self) -> str:
"""Returns the active orchestrator run id.
Raises:
RuntimeError: If the environment variable specifying the run id
is not set.
Returns:
The orchestrator run id.
"""
from zenml.integrations.airflow.orchestrators.dag_generator import (
ENV_ZENML_AIRFLOW_RUN_ID,
)
try:
return os.environ[ENV_ZENML_AIRFLOW_RUN_ID]
except KeyError:
raise RuntimeError(
"Unable to read run id from environment variable "
f"{ENV_ZENML_AIRFLOW_RUN_ID}."
)
@staticmethod
def _translate_schedule(
schedule: Optional["Schedule"] = None,
) -> Dict[str, Any]:
"""Convert ZenML schedule into Airflow schedule.
The Airflow schedule uses slightly different naming and needs some
default entries for execution without a schedule.
Args:
schedule: Containing the interval, start and end date and
a boolean flag that defines if past runs should be caught up
on
Returns:
Airflow configuration dict.
"""
if schedule:
if schedule.cron_expression:
start_time = schedule.start_time or (
datetime.datetime.now() - datetime.timedelta(7)
)
return {
"schedule": schedule.cron_expression,
"start_date": start_time,
"end_date": schedule.end_time,
"catchup": schedule.catchup,
}
else:
return {
"schedule": schedule.interval_second,
"start_date": schedule.start_time,
"end_date": schedule.end_time,
"catchup": schedule.catchup,
}
return {
"schedule": "@once",
# set a start time in the past and disable catchup so airflow
# runs the dag immediately
"start_date": datetime.datetime.now() - datetime.timedelta(7),
"catchup": False,
}
#####################
# Local Airflow #
#####################
@property
def pid_file(self) -> str:
"""Returns path to the daemon PID file.
Returns:
Path to the daemon PID file.
"""
return os.path.join(self.airflow_home, "airflow_daemon.pid")
@property
def log_file(self) -> str:
"""Returns path to the airflow log file.
Returns:
str: Path to the airflow log file.
"""
return os.path.join(self.airflow_home, "airflow_orchestrator.log")
@property
def password_file(self) -> str:
"""Returns path to the webserver password file.
Returns:
Path to the webserver password file.
"""
return os.path.join(self.airflow_home, "standalone_admin_password.txt")
@property
def is_running(self) -> bool:
"""Returns whether the orchestrator is "running".
In the non-local case, this is always True. Otherwise checks if the
local Airflow server is running.
Returns:
If the orchestrator is running.
Raises:
RuntimeError: If port 8080 is occupied.
"""
if not self.config.local:
return True
from airflow.cli.commands.standalone_command import (
StandaloneCommand, # type: ignore
)
from airflow.jobs.triggerer_job import TriggererJob
daemon_running = daemon.check_if_daemon_is_running(self.pid_file)
command = StandaloneCommand()
webserver_port_open = command.port_open(8080)
if not daemon_running:
if webserver_port_open:
raise RuntimeError(
"The airflow daemon does not seem to be running but "
"local port 8080 is occupied. Make sure the port is "
"available and try again."
)
# exit early so we don't check non-existing airflow databases
return False
# we can't use StandaloneCommand().is_ready() here as the
# Airflow SequentialExecutor apparently does not send a heartbeat
# while running a task which would result in this returning `False`
# even if Airflow is running.
airflow_running = webserver_port_open and command.job_running(
TriggererJob
)
return airflow_running
@property
def is_provisioned(self) -> bool:
"""Returns whether the airflow daemon is currently running.
Returns:
True if the airflow daemon is running, False otherwise.
"""
return self.is_running
def provision(self) -> None:
"""Ensures that Airflow is running."""
if not self.config.local:
return
if self.is_running:
logger.info("Airflow is already running.")
self._log_webserver_credentials()
return
self._check_local_server_requirements()
if not fileio.exists(self.dags_directory):
io_utils.create_dir_recursive_if_not_exists(self.dags_directory)
from airflow.cli.commands.standalone_command import StandaloneCommand
try:
command = StandaloneCommand()
daemon.run_as_daemon(
command.run,
pid_file=self.pid_file,
log_file=self.log_file,
)
while not self.is_running:
# Wait until the daemon started all the relevant airflow
# processes
time.sleep(0.1)
self._log_webserver_credentials()
except Exception as e:
logger.error(e)
logger.error(
"An error occurred while starting the Airflow daemon. If you "
"want to start it manually, use the commands described in the "
"official Airflow quickstart guide for running Airflow locally."
)
self.deprovision()
def deprovision(self) -> None:
"""Stops the airflow daemon if necessary and tears down resources."""
if not self.config.local:
return
if self.is_running:
daemon.stop_daemon(self.pid_file)
fileio.rmtree(self.airflow_home)
logger.info("Airflow spun down.")
@staticmethod
def _check_local_server_requirements() -> None:
"""Checks that all packages for a local Airflow server are installed.
When running a local Airflow server, we require the
`apache-airflow-providers-docker` to run steps locally in Docker
containers in addition to the basic integration requirements.
Raises:
RuntimeError: If the `apache-airflow-providers-docker` is not
installed in the active Python environment.
"""
try:
from airflow.providers.docker.operators.docker import ( # noqa
DockerOperator,
)
except ImportError:
raise RuntimeError(
"Unable to import Airflow `DockerOperator` in the active "
"Python environment. Spinning up a local Airflow server to "
"run ZenML pipelines requires the `DockerOperator` to be "
"available. Please run "
"`pip install apache-airflow-providers-docker` to install it "
"and try again."
)
def _log_webserver_credentials(self) -> None:
"""Logs URL and credentials to log in to the airflow webserver.
Raises:
FileNotFoundError: If the password file does not exist.
"""
if fileio.exists(self.password_file):
with open(self.password_file) as file:
password = file.read().strip()
else:
raise FileNotFoundError(
f"Can't find password file '{self.password_file}'"
)
logger.info(
"To inspect your DAGs, login to `http://localhost:8080` "
"with username: `admin` password: `%s`",
password,
)
config: AirflowOrchestratorConfig
property
readonly
Returns the orchestrator config.
Returns:
Type | Description |
---|---|
AirflowOrchestratorConfig | The configuration. |
dags_directory: str
property
readonly
Returns path to the airflow dags directory.
Returns:
Type | Description |
---|---|
str | Path to the airflow dags directory. |
is_provisioned: bool
property
readonly
Returns whether the airflow daemon is currently running.
Returns:
Type | Description |
---|---|
bool | True if the airflow daemon is running, False otherwise. |
is_running: bool
property
readonly
Returns whether the orchestrator is "running".
In the non-local case, this is always True. Otherwise checks if the local Airflow server is running.
Returns:
Type | Description |
---|---|
bool | If the orchestrator is running. |
Exceptions:
Type | Description |
---|---|
RuntimeError | If port 8080 is occupied. |
log_file: str
property
readonly
Returns path to the airflow log file.
Returns:
Type | Description |
---|---|
str | Path to the airflow log file. |
password_file: str
property
readonly
Returns path to the webserver password file.
Returns:
Type | Description |
---|---|
str | Path to the webserver password file. |
pid_file: str
property
readonly
Returns path to the daemon PID file.
Returns:
Type | Description |
---|---|
str | Path to the daemon PID file. |
settings_class: Optional[Type[BaseSettings]]
property
readonly
Settings class for the Airflow orchestrator.
Returns:
Type | Description |
---|---|
Optional[Type[BaseSettings]] | The settings class. |
validator: Optional[StackValidator]
property
readonly
Validates the stack.
In the remote case, checks that the stack contains a container registry and only remote components.
Returns:
Type | Description |
---|---|
Optional[StackValidator] | A `StackValidator` instance. |
__init__(self, **values)
special
Sets environment variables to configure airflow.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
**values | Any | Values to set in the orchestrator. | {} |
Source code in zenml/integrations/airflow/orchestrators/airflow_orchestrator.py
def __init__(self, **values: Any):
"""Sets environment variables to configure airflow.
Args:
**values: Values to set in the orchestrator.
"""
super().__init__(**values)
self.airflow_home = os.path.join(
io_utils.get_global_config_directory(),
"airflow",
str(self.id),
)
self._set_env()
deprovision(self)
Stops the airflow daemon if necessary and tears down resources.
Source code in zenml/integrations/airflow/orchestrators/airflow_orchestrator.py
def deprovision(self) -> None:
"""Stops the airflow daemon if necessary and tears down resources."""
if not self.config.local:
return
if self.is_running:
daemon.stop_daemon(self.pid_file)
fileio.rmtree(self.airflow_home)
logger.info("Airflow spun down.")
get_orchestrator_run_id(self)
Returns the active orchestrator run id.
Exceptions:
Type | Description |
---|---|
RuntimeError | If the environment variable specifying the run id is not set. |
Returns:
Type | Description |
---|---|
str | The orchestrator run id. |
Source code in zenml/integrations/airflow/orchestrators/airflow_orchestrator.py
def get_orchestrator_run_id(self) -> str:
"""Returns the active orchestrator run id.
Raises:
RuntimeError: If the environment variable specifying the run id
is not set.
Returns:
The orchestrator run id.
"""
from zenml.integrations.airflow.orchestrators.dag_generator import (
ENV_ZENML_AIRFLOW_RUN_ID,
)
try:
return os.environ[ENV_ZENML_AIRFLOW_RUN_ID]
except KeyError:
raise RuntimeError(
"Unable to read run id from environment variable "
f"{ENV_ZENML_AIRFLOW_RUN_ID}."
)
prepare_or_run_pipeline(self, deployment, stack)
Creates and writes an Airflow DAG zip file.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
deployment | PipelineDeployment | The pipeline deployment to prepare or run. | required |
stack | Stack | The stack the pipeline will run on. | required |
Source code in zenml/integrations/airflow/orchestrators/airflow_orchestrator.py
def prepare_or_run_pipeline(
self,
deployment: "PipelineDeployment",
stack: "Stack",
) -> Any:
"""Creates and writes an Airflow DAG zip file.
Args:
deployment: The pipeline deployment to prepare or run.
stack: The stack the pipeline will run on.
"""
pipeline_settings = cast(
AirflowOrchestratorSettings, self.get_settings(deployment)
)
dag_generator_values = get_dag_generator_values(
custom_dag_generator_source=pipeline_settings.custom_dag_generator
)
command = StepEntrypointConfiguration.get_entrypoint_command()
tasks = []
for step_name, step in deployment.steps.items():
settings = cast(
AirflowOrchestratorSettings, self.get_settings(step)
)
arguments = StepEntrypointConfiguration.get_entrypoint_arguments(
step_name=step_name
)
task = dag_generator_values.task_configuration_class(
id=step_name,
zenml_step_name=step.config.name,
upstream_steps=step.spec.upstream_steps,
command=command,
arguments=arguments,
operator_source=settings.operator,
operator_args=settings.operator_args,
)
tasks.append(task)
local_stores_path = (
os.path.expanduser(GlobalConfiguration().local_stores_path)
if self.config.local
else None
)
docker_image = deployment.pipeline.extra[ORCHESTRATOR_DOCKER_IMAGE_KEY]
dag_id = pipeline_settings.dag_id or get_orchestrator_run_name(
pipeline_name=deployment.pipeline.name
)
dag_config = dag_generator_values.dag_configuration_class(
id=dag_id,
docker_image=docker_image,
local_stores_path=local_stores_path,
tasks=tasks,
tags=pipeline_settings.dag_tags,
dag_args=pipeline_settings.dag_args,
**self._translate_schedule(deployment.schedule),
)
self._write_dag(
dag_config,
dag_generator_values=dag_generator_values,
output_dir=pipeline_settings.dag_output_dir or self.dags_directory,
)
prepare_pipeline_deployment(self, deployment, stack)
Builds a Docker image to run pipeline steps.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
deployment | PipelineDeployment | The pipeline deployment configuration. | required |
stack | Stack | The stack on which the pipeline will be deployed. | required |
Source code in zenml/integrations/airflow/orchestrators/airflow_orchestrator.py
def prepare_pipeline_deployment(
self,
deployment: "PipelineDeployment",
stack: "Stack",
) -> None:
"""Builds a Docker image to run pipeline steps.
Args:
deployment: The pipeline deployment configuration.
stack: The stack on which the pipeline will be deployed.
"""
if self.config.local:
stack.check_local_paths()
docker_image_builder = PipelineDockerImageBuilder()
if stack.container_registry:
repo_digest = docker_image_builder.build_and_push_docker_image(
deployment=deployment, stack=stack
)
deployment.add_extra(ORCHESTRATOR_DOCKER_IMAGE_KEY, repo_digest)
else:
# If there is no container registry, we only build the image
target_image_name = docker_image_builder.get_target_image_name(
deployment=deployment
)
docker_image_builder.build_docker_image(
target_image_name=target_image_name,
deployment=deployment,
stack=stack,
)
deployment.add_extra(
ORCHESTRATOR_DOCKER_IMAGE_KEY, target_image_name
)
provision(self)
Ensures that Airflow is running.
Source code in zenml/integrations/airflow/orchestrators/airflow_orchestrator.py
def provision(self) -> None:
"""Ensures that Airflow is running."""
if not self.config.local:
return
if self.is_running:
logger.info("Airflow is already running.")
self._log_webserver_credentials()
return
self._check_local_server_requirements()
if not fileio.exists(self.dags_directory):
io_utils.create_dir_recursive_if_not_exists(self.dags_directory)
from airflow.cli.commands.standalone_command import StandaloneCommand
try:
command = StandaloneCommand()
daemon.run_as_daemon(
command.run,
pid_file=self.pid_file,
log_file=self.log_file,
)
while not self.is_running:
# Wait until the daemon started all the relevant airflow
# processes
time.sleep(0.1)
self._log_webserver_credentials()
except Exception as e:
logger.error(e)
logger.error(
"An error occurred while starting the Airflow daemon. If you "
"want to start it manually, use the commands described in the "
"official Airflow quickstart guide for running Airflow locally."
)
self.deprovision()
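The private `_translate_schedule` helper shown in the class source above converts a ZenML `Schedule` into Airflow DAG keyword arguments. A minimal sketch of the mapping for a cron-based schedule (the `Schedule` import path is an assumption and may differ by ZenML version):

```python
import datetime

from zenml.config.schedule import Schedule  # import path assumed

schedule = Schedule(
    cron_expression="0 * * * *",
    start_time=datetime.datetime(2023, 1, 1),
    catchup=False,
)

# Based on the source above, this schedule translates to roughly:
# {
#     "schedule": "0 * * * *",
#     "start_date": datetime.datetime(2023, 1, 1, 0, 0),
#     "end_date": None,
#     "catchup": False,
# }
# Without a schedule, the DAG gets "@once", a start date in the past and
# catchup disabled so Airflow runs it immediately.
```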
DagGeneratorValues (tuple)
Values from the DAG generator module.
Source code in zenml/integrations/airflow/orchestrators/airflow_orchestrator.py
class DagGeneratorValues(NamedTuple):
"""Values from the DAG generator module."""
file: str
config_file_name: str
run_id_env_variable_name: str
dag_configuration_class: Type["DagConfiguration"]
task_configuration_class: Type["TaskConfiguration"]
__getnewargs__(self)
special
Return self as a plain tuple. Used by copy and pickle.
Source code in zenml/integrations/airflow/orchestrators/airflow_orchestrator.py
def __getnewargs__(self):
'Return self as a plain tuple. Used by copy and pickle.'
return _tuple(self)
__new__(_cls, file, config_file_name, run_id_env_variable_name, dag_configuration_class, task_configuration_class)
special
staticmethod
Create new instance of DagGeneratorValues(file, config_file_name, run_id_env_variable_name, dag_configuration_class, task_configuration_class)
__repr__(self)
special
Return a nicely formatted representation string
Source code in zenml/integrations/airflow/orchestrators/airflow_orchestrator.py
def __repr__(self):
'Return a nicely formatted representation string'
return self.__class__.__name__ + repr_fmt % self
get_dag_generator_values(custom_dag_generator_source=None)
Gets values from the DAG generator module.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
custom_dag_generator_source | Optional[str] | Source of a custom DAG generator module. | None |
Returns:
Type | Description |
---|---|
DagGeneratorValues | DAG generator module values. |
Source code in zenml/integrations/airflow/orchestrators/airflow_orchestrator.py
def get_dag_generator_values(
custom_dag_generator_source: Optional[str] = None,
) -> DagGeneratorValues:
"""Gets values from the DAG generator module.
Args:
custom_dag_generator_source: Source of a custom DAG generator module.
Returns:
DAG generator module values.
"""
if custom_dag_generator_source:
module = importlib.import_module(custom_dag_generator_source)
else:
from zenml.integrations.airflow.orchestrators import dag_generator
module = dag_generator
assert module.__file__
return DagGeneratorValues(
file=module.__file__,
config_file_name=module.CONFIG_FILENAME,
run_id_env_variable_name=module.ENV_ZENML_AIRFLOW_RUN_ID,
dag_configuration_class=module.DagConfiguration,
task_configuration_class=module.TaskConfiguration,
)
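The `custom_dag_generator` setting points this function at a user-supplied module. A minimal sketch of such a module (the module path is hypothetical); it must expose the same names that `get_dag_generator_values` reads from the built-in `dag_generator` module:

```python
# my_project/custom_dag_generator.py  -- hypothetical module, referenced via
# AirflowOrchestratorSettings(custom_dag_generator="my_project.custom_dag_generator")

# Re-exporting the built-in classes and constants satisfies the attributes that
# get_dag_generator_values accesses; the DAG-building logic that Airflow executes
# when importing the generated file would follow below.
from zenml.integrations.airflow.orchestrators.dag_generator import (
    CONFIG_FILENAME,
    ENV_ZENML_AIRFLOW_RUN_ID,
    DagConfiguration,
    TaskConfiguration,
)
```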
dag_generator
Module to generate an Airflow DAG from a config file.
DagConfiguration (BaseModel)
pydantic-model
Airflow DAG configuration.
Source code in zenml/integrations/airflow/orchestrators/dag_generator.py
class DagConfiguration(BaseModel):
"""Airflow DAG configuration."""
id: str
docker_image: str
tasks: List[TaskConfiguration]
local_stores_path: Optional[str] = None
schedule: Union[datetime.timedelta, str]
start_date: datetime.datetime
end_date: Optional[datetime.datetime] = None
catchup: bool = False
tags: List[str] = []
dag_args: Dict[str, Any] = {}
TaskConfiguration (BaseModel)
pydantic-model
Airflow task configuration.
Source code in zenml/integrations/airflow/orchestrators/dag_generator.py
class TaskConfiguration(BaseModel):
"""Airflow task configuration."""
id: str
zenml_step_name: str
upstream_steps: List[str]
command: List[str]
arguments: List[str]
operator_source: str
operator_args: Dict[str, Any] = {}
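These Pydantic models are what the orchestrator serializes into the DAG zip file. A minimal construction sketch, with the command, arguments and image values being illustrative only:

```python
import datetime

from zenml.integrations.airflow.orchestrators.dag_generator import (
    DagConfiguration,
    TaskConfiguration,
)

task = TaskConfiguration(
    id="trainer",
    zenml_step_name="trainer",
    upstream_steps=["importer"],
    command=["python", "-m", "some.entrypoint"],  # illustrative only
    arguments=["--step_name", "trainer"],         # illustrative only
    operator_source="airflow.providers.docker.operators.docker.DockerOperator",
)

dag = DagConfiguration(
    id="my_pipeline_run",
    docker_image="my-registry/zenml-pipeline:latest",  # illustrative only
    tasks=[task],
    schedule="@once",
    start_date=datetime.datetime.now() - datetime.timedelta(days=7),
)
print(dag.json())
```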
get_docker_operator_init_kwargs(dag_config, task_config)
Gets keyword arguments to pass to the DockerOperator.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
dag_config | DagConfiguration | The configuration of the DAG. | required |
task_config | TaskConfiguration | The configuration of the task. | required |
Returns:
Type | Description |
---|---|
Dict[str, Any] | The init keyword arguments. |
Source code in zenml/integrations/airflow/orchestrators/dag_generator.py
def get_docker_operator_init_kwargs(
dag_config: DagConfiguration, task_config: TaskConfiguration
) -> Dict[str, Any]:
"""Gets keyword arguments to pass to the DockerOperator.
Args:
dag_config: The configuration of the DAG.
task_config: The configuration of the task.
Returns:
The init keyword arguments.
"""
mounts = []
extra_hosts = {}
environment = {ENV_ZENML_AIRFLOW_RUN_ID: "{{run_id}}"}
if dag_config.local_stores_path:
from docker.types import Mount
environment[ENV_ZENML_LOCAL_STORES_PATH] = dag_config.local_stores_path
mounts = [
Mount(
target=dag_config.local_stores_path,
source=dag_config.local_stores_path,
type="bind",
)
]
extra_hosts = {"host.docker.internal": "host-gateway"}
return {
"image": dag_config.docker_image,
"command": task_config.command + task_config.arguments,
"mounts": mounts,
"environment": environment,
"extra_hosts": extra_hosts,
}
get_kubernetes_pod_operator_init_kwargs(dag_config, task_config)
Gets keyword arguments to pass to the KubernetesPodOperator.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
dag_config | DagConfiguration | The configuration of the DAG. | required |
task_config | TaskConfiguration | The configuration of the task. | required |
Returns:
Type | Description |
---|---|
Dict[str, Any] | The init keyword arguments. |
Source code in zenml/integrations/airflow/orchestrators/dag_generator.py
def get_kubernetes_pod_operator_init_kwargs(
dag_config: DagConfiguration, task_config: TaskConfiguration
) -> Dict[str, Any]:
"""Gets keyword arguments to pass to the KubernetesPodOperator.
Args:
dag_config: The configuration of the DAG.
task_config: The configuration of the task.
Returns:
The init keyword arguments.
"""
from kubernetes.client.models import V1EnvVar
return {
"name": f"{dag_config.id}_{task_config.id}",
"namespace": "default",
"image": dag_config.docker_image,
"cmds": task_config.command,
"arguments": task_config.arguments,
"env_vars": [
V1EnvVar(name=ENV_ZENML_AIRFLOW_RUN_ID, value="{{run_id}}")
],
}
get_operator_init_kwargs(operator_class, dag_config, task_config)
Gets keyword arguments to pass to the operator init method.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
operator_class | Type[Any] | The operator class for which to get the kwargs. | required |
dag_config | DagConfiguration | The configuration of the DAG. | required |
task_config | TaskConfiguration | The configuration of the task. | required |
Returns:
Type | Description |
---|---|
Dict[str, Any] | The init keyword arguments. |
Source code in zenml/integrations/airflow/orchestrators/dag_generator.py
def get_operator_init_kwargs(
operator_class: Type[Any],
dag_config: DagConfiguration,
task_config: TaskConfiguration,
) -> Dict[str, Any]:
"""Gets keyword arguments to pass to the operator init method.
Args:
operator_class: The operator class for which to get the kwargs.
dag_config: The configuration of the DAG.
task_config: The configuration of the task.
Returns:
The init keyword arguments.
"""
init_kwargs = {"task_id": task_config.id}
try:
from airflow.providers.docker.operators.docker import DockerOperator
if issubclass(operator_class, DockerOperator):
init_kwargs.update(
get_docker_operator_init_kwargs(
dag_config=dag_config, task_config=task_config
)
)
except ImportError:
pass
try:
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import (
KubernetesPodOperator,
)
if issubclass(operator_class, KubernetesPodOperator):
init_kwargs.update(
get_kubernetes_pod_operator_init_kwargs(
dag_config=dag_config, task_config=task_config
)
)
except ImportError:
pass
init_kwargs.update(task_config.operator_args)
return init_kwargs
import_class_by_path(class_path)
Imports a class based on a given path.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
class_path | str | The class source path, e.g. `this.module.Class`. | required |
Returns:
Type | Description |
---|---|
Type[Any] | The given class. |
Source code in zenml/integrations/airflow/orchestrators/dag_generator.py
def import_class_by_path(class_path: str) -> Type[Any]:
"""Imports a class based on a given path.
Args:
class_path: str, class_source e.g. this.module.Class
Returns:
the given class
"""
module_name, class_name = class_path.rsplit(".", 1)
module = importlib.import_module(module_name)
return getattr(module, class_name) # type: ignore[no-any-return]
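For example, using the Docker operator source listed for `OperatorType.DOCKER` above (this requires the `apache-airflow-providers-docker` package to be installed):

```python
from zenml.integrations.airflow.orchestrators.dag_generator import (
    import_class_by_path,
)

operator_class = import_class_by_path(
    "airflow.providers.docker.operators.docker.DockerOperator"
)
print(operator_class.__name__)  # DockerOperator
```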