Skip to content

Airflow

zenml.integrations.airflow special

Airflow integration for ZenML.

The Airflow integration sub-module powers an alternative to the local orchestrator. You can enable it by registering the Airflow orchestrator with the CLI tool, then bootstrap using the zenml orchestrator up command.

AirflowIntegration (Integration)

Definition of Airflow Integration for ZenML.

Source code in zenml/integrations/airflow/__init__.py
class AirflowIntegration(Integration):
    """Definition of Airflow Integration for ZenML."""

    NAME = AIRFLOW
    REQUIREMENTS = ["apache-airflow~=2.4.0"]

    @classmethod
    def flavors(cls) -> List[Type[Flavor]]:
        """Declare the stack component flavors for the Airflow integration.

        Returns:
            List of stack component flavors for this integration.
        """
        from zenml.integrations.airflow.flavors import AirflowOrchestratorFlavor

        return [AirflowOrchestratorFlavor]

flavors() classmethod

Declare the stack component flavors for the Airflow integration.

Returns:

Type Description
List[Type[zenml.stack.flavor.Flavor]]

List of stack component flavors for this integration.

Source code in zenml/integrations/airflow/__init__.py
@classmethod
def flavors(cls) -> List[Type[Flavor]]:
    """Declare the stack component flavors for the Airflow integration.

    Returns:
        List of stack component flavors for this integration.
    """
    from zenml.integrations.airflow.flavors import AirflowOrchestratorFlavor

    return [AirflowOrchestratorFlavor]

flavors special

Airflow integration flavors.

airflow_orchestrator_flavor

Airflow orchestrator flavor.

AirflowOrchestratorConfig (BaseOrchestratorConfig, AirflowOrchestratorSettings) pydantic-model

Configuration for the Airflow orchestrator.

Attributes:

Name Type Description
local bool

If the orchestrator is local or not. If this is True, will spin up a local Airflow server to run pipelines.

Source code in zenml/integrations/airflow/flavors/airflow_orchestrator_flavor.py
class AirflowOrchestratorConfig(  # type: ignore[misc] # https://github.com/pydantic/pydantic/issues/4173
    BaseOrchestratorConfig, AirflowOrchestratorSettings
):
    """Configuration for the Airflow orchestrator.

    Attributes:
        local: If the orchestrator is local or not. If this is True, will spin
            up a local Airflow server to run pipelines.
    """

    local: bool = True
AirflowOrchestratorFlavor (BaseOrchestratorFlavor)

Flavor for the Airflow orchestrator.

Source code in zenml/integrations/airflow/flavors/airflow_orchestrator_flavor.py
class AirflowOrchestratorFlavor(BaseOrchestratorFlavor):
    """Flavor for the Airflow orchestrator."""

    @property
    def name(self) -> str:
        """Name of the flavor.

        Returns:
            The name of the flavor.
        """
        return AIRFLOW_ORCHESTRATOR_FLAVOR

    @property
    def config_class(self) -> Type[AirflowOrchestratorConfig]:
        """Returns `AirflowOrchestratorConfig` config class.

        Returns:
            The config class.
        """
        return AirflowOrchestratorConfig

    @property
    def implementation_class(self) -> Type["AirflowOrchestrator"]:
        """Implementation class.

        Returns:
            The implementation class.
        """
        from zenml.integrations.airflow.orchestrators import AirflowOrchestrator

        return AirflowOrchestrator
config_class: Type[zenml.integrations.airflow.flavors.airflow_orchestrator_flavor.AirflowOrchestratorConfig] property readonly

Returns AirflowOrchestratorConfig config class.

Returns:

Type Description
Type[zenml.integrations.airflow.flavors.airflow_orchestrator_flavor.AirflowOrchestratorConfig]

The config class.

implementation_class: Type[AirflowOrchestrator] property readonly

Implementation class.

Returns:

Type Description
Type[AirflowOrchestrator]

The implementation class.

name: str property readonly

Name of the flavor.

Returns:

Type Description
str

The name of the flavor.

AirflowOrchestratorSettings (BaseSettings) pydantic-model

Settings for the Airflow orchestrator.

Attributes:

Name Type Description
dag_output_dir Optional[str]

Output directory in which to write the Airflow DAG.

dag_id Optional[str]

Optional ID of the Airflow DAG to create. This value is only applied if the settings are defined on a ZenML pipeline and ignored if defined on a step.

dag_tags List[str]

Tags to add to the Airflow DAG. This value is only applied if the settings are defined on a ZenML pipeline and ignored if defined on a step.

dag_args Dict[str, Any]

Arguments for initializing the Airflow DAG. This value is only applied if the settings are defined on a ZenML pipeline and ignored if defined on a step.

operator str

The operator to use for one or all steps. This can either be a zenml.integrations.airflow.flavors.airflow_orchestrator_flavor.OperatorType or a string representing the source of the operator class to use (e.g. airflow.providers.docker.operators.docker.DockerOperator)

operator_args Dict[str, Any]

Arguments for initializing the Airflow operator.

custom_dag_generator Optional[str]

Source string of a module to use for generating Airflow DAGs. This module must contain the same classes and constants as the zenml.integrations.airflow.orchestrators.dag_generator module. This value is only applied if the settings are defined on a ZenML pipeline and ignored if defined on a step.

Source code in zenml/integrations/airflow/flavors/airflow_orchestrator_flavor.py
class AirflowOrchestratorSettings(BaseSettings):
    """Settings for the Airflow orchestrator.

    Attributes:
        dag_output_dir: Output directory in which to write the Airflow DAG.
        dag_id: Optional ID of the Airflow DAG to create. This value is only
            applied if the settings are defined on a ZenML pipeline and
            ignored if defined on a step.
        dag_tags: Tags to add to the Airflow DAG. This value is only
            applied if the settings are defined on a ZenML pipeline and
            ignored if defined on a step.
        dag_args: Arguments for initializing the Airflow DAG. This
            value is only applied if the settings are defined on a ZenML
            pipeline and ignored if defined on a step.
        operator: The operator to use for one or all steps. This can either be
            a `zenml.integrations.airflow.flavors.airflow_orchestrator_flavor.OperatorType`
            or a string representing the source of the operator class to use
            (e.g. `airflow.providers.docker.operators.docker.DockerOperator`)
        operator_args: Arguments for initializing the Airflow
            operator.
        custom_dag_generator: Source string of a module to use for generating
            Airflow DAGs. This module must contain the same classes and
            constants as the
            `zenml.integrations.airflow.orchestrators.dag_generator` module.
            This value is only applied if the settings are defined on a ZenML
            pipeline and ignored if defined on a step.
    """

    dag_output_dir: Optional[str] = None

    dag_id: Optional[str] = None
    dag_tags: List[str] = []
    dag_args: Dict[str, Any] = {}

    operator: str = OperatorType.DOCKER.source
    operator_args: Dict[str, Any] = {}

    custom_dag_generator: Optional[str] = None

    @validator("operator", always=True)
    def _convert_operator(
        cls, value: Optional[Union[str, OperatorType]]
    ) -> Optional[str]:
        """Converts operator types to source strings.

        Args:
            value: The operator type value.

        Returns:
            The operator source.
        """
        if isinstance(value, OperatorType):
            return value.source

        try:
            return OperatorType(value).source
        except ValueError:
            return value
OperatorType (Enum)

Airflow operator types.

Source code in zenml/integrations/airflow/flavors/airflow_orchestrator_flavor.py
class OperatorType(Enum):
    """Airflow operator types."""

    DOCKER = "docker"
    KUBERNETES_POD = "kubernetes_pod"
    GKE_START_POD = "gke_start_pod"

    @property
    def source(self) -> str:
        """Operator source.

        Returns:
            The operator source.
        """
        return {
            OperatorType.DOCKER: "airflow.providers.docker.operators.docker.DockerOperator",
            OperatorType.KUBERNETES_POD: "airflow.providers.cncf.kubernetes.operators.kubernetes_pod.KubernetesPodOperator",
            OperatorType.GKE_START_POD: "airflow.providers.google.cloud.operators.kubernetes_engine.GKEStartPodOperator",
        }[self]

orchestrators special

The Airflow integration enables the use of Airflow as a pipeline orchestrator.

airflow_orchestrator

Implementation of Airflow orchestrator integration.

AirflowOrchestrator (BaseOrchestrator)

Orchestrator responsible for running pipelines using Airflow.

Source code in zenml/integrations/airflow/orchestrators/airflow_orchestrator.py
class AirflowOrchestrator(BaseOrchestrator):
    """Orchestrator responsible for running pipelines using Airflow."""

    def __init__(self, **values: Any):
        """Sets environment variables to configure airflow.

        Args:
            **values: Values to set in the orchestrator.
        """
        super().__init__(**values)
        self.airflow_home = os.path.join(
            io_utils.get_global_config_directory(),
            "airflow",
            str(self.id),
        )
        self._set_env()

    @property
    def config(self) -> AirflowOrchestratorConfig:
        """Returns the orchestrator config.

        Returns:
            The configuration.
        """
        return cast(AirflowOrchestratorConfig, self._config)

    @property
    def settings_class(self) -> Optional[Type["BaseSettings"]]:
        """Settings class for the Kubeflow orchestrator.

        Returns:
            The settings class.
        """
        return AirflowOrchestratorSettings

    @property
    def dags_directory(self) -> str:
        """Returns path to the airflow dags directory.

        Returns:
            Path to the airflow dags directory.
        """
        return os.path.join(self.airflow_home, "dags")

    def _set_env(self) -> None:
        """Sets environment variables to configure airflow."""
        os.environ["AIRFLOW_HOME"] = self.airflow_home

        if self.config.local:
            os.environ["AIRFLOW__CORE__DAGS_FOLDER"] = self.dags_directory
            os.environ["AIRFLOW__CORE__LOAD_EXAMPLES"] = "false"
            # check the DAG folder every 10 seconds for new files
            os.environ["AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL"] = "10"

    @property
    def validator(self) -> Optional["StackValidator"]:
        """Validates the stack.

        In the remote case, checks that the stack contains a container registry
        and only remote components.

        Returns:
            A `StackValidator` instance.
        """
        if self.config.local:
            # No container registry required if just running locally.
            return None
        else:

            def _validate_remote_components(stack: "Stack") -> Tuple[bool, str]:
                for component in stack.components.values():
                    if not component.config.is_local:
                        continue

                    return False, (
                        f"The Airflow orchestrator is configured to run "
                        f"pipelines remotely, but the '{component.name}' "
                        f"{component.type.value} is a local stack component "
                        f"and will not be available in the Airflow "
                        f"task.\nPlease ensure that you always use non-local "
                        f"stack components with a remote Airflow orchestrator, "
                        f"otherwise you may run into pipeline execution "
                        f"problems."
                    )

                return True, ""

            return StackValidator(
                required_components={StackComponentType.CONTAINER_REGISTRY},
                custom_validation_function=_validate_remote_components,
            )

    def prepare_pipeline_deployment(
        self,
        deployment: "PipelineDeployment",
        stack: "Stack",
    ) -> None:
        """Builds a Docker image to run pipeline steps.

        Args:
            deployment: The pipeline deployment configuration.
            stack: The stack on which the pipeline will be deployed.
        """
        if self.config.local:
            stack.check_local_paths()

        docker_image_builder = PipelineDockerImageBuilder()
        if stack.container_registry:
            repo_digest = docker_image_builder.build_and_push_docker_image(
                deployment=deployment, stack=stack
            )
            deployment.add_extra(ORCHESTRATOR_DOCKER_IMAGE_KEY, repo_digest)
        else:
            # If there is no container registry, we only build the image
            target_image_name = docker_image_builder.get_target_image_name(
                deployment=deployment
            )
            docker_image_builder.build_docker_image(
                target_image_name=target_image_name,
                deployment=deployment,
                stack=stack,
            )
            deployment.add_extra(
                ORCHESTRATOR_DOCKER_IMAGE_KEY, target_image_name
            )

    def prepare_or_run_pipeline(
        self,
        deployment: "PipelineDeployment",
        stack: "Stack",
    ) -> Any:
        """Creates and writes an Airflow DAG zip file.

        Args:
            deployment: The pipeline deployment to prepare or run.
            stack: The stack the pipeline will run on.
        """
        pipeline_settings = cast(
            AirflowOrchestratorSettings, self.get_settings(deployment)
        )

        dag_generator_values = get_dag_generator_values(
            custom_dag_generator_source=pipeline_settings.custom_dag_generator
        )

        command = StepEntrypointConfiguration.get_entrypoint_command()

        tasks = []
        for step_name, step in deployment.steps.items():
            settings = cast(
                AirflowOrchestratorSettings, self.get_settings(step)
            )
            arguments = StepEntrypointConfiguration.get_entrypoint_arguments(
                step_name=step_name
            )
            task = dag_generator_values.task_configuration_class(
                id=step_name,
                zenml_step_name=step.config.name,
                upstream_steps=step.spec.upstream_steps,
                command=command,
                arguments=arguments,
                operator_source=settings.operator,
                operator_args=settings.operator_args,
            )
            tasks.append(task)

        local_stores_path = (
            os.path.expanduser(GlobalConfiguration().local_stores_path)
            if self.config.local
            else None
        )
        docker_image = deployment.pipeline.extra[ORCHESTRATOR_DOCKER_IMAGE_KEY]
        dag_id = pipeline_settings.dag_id or get_orchestrator_run_name(
            pipeline_name=deployment.pipeline.name
        )
        dag_config = dag_generator_values.dag_configuration_class(
            id=dag_id,
            docker_image=docker_image,
            local_stores_path=local_stores_path,
            tasks=tasks,
            tags=pipeline_settings.dag_tags,
            dag_args=pipeline_settings.dag_args,
            **self._translate_schedule(deployment.schedule),
        )

        self._write_dag(
            dag_config,
            dag_generator_values=dag_generator_values,
            output_dir=pipeline_settings.dag_output_dir or self.dags_directory,
        )

    def _write_dag(
        self,
        dag_config: "DagConfiguration",
        dag_generator_values: DagGeneratorValues,
        output_dir: str,
    ) -> None:
        """Writes an Airflow DAG to disk.

        Args:
            dag_config: Configuration of the DAG to write.
            dag_generator_values: Values of the DAG generator to use.
            output_dir: The directory in which to write the DAG.
        """
        io_utils.create_dir_recursive_if_not_exists(output_dir)

        if self.config.local and output_dir != self.dags_directory:
            logger.warning(
                "You're using a local Airflow orchestrator but specified a "
                "custom DAG output directory `%s`. This DAG will not be found "
                "by the local Airflow server until you copy it in the DAGs "
                "directory `%s`.",
                output_dir,
                self.dags_directory,
            )

        def _write_zip(path: str) -> None:
            with zipfile.ZipFile(path, mode="w") as z:
                z.write(dag_generator_values.file, arcname="dag.py")
                z.writestr(
                    dag_generator_values.config_file_name, dag_config.json()
                )

            logger.info("Writing DAG definition to `%s`.", path)

        dag_filename = f"{dag_config.id}.zip"
        if io_utils.is_remote(output_dir):
            io_utils.create_dir_recursive_if_not_exists(self.dags_directory)
            local_zip_path = os.path.join(self.dags_directory, dag_filename)
            remote_zip_path = os.path.join(output_dir, dag_filename)
            _write_zip(local_zip_path)
            try:
                fileio.copy(local_zip_path, remote_zip_path)
                logger.info("Copied DAG definition to `%s`.", remote_zip_path)
            except Exception as e:
                logger.exception(e)
                logger.error(
                    "Failed to upload DAG to remote path `%s`. To run the "
                    "pipeline in Airflow, please manually copy the file `%s` "
                    "to your Airflow DAG directory.",
                    remote_zip_path,
                    local_zip_path,
                )
        else:
            zip_path = os.path.join(output_dir, dag_filename)
            _write_zip(zip_path)

    def get_orchestrator_run_id(self) -> str:
        """Returns the active orchestrator run id.

        Raises:
            RuntimeError: If the environment variable specifying the run id
                is not set.

        Returns:
            The orchestrator run id.
        """
        from zenml.integrations.airflow.orchestrators.dag_generator import (
            ENV_ZENML_AIRFLOW_RUN_ID,
        )

        try:
            return os.environ[ENV_ZENML_AIRFLOW_RUN_ID]
        except KeyError:
            raise RuntimeError(
                "Unable to read run id from environment variable "
                f"{ENV_ZENML_AIRFLOW_RUN_ID}."
            )

    @staticmethod
    def _translate_schedule(
        schedule: Optional["Schedule"] = None,
    ) -> Dict[str, Any]:
        """Convert ZenML schedule into Airflow schedule.

        The Airflow schedule uses slightly different naming and needs some
        default entries for execution without a schedule.

        Args:
            schedule: Containing the interval, start and end date and
                a boolean flag that defines if past runs should be caught up
                on

        Returns:
            Airflow configuration dict.
        """
        if schedule:
            if schedule.cron_expression:
                start_time = schedule.start_time or (
                    datetime.datetime.now() - datetime.timedelta(7)
                )
                return {
                    "schedule": schedule.cron_expression,
                    "start_date": start_time,
                    "end_date": schedule.end_time,
                    "catchup": schedule.catchup,
                }
            else:
                return {
                    "schedule": schedule.interval_second,
                    "start_date": schedule.start_time,
                    "end_date": schedule.end_time,
                    "catchup": schedule.catchup,
                }

        return {
            "schedule": "@once",
            # set the a start time in the past and disable catchup so airflow
            # runs the dag immediately
            "start_date": datetime.datetime.now() - datetime.timedelta(7),
            "catchup": False,
        }

    #####################
    #   Local Airflow   #
    #####################

    @property
    def pid_file(self) -> str:
        """Returns path to the daemon PID file.

        Returns:
            Path to the daemon PID file.
        """
        return os.path.join(self.airflow_home, "airflow_daemon.pid")

    @property
    def log_file(self) -> str:
        """Returns path to the airflow log file.

        Returns:
            str: Path to the airflow log file.
        """
        return os.path.join(self.airflow_home, "airflow_orchestrator.log")

    @property
    def password_file(self) -> str:
        """Returns path to the webserver password file.

        Returns:
            Path to the webserver password file.
        """
        return os.path.join(self.airflow_home, "standalone_admin_password.txt")

    @property
    def is_running(self) -> bool:
        """Returns whether the orchestrator is "running".

        In the non-local case, this is always True. Otherwise checks if the
        local Airflow server is running.

        Returns:
            If the orchestrator is running.

        Raises:
            RuntimeError: If port 8080 is occupied.
        """
        if not self.config.local:
            return True

        from airflow.cli.commands.standalone_command import (
            StandaloneCommand,  # type: ignore
        )
        from airflow.jobs.triggerer_job import TriggererJob

        daemon_running = daemon.check_if_daemon_is_running(self.pid_file)

        command = StandaloneCommand()
        webserver_port_open = command.port_open(8080)

        if not daemon_running:
            if webserver_port_open:
                raise RuntimeError(
                    "The airflow daemon does not seem to be running but "
                    "local port 8080 is occupied. Make sure the port is "
                    "available and try again."
                )

            # exit early so we don't check non-existing airflow databases
            return False

        # we can't use StandaloneCommand().is_ready() here as the
        # Airflow SequentialExecutor apparently does not send a heartbeat
        # while running a task which would result in this returning `False`
        # even if Airflow is running.
        airflow_running = webserver_port_open and command.job_running(
            TriggererJob
        )
        return airflow_running

    @property
    def is_provisioned(self) -> bool:
        """Returns whether the airflow daemon is currently running.

        Returns:
            True if the airflow daemon is running, False otherwise.
        """
        return self.is_running

    def provision(self) -> None:
        """Ensures that Airflow is running."""
        if not self.config.local:
            return

        if self.is_running:
            logger.info("Airflow is already running.")
            self._log_webserver_credentials()
            return

        self._check_local_server_requirements()

        if not fileio.exists(self.dags_directory):
            io_utils.create_dir_recursive_if_not_exists(self.dags_directory)

        from airflow.cli.commands.standalone_command import StandaloneCommand

        try:
            command = StandaloneCommand()
            daemon.run_as_daemon(
                command.run,
                pid_file=self.pid_file,
                log_file=self.log_file,
            )
            while not self.is_running:
                # Wait until the daemon started all the relevant airflow
                # processes
                time.sleep(0.1)
            self._log_webserver_credentials()
        except Exception as e:
            logger.error(e)
            logger.error(
                "An error occurred while starting the Airflow daemon. If you "
                "want to start it manually, use the commands described in the "
                "official Airflow quickstart guide for running Airflow locally."
            )
            self.deprovision()

    def deprovision(self) -> None:
        """Stops the airflow daemon if necessary and tears down resources."""
        if not self.config.local:
            return

        if self.is_running:
            daemon.stop_daemon(self.pid_file)

        fileio.rmtree(self.airflow_home)
        logger.info("Airflow spun down.")

    @staticmethod
    def _check_local_server_requirements() -> None:
        """Checks that all packages for a local Airflow server are installed.

        When running a local Airflow server, we require the
        `apache-airflow-providers-docker` to run steps locally in Docker
        containers in addition to the basic integration requirements.

        Raises:
            RuntimeError: If the `apache-airflow-providers-docker` is not
                installed in the active Python environment.
        """
        try:
            from airflow.providers.docker.operators.docker import (  # noqa
                DockerOperator,
            )
        except ImportError:
            raise RuntimeError(
                "Unable to import Airflow `DockerOperator` in the active "
                "Python environment. Spinning up a local Airflow server to "
                "run ZenML pipelines requires the `DockerOperator` to be "
                "available. Please run "
                "`pip install apache-airflow-providers-docker` to install it "
                "and try again."
            )

    def _log_webserver_credentials(self) -> None:
        """Logs URL and credentials to log in to the airflow webserver.

        Raises:
            FileNotFoundError: If the password file does not exist.
        """
        if fileio.exists(self.password_file):
            with open(self.password_file) as file:
                password = file.read().strip()
        else:
            raise FileNotFoundError(
                f"Can't find password file '{self.password_file}'"
            )
        logger.info(
            "To inspect your DAGs, login to `http://localhost:8080` "
            "with username: `admin` password: `%s`",
            password,
        )
config: AirflowOrchestratorConfig property readonly

Returns the orchestrator config.

Returns:

Type Description
AirflowOrchestratorConfig

The configuration.

dags_directory: str property readonly

Returns path to the airflow dags directory.

Returns:

Type Description
str

Path to the airflow dags directory.

is_provisioned: bool property readonly

Returns whether the airflow daemon is currently running.

Returns:

Type Description
bool

True if the airflow daemon is running, False otherwise.

is_running: bool property readonly

Returns whether the orchestrator is "running".

In the non-local case, this is always True. Otherwise checks if the local Airflow server is running.

Returns:

Type Description
bool

If the orchestrator is running.

Exceptions:

Type Description
RuntimeError

If port 8080 is occupied.

log_file: str property readonly

Returns path to the airflow log file.

Returns:

Type Description
str

Path to the airflow log file.

password_file: str property readonly

Returns path to the webserver password file.

Returns:

Type Description
str

Path to the webserver password file.

pid_file: str property readonly

Returns path to the daemon PID file.

Returns:

Type Description
str

Path to the daemon PID file.

settings_class: Optional[Type[BaseSettings]] property readonly

Settings class for the Kubeflow orchestrator.

Returns:

Type Description
Optional[Type[BaseSettings]]

The settings class.

validator: Optional[StackValidator] property readonly

Validates the stack.

In the remote case, checks that the stack contains a container registry and only remote components.

Returns:

Type Description
Optional[StackValidator]

A StackValidator instance.

__init__(self, **values) special

Sets environment variables to configure airflow.

Parameters:

Name Type Description Default
**values Any

Values to set in the orchestrator.

{}
Source code in zenml/integrations/airflow/orchestrators/airflow_orchestrator.py
def __init__(self, **values: Any):
    """Sets environment variables to configure airflow.

    Args:
        **values: Values to set in the orchestrator.
    """
    super().__init__(**values)
    self.airflow_home = os.path.join(
        io_utils.get_global_config_directory(),
        "airflow",
        str(self.id),
    )
    self._set_env()
deprovision(self)

Stops the airflow daemon if necessary and tears down resources.

Source code in zenml/integrations/airflow/orchestrators/airflow_orchestrator.py
def deprovision(self) -> None:
    """Stops the airflow daemon if necessary and tears down resources."""
    if not self.config.local:
        return

    if self.is_running:
        daemon.stop_daemon(self.pid_file)

    fileio.rmtree(self.airflow_home)
    logger.info("Airflow spun down.")
get_orchestrator_run_id(self)

Returns the active orchestrator run id.

Exceptions:

Type Description
RuntimeError

If the environment variable specifying the run id is not set.

Returns:

Type Description
str

The orchestrator run id.

Source code in zenml/integrations/airflow/orchestrators/airflow_orchestrator.py
def get_orchestrator_run_id(self) -> str:
    """Returns the active orchestrator run id.

    Raises:
        RuntimeError: If the environment variable specifying the run id
            is not set.

    Returns:
        The orchestrator run id.
    """
    from zenml.integrations.airflow.orchestrators.dag_generator import (
        ENV_ZENML_AIRFLOW_RUN_ID,
    )

    try:
        return os.environ[ENV_ZENML_AIRFLOW_RUN_ID]
    except KeyError:
        raise RuntimeError(
            "Unable to read run id from environment variable "
            f"{ENV_ZENML_AIRFLOW_RUN_ID}."
        )
prepare_or_run_pipeline(self, deployment, stack)

Creates and writes an Airflow DAG zip file.

Parameters:

Name Type Description Default
deployment PipelineDeployment

The pipeline deployment to prepare or run.

required
stack Stack

The stack the pipeline will run on.

required
Source code in zenml/integrations/airflow/orchestrators/airflow_orchestrator.py
def prepare_or_run_pipeline(
    self,
    deployment: "PipelineDeployment",
    stack: "Stack",
) -> Any:
    """Creates and writes an Airflow DAG zip file.

    Args:
        deployment: The pipeline deployment to prepare or run.
        stack: The stack the pipeline will run on.
    """
    pipeline_settings = cast(
        AirflowOrchestratorSettings, self.get_settings(deployment)
    )

    dag_generator_values = get_dag_generator_values(
        custom_dag_generator_source=pipeline_settings.custom_dag_generator
    )

    command = StepEntrypointConfiguration.get_entrypoint_command()

    tasks = []
    for step_name, step in deployment.steps.items():
        settings = cast(
            AirflowOrchestratorSettings, self.get_settings(step)
        )
        arguments = StepEntrypointConfiguration.get_entrypoint_arguments(
            step_name=step_name
        )
        task = dag_generator_values.task_configuration_class(
            id=step_name,
            zenml_step_name=step.config.name,
            upstream_steps=step.spec.upstream_steps,
            command=command,
            arguments=arguments,
            operator_source=settings.operator,
            operator_args=settings.operator_args,
        )
        tasks.append(task)

    local_stores_path = (
        os.path.expanduser(GlobalConfiguration().local_stores_path)
        if self.config.local
        else None
    )
    docker_image = deployment.pipeline.extra[ORCHESTRATOR_DOCKER_IMAGE_KEY]
    dag_id = pipeline_settings.dag_id or get_orchestrator_run_name(
        pipeline_name=deployment.pipeline.name
    )
    dag_config = dag_generator_values.dag_configuration_class(
        id=dag_id,
        docker_image=docker_image,
        local_stores_path=local_stores_path,
        tasks=tasks,
        tags=pipeline_settings.dag_tags,
        dag_args=pipeline_settings.dag_args,
        **self._translate_schedule(deployment.schedule),
    )

    self._write_dag(
        dag_config,
        dag_generator_values=dag_generator_values,
        output_dir=pipeline_settings.dag_output_dir or self.dags_directory,
    )
prepare_pipeline_deployment(self, deployment, stack)

Builds a Docker image to run pipeline steps.

Parameters:

Name Type Description Default
deployment PipelineDeployment

The pipeline deployment configuration.

required
stack Stack

The stack on which the pipeline will be deployed.

required
Source code in zenml/integrations/airflow/orchestrators/airflow_orchestrator.py
def prepare_pipeline_deployment(
    self,
    deployment: "PipelineDeployment",
    stack: "Stack",
) -> None:
    """Builds a Docker image to run pipeline steps.

    Args:
        deployment: The pipeline deployment configuration.
        stack: The stack on which the pipeline will be deployed.
    """
    if self.config.local:
        stack.check_local_paths()

    docker_image_builder = PipelineDockerImageBuilder()
    if stack.container_registry:
        repo_digest = docker_image_builder.build_and_push_docker_image(
            deployment=deployment, stack=stack
        )
        deployment.add_extra(ORCHESTRATOR_DOCKER_IMAGE_KEY, repo_digest)
    else:
        # If there is no container registry, we only build the image
        target_image_name = docker_image_builder.get_target_image_name(
            deployment=deployment
        )
        docker_image_builder.build_docker_image(
            target_image_name=target_image_name,
            deployment=deployment,
            stack=stack,
        )
        deployment.add_extra(
            ORCHESTRATOR_DOCKER_IMAGE_KEY, target_image_name
        )
provision(self)

Ensures that Airflow is running.

Source code in zenml/integrations/airflow/orchestrators/airflow_orchestrator.py
def provision(self) -> None:
    """Ensures that Airflow is running."""
    if not self.config.local:
        return

    if self.is_running:
        logger.info("Airflow is already running.")
        self._log_webserver_credentials()
        return

    self._check_local_server_requirements()

    if not fileio.exists(self.dags_directory):
        io_utils.create_dir_recursive_if_not_exists(self.dags_directory)

    from airflow.cli.commands.standalone_command import StandaloneCommand

    try:
        command = StandaloneCommand()
        daemon.run_as_daemon(
            command.run,
            pid_file=self.pid_file,
            log_file=self.log_file,
        )
        while not self.is_running:
            # Wait until the daemon started all the relevant airflow
            # processes
            time.sleep(0.1)
        self._log_webserver_credentials()
    except Exception as e:
        logger.error(e)
        logger.error(
            "An error occurred while starting the Airflow daemon. If you "
            "want to start it manually, use the commands described in the "
            "official Airflow quickstart guide for running Airflow locally."
        )
        self.deprovision()
DagGeneratorValues (tuple)

Values from the DAG generator module.

Source code in zenml/integrations/airflow/orchestrators/airflow_orchestrator.py
class DagGeneratorValues(NamedTuple):
    """Values from the DAG generator module."""

    file: str
    config_file_name: str
    run_id_env_variable_name: str
    dag_configuration_class: Type["DagConfiguration"]
    task_configuration_class: Type["TaskConfiguration"]
__getnewargs__(self) special

Return self as a plain tuple. Used by copy and pickle.

Source code in zenml/integrations/airflow/orchestrators/airflow_orchestrator.py
def __getnewargs__(self):
    'Return self as a plain tuple.  Used by copy and pickle.'
    return _tuple(self)
__new__(_cls, file, config_file_name, run_id_env_variable_name, dag_configuration_class, task_configuration_class) special staticmethod

Create new instance of DagGeneratorValues(file, config_file_name, run_id_env_variable_name, dag_configuration_class, task_configuration_class)

__repr__(self) special

Return a nicely formatted representation string

Source code in zenml/integrations/airflow/orchestrators/airflow_orchestrator.py
def __repr__(self):
    'Return a nicely formatted representation string'
    return self.__class__.__name__ + repr_fmt % self
get_dag_generator_values(custom_dag_generator_source=None)

Gets values from the DAG generator module.

Parameters:

Name Type Description Default
custom_dag_generator_source Optional[str]

Source of a custom DAG generator module.

None

Returns:

Type Description
DagGeneratorValues

DAG generator module values.

Source code in zenml/integrations/airflow/orchestrators/airflow_orchestrator.py
def get_dag_generator_values(
    custom_dag_generator_source: Optional[str] = None,
) -> DagGeneratorValues:
    """Gets values from the DAG generator module.

    Args:
        custom_dag_generator_source: Source of a custom DAG generator module.

    Returns:
        DAG generator module values.
    """
    if custom_dag_generator_source:
        module = importlib.import_module(custom_dag_generator_source)
    else:
        from zenml.integrations.airflow.orchestrators import dag_generator

        module = dag_generator

    assert module.__file__
    return DagGeneratorValues(
        file=module.__file__,
        config_file_name=module.CONFIG_FILENAME,
        run_id_env_variable_name=module.ENV_ZENML_AIRFLOW_RUN_ID,
        dag_configuration_class=module.DagConfiguration,
        task_configuration_class=module.TaskConfiguration,
    )

dag_generator

Module to generate an Airflow DAG from a config file.

DagConfiguration (BaseModel) pydantic-model

Airflow DAG configuration.

Source code in zenml/integrations/airflow/orchestrators/dag_generator.py
class DagConfiguration(BaseModel):
    """Airflow DAG configuration."""

    id: str
    docker_image: str
    tasks: List[TaskConfiguration]

    local_stores_path: Optional[str] = None

    schedule: Union[datetime.timedelta, str]
    start_date: datetime.datetime
    end_date: Optional[datetime.datetime] = None
    catchup: bool = False

    tags: List[str] = []
    dag_args: Dict[str, Any] = {}
TaskConfiguration (BaseModel) pydantic-model

Airflow task configuration.

Source code in zenml/integrations/airflow/orchestrators/dag_generator.py
class TaskConfiguration(BaseModel):
    """Airflow task configuration."""

    id: str
    zenml_step_name: str
    upstream_steps: List[str]

    command: List[str]
    arguments: List[str]

    operator_source: str
    operator_args: Dict[str, Any] = {}
get_docker_operator_init_kwargs(dag_config, task_config)

Gets keyword arguments to pass to the DockerOperator.

Parameters:

Name Type Description Default
dag_config DagConfiguration

The configuration of the DAG.

required
task_config TaskConfiguration

The configuration of the task.

required

Returns:

Type Description
Dict[str, Any]

The init keyword arguments.

Source code in zenml/integrations/airflow/orchestrators/dag_generator.py
def get_docker_operator_init_kwargs(
    dag_config: DagConfiguration, task_config: TaskConfiguration
) -> Dict[str, Any]:
    """Gets keyword arguments to pass to the DockerOperator.

    Args:
        dag_config: The configuration of the DAG.
        task_config: The configuration of the task.

    Returns:
        The init keyword arguments.
    """
    mounts = []
    extra_hosts = {}
    environment = {ENV_ZENML_AIRFLOW_RUN_ID: "{{run_id}}"}

    if dag_config.local_stores_path:
        from docker.types import Mount

        environment[ENV_ZENML_LOCAL_STORES_PATH] = dag_config.local_stores_path
        mounts = [
            Mount(
                target=dag_config.local_stores_path,
                source=dag_config.local_stores_path,
                type="bind",
            )
        ]
        extra_hosts = {"host.docker.internal": "host-gateway"}
    return {
        "image": dag_config.docker_image,
        "command": task_config.command + task_config.arguments,
        "mounts": mounts,
        "environment": environment,
        "extra_hosts": extra_hosts,
    }
get_kubernetes_pod_operator_init_kwargs(dag_config, task_config)

Gets keyword arguments to pass to the KubernetesPodOperator.

Parameters:

Name Type Description Default
dag_config DagConfiguration

The configuration of the DAG.

required
task_config TaskConfiguration

The configuration of the task.

required

Returns:

Type Description
Dict[str, Any]

The init keyword arguments.

Source code in zenml/integrations/airflow/orchestrators/dag_generator.py
def get_kubernetes_pod_operator_init_kwargs(
    dag_config: DagConfiguration, task_config: TaskConfiguration
) -> Dict[str, Any]:
    """Gets keyword arguments to pass to the KubernetesPodOperator.

    Args:
        dag_config: The configuration of the DAG.
        task_config: The configuration of the task.

    Returns:
        The init keyword arguments.
    """
    from kubernetes.client.models import V1EnvVar

    return {
        "name": f"{dag_config.id}_{task_config.id}",
        "namespace": "default",
        "image": dag_config.docker_image,
        "cmds": task_config.command,
        "arguments": task_config.arguments,
        "env_vars": [
            V1EnvVar(name=ENV_ZENML_AIRFLOW_RUN_ID, value="{{run_id}}")
        ],
    }
get_operator_init_kwargs(operator_class, dag_config, task_config)

Gets keyword arguments to pass to the operator init method.

Parameters:

Name Type Description Default
operator_class Type[Any]

The operator class for which to get the kwargs.

required
dag_config DagConfiguration

The configuration of the DAG.

required
task_config TaskConfiguration

The configuration of the task.

required

Returns:

Type Description
Dict[str, Any]

The init keyword arguments.

Source code in zenml/integrations/airflow/orchestrators/dag_generator.py
def get_operator_init_kwargs(
    operator_class: Type[Any],
    dag_config: DagConfiguration,
    task_config: TaskConfiguration,
) -> Dict[str, Any]:
    """Gets keyword arguments to pass to the operator init method.

    Args:
        operator_class: The operator class for which to get the kwargs.
        dag_config: The configuration of the DAG.
        task_config: The configuration of the task.

    Returns:
        The init keyword arguments.
    """
    init_kwargs = {"task_id": task_config.id}

    try:
        from airflow.providers.docker.operators.docker import DockerOperator

        if issubclass(operator_class, DockerOperator):
            init_kwargs.update(
                get_docker_operator_init_kwargs(
                    dag_config=dag_config, task_config=task_config
                )
            )
    except ImportError:
        pass

    try:
        from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import (
            KubernetesPodOperator,
        )

        if issubclass(operator_class, KubernetesPodOperator):
            init_kwargs.update(
                get_kubernetes_pod_operator_init_kwargs(
                    dag_config=dag_config, task_config=task_config
                )
            )
    except ImportError:
        pass

    init_kwargs.update(task_config.operator_args)
    return init_kwargs
import_class_by_path(class_path)

Imports a class based on a given path.

Parameters:

Name Type Description Default
class_path str

str, class_source e.g. this.module.Class

required

Returns:

Type Description
Type[Any]

the given class

Source code in zenml/integrations/airflow/orchestrators/dag_generator.py
def import_class_by_path(class_path: str) -> Type[Any]:
    """Imports a class based on a given path.

    Args:
        class_path: str, class_source e.g. this.module.Class

    Returns:
        the given class
    """
    module_name, class_name = class_path.rsplit(".", 1)
    module = importlib.import_module(module_name)
    return getattr(module, class_name)  # type: ignore[no-any-return]