Skip to content

Kubernetes

zenml.integrations.kubernetes special

Kubernetes integration for Kubernetes-native orchestration.

The Kubernetes integration sub-module powers an alternative to the local orchestrator. You can enable it by registering the Kubernetes orchestrator with the CLI tool.

KubernetesIntegration (Integration)

Definition of Kubernetes integration for ZenML.

Source code in zenml/integrations/kubernetes/__init__.py
class KubernetesIntegration(Integration):
    """Definition of Kubernetes integration for ZenML."""

    NAME = KUBERNETES
    REQUIREMENTS = ["kubernetes==18.20.0"]

    @classmethod
    def flavors(cls) -> List[Type[Flavor]]:
        """Declare the stack component flavors for the Kubernetes integration.

        Returns:
            List of new stack component flavors.
        """
        from zenml.integrations.kubernetes.flavors import (
            KubernetesOrchestratorFlavor,
        )

        return [KubernetesOrchestratorFlavor]

flavors() classmethod

Declare the stack component flavors for the Kubernetes integration.

Returns:

Type Description
List[Type[zenml.stack.flavor.Flavor]]

List of new stack component flavors.

Source code in zenml/integrations/kubernetes/__init__.py
@classmethod
def flavors(cls) -> List[Type[Flavor]]:
    """Declare the stack component flavors for the Kubernetes integration.

    Returns:
        List of new stack component flavors.
    """
    from zenml.integrations.kubernetes.flavors import (
        KubernetesOrchestratorFlavor,
    )

    return [KubernetesOrchestratorFlavor]

flavors special

Kubernetes integration flavors.

kubernetes_orchestrator_flavor

Kubernetes orchestrator flavor.

KubernetesOrchestratorConfig (BaseOrchestratorConfig, KubernetesOrchestratorSettings) pydantic-model

Configuration for the Kubernetes orchestrator.

Attributes:

Name Type Description
kubernetes_context str

Name of a Kubernetes context to run pipelines in.

kubernetes_namespace str

Name of the Kubernetes namespace to be used. If not provided, zenml namespace will be used.

local bool

If True, the orchestrator will assume it is connected to a local kubernetes cluster and will perform additional validations and operations to allow using the orchestrator in combination with other local stack components that store data in the local filesystem (i.e. it will mount the local stores directory into the pipeline containers).

skip_local_validations bool

If True, the local validations will be skipped.

Source code in zenml/integrations/kubernetes/flavors/kubernetes_orchestrator_flavor.py
class KubernetesOrchestratorConfig(  # type: ignore[misc] # https://github.com/pydantic/pydantic/issues/4173
    BaseOrchestratorConfig, KubernetesOrchestratorSettings
):
    """Configuration for the Kubernetes orchestrator.

    Attributes:
        kubernetes_context: Name of a Kubernetes context to run pipelines in.
        kubernetes_namespace: Name of the Kubernetes namespace to be used.
            If not provided, `zenml` namespace will be used.
        local: If `True`, the orchestrator will assume it is connected to a
            local kubernetes cluster and will perform additional validations and
            operations to allow using the orchestrator in combination with other
            local stack components that store data in the local filesystem
            (i.e. it will mount the local stores directory into the pipeline
            containers).
        skip_local_validations: If `True`, the local validations will be
            skipped.
    """

    kubernetes_context: str  # TODO: Potential setting
    kubernetes_namespace: str = "zenml"
    local: bool = False
    skip_local_validations: bool = False

    @root_validator(pre=True)
    def _validate_deprecated_attrs(
        cls, values: Dict[str, Any]
    ) -> Dict[str, Any]:
        """Pydantic root_validator for deprecated attributes.

        This root validator is used for backwards compatibility purposes. E.g.
        it handles attributes that are no longer available or that have become
        mandatory in the meantime.

        Args:
            values: Values passed to the object constructor

        Returns:
            Values passed to the object constructor

        Raises:
            ValueError: If the attributes or their values are not valid.
        """
        if not values.get("kubernetes_context"):
            raise ValueError(
                "An empty value is no longer allowed for the "
                "`kubernetes_context` attribute of the Kubernetes "
                "orchestrator, to avoid unpredictable behavior. Please set "
                "the `kubernetes_context` attribute to the name of the "
                "Kubernetes config context pointing to the cluster where "
                "you would like to run pipelines."
            )

        return values

    @property
    def is_remote(self) -> bool:
        """Checks if this stack component is running remotely.

        This designation is used to determine if the stack component can be
        used with a local ZenML database or if it requires a remote ZenML
        server.

        Returns:
            True if this config is for a remote component, False otherwise.
        """
        return not self.local

    @property
    def is_local(self) -> bool:
        """Checks if this stack component is running locally.

        This designation is used to determine if the stack component can be
        shared with other users or if it is only usable on the local host.

        Returns:
            True if this config is for a local component, False otherwise.
        """
        return self.local
is_local: bool property readonly

Checks if this stack component is running locally.

This designation is used to determine if the stack component can be shared with other users or if it is only usable on the local host.

Returns:

Type Description
bool

True if this config is for a local component, False otherwise.

is_remote: bool property readonly

Checks if this stack component is running remotely.

This designation is used to determine if the stack component can be used with a local ZenML database or if it requires a remote ZenML server.

Returns:

Type Description
bool

True if this config is for a remote component, False otherwise.

KubernetesOrchestratorFlavor (BaseOrchestratorFlavor)

Kubernetes orchestrator flavor.

Source code in zenml/integrations/kubernetes/flavors/kubernetes_orchestrator_flavor.py
class KubernetesOrchestratorFlavor(BaseOrchestratorFlavor):
    """Kubernetes orchestrator flavor."""

    @property
    def name(self) -> str:
        """Name of the flavor.

        Returns:
            The name of the flavor.
        """
        return KUBERNETES_ORCHESTRATOR_FLAVOR

    @property
    def docs_url(self) -> Optional[str]:
        """A url to point at docs explaining this flavor.

        Returns:
            A flavor docs url.
        """
        return self.generate_default_docs_url()

    @property
    def sdk_docs_url(self) -> Optional[str]:
        """A url to point at SDK docs explaining this flavor.

        Returns:
            A flavor SDK docs url.
        """
        return self.generate_default_sdk_docs_url()

    @property
    def logo_url(self) -> str:
        """A url to represent the flavor in the dashboard.

        Returns:
            The flavor logo.
        """
        return "https://public-flavor-logos.s3.eu-central-1.amazonaws.com/orchestrator/kubernetes.png"

    @property
    def config_class(self) -> Type[KubernetesOrchestratorConfig]:
        """Returns `KubernetesOrchestratorConfig` config class.

        Returns:
                The config class.
        """
        return KubernetesOrchestratorConfig

    @property
    def implementation_class(self) -> Type["KubernetesOrchestrator"]:
        """Implementation class for this flavor.

        Returns:
            The implementation class.
        """
        from zenml.integrations.kubernetes.orchestrators import (
            KubernetesOrchestrator,
        )

        return KubernetesOrchestrator
config_class: Type[zenml.integrations.kubernetes.flavors.kubernetes_orchestrator_flavor.KubernetesOrchestratorConfig] property readonly

Returns KubernetesOrchestratorConfig config class.

Returns:

Type Description
Type[zenml.integrations.kubernetes.flavors.kubernetes_orchestrator_flavor.KubernetesOrchestratorConfig]

The config class.

docs_url: Optional[str] property readonly

A url to point at docs explaining this flavor.

Returns:

Type Description
Optional[str]

A flavor docs url.

implementation_class: Type[KubernetesOrchestrator] property readonly

Implementation class for this flavor.

Returns:

Type Description
Type[KubernetesOrchestrator]

The implementation class.

logo_url: str property readonly

A url to represent the flavor in the dashboard.

Returns:

Type Description
str

The flavor logo.

name: str property readonly

Name of the flavor.

Returns:

Type Description
str

The name of the flavor.

sdk_docs_url: Optional[str] property readonly

A url to point at SDK docs explaining this flavor.

Returns:

Type Description
Optional[str]

A flavor SDK docs url.

KubernetesOrchestratorSettings (BaseSettings) pydantic-model

Settings for the Kubernetes orchestrator.

Attributes:

Name Type Description
synchronous bool

If True, running a pipeline using this orchestrator will block until all steps finished running on Kubernetes.

timeout int

How many seconds to wait for synchronous runs. 0 means to wait for an unlimited duration.

pod_settings Optional[zenml.integrations.kubernetes.pod_settings.KubernetesPodSettings]

Pod settings to apply.

Source code in zenml/integrations/kubernetes/flavors/kubernetes_orchestrator_flavor.py
class KubernetesOrchestratorSettings(BaseSettings):
    """Settings for the Kubernetes orchestrator.

    Attributes:
        synchronous: If `True`, running a pipeline using this orchestrator will
            block until all steps finished running on Kubernetes.
        timeout: How many seconds to wait for synchronous runs. `0` means
            to wait for an unlimited duration.
        pod_settings: Pod settings to apply.
    """

    synchronous: bool = False
    timeout: int = 0

    pod_settings: Optional[KubernetesPodSettings] = None

orchestrators special

Kubernetes-native orchestration.

kube_utils

Utilities for Kubernetes related functions.

Internal interface: no backwards compatibility guarantees. Adjusted from https://github.com/tensorflow/tfx/blob/master/tfx/utils/kube_utils.py.

PodPhase (Enum)

Phase of the Kubernetes pod.

Pod phases are defined in https://kubernetes.io/docs/concepts/workloads/pods/pod-lifecycle/#pod-phase.

Source code in zenml/integrations/kubernetes/orchestrators/kube_utils.py
class PodPhase(enum.Enum):
    """Phase of the Kubernetes pod.

    Pod phases are defined in
    https://kubernetes.io/docs/concepts/workloads/pods/pod-lifecycle/#pod-phase.
    """

    PENDING = "Pending"
    RUNNING = "Running"
    SUCCEEDED = "Succeeded"
    FAILED = "Failed"
    UNKNOWN = "Unknown"
create_edit_service_account(core_api, rbac_api, service_account_name, namespace, cluster_role_binding_name='zenml-edit')

Create a new Kubernetes service account with "edit" rights.

Parameters:

Name Type Description Default
core_api CoreV1Api

Client of Core V1 API of Kubernetes API.

required
rbac_api RbacAuthorizationV1Api

Client of Rbac Authorization V1 API of Kubernetes API.

required
service_account_name str

Name of the service account.

required
namespace str

Kubernetes namespace. Defaults to "default".

required
cluster_role_binding_name str

Name of the cluster role binding. Defaults to "zenml-edit".

'zenml-edit'
Source code in zenml/integrations/kubernetes/orchestrators/kube_utils.py
def create_edit_service_account(
    core_api: k8s_client.CoreV1Api,
    rbac_api: k8s_client.RbacAuthorizationV1Api,
    service_account_name: str,
    namespace: str,
    cluster_role_binding_name: str = "zenml-edit",
) -> None:
    """Create a new Kubernetes service account with "edit" rights.

    Args:
        core_api: Client of Core V1 API of Kubernetes API.
        rbac_api: Client of Rbac Authorization V1 API of Kubernetes API.
        service_account_name: Name of the service account.
        namespace: Kubernetes namespace. Defaults to "default".
        cluster_role_binding_name: Name of the cluster role binding.
            Defaults to "zenml-edit".
    """
    crb_manifest = build_cluster_role_binding_manifest_for_service_account(
        name=cluster_role_binding_name,
        role_name="edit",
        service_account_name=service_account_name,
        namespace=namespace,
    )
    _if_not_exists(rbac_api.create_cluster_role_binding)(body=crb_manifest)

    sa_manifest = build_service_account_manifest(
        name=service_account_name, namespace=namespace
    )
    _if_not_exists(core_api.create_namespaced_service_account)(
        namespace=namespace,
        body=sa_manifest,
    )
create_namespace(core_api, namespace)

Create a Kubernetes namespace.

Parameters:

Name Type Description Default
core_api CoreV1Api

Client of Core V1 API of Kubernetes API.

required
namespace str

Kubernetes namespace. Defaults to "default".

required
Source code in zenml/integrations/kubernetes/orchestrators/kube_utils.py
def create_namespace(core_api: k8s_client.CoreV1Api, namespace: str) -> None:
    """Create a Kubernetes namespace.

    Args:
        core_api: Client of Core V1 API of Kubernetes API.
        namespace: Kubernetes namespace. Defaults to "default".
    """
    manifest = build_namespace_manifest(namespace)
    _if_not_exists(core_api.create_namespace)(body=manifest)
get_pod(core_api, pod_name, namespace)

Get a pod from Kubernetes metadata API.

Parameters:

Name Type Description Default
core_api CoreV1Api

Client of CoreV1Api of Kubernetes API.

required
pod_name str

The name of the pod.

required
namespace str

The namespace of the pod.

required

Exceptions:

Type Description
RuntimeError

When it sees unexpected errors from Kubernetes API.

Returns:

Type Description
Optional[kubernetes.client.models.v1_pod.V1Pod]

The found pod object. None if it's not found.

Source code in zenml/integrations/kubernetes/orchestrators/kube_utils.py
def get_pod(
    core_api: k8s_client.CoreV1Api, pod_name: str, namespace: str
) -> Optional[k8s_client.V1Pod]:
    """Get a pod from Kubernetes metadata API.

    Args:
        core_api: Client of `CoreV1Api` of Kubernetes API.
        pod_name: The name of the pod.
        namespace: The namespace of the pod.

    Raises:
        RuntimeError: When it sees unexpected errors from Kubernetes API.

    Returns:
        The found pod object. None if it's not found.
    """
    try:
        return core_api.read_namespaced_pod(name=pod_name, namespace=namespace)
    except k8s_client.rest.ApiException as e:
        if e.status == 404:
            return None
        raise RuntimeError from e
is_inside_kubernetes()

Check whether we are inside a Kubernetes cluster or on a remote host.

Returns:

Type Description
bool

True if inside a Kubernetes cluster, else False.

Source code in zenml/integrations/kubernetes/orchestrators/kube_utils.py
def is_inside_kubernetes() -> bool:
    """Check whether we are inside a Kubernetes cluster or on a remote host.

    Returns:
        True if inside a Kubernetes cluster, else False.
    """
    try:
        k8s_config.load_incluster_config()
        return True
    except k8s_config.ConfigException:
        return False
load_kube_config(context=None)

Load the Kubernetes client config.

Depending on the environment (whether it is inside the running Kubernetes cluster or remote host), different location will be searched for the config file.

Parameters:

Name Type Description Default
context Optional[str]

Name of the Kubernetes context. If not provided, uses the currently active context.

None
Source code in zenml/integrations/kubernetes/orchestrators/kube_utils.py
def load_kube_config(context: Optional[str] = None) -> None:
    """Load the Kubernetes client config.

    Depending on the environment (whether it is inside the running Kubernetes
    cluster or remote host), different location will be searched for the config
    file.

    Args:
        context: Name of the Kubernetes context. If not provided, uses the
            currently active context.
    """
    try:
        k8s_config.load_incluster_config()
    except k8s_config.ConfigException:
        k8s_config.load_kube_config(context=context)
pod_failed(pod)

Check if pod status is 'Failed'.

Parameters:

Name Type Description Default
pod V1Pod

Kubernetes pod.

required

Returns:

Type Description
bool

True if pod status is 'Failed' else False.

Source code in zenml/integrations/kubernetes/orchestrators/kube_utils.py
def pod_failed(pod: k8s_client.V1Pod) -> bool:
    """Check if pod status is 'Failed'.

    Args:
        pod: Kubernetes pod.

    Returns:
        True if pod status is 'Failed' else False.
    """
    return pod.status.phase == PodPhase.FAILED.value  # type: ignore[no-any-return]
pod_is_done(pod)

Check if pod status is 'Succeeded'.

Parameters:

Name Type Description Default
pod V1Pod

Kubernetes pod.

required

Returns:

Type Description
bool

True if pod status is 'Succeeded' else False.

Source code in zenml/integrations/kubernetes/orchestrators/kube_utils.py
def pod_is_done(pod: k8s_client.V1Pod) -> bool:
    """Check if pod status is 'Succeeded'.

    Args:
        pod: Kubernetes pod.

    Returns:
        True if pod status is 'Succeeded' else False.
    """
    return pod.status.phase == PodPhase.SUCCEEDED.value  # type: ignore[no-any-return]
pod_is_not_pending(pod)

Check if pod status is not 'Pending'.

Parameters:

Name Type Description Default
pod V1Pod

Kubernetes pod.

required

Returns:

Type Description
bool

False if the pod status is 'Pending' else True.

Source code in zenml/integrations/kubernetes/orchestrators/kube_utils.py
def pod_is_not_pending(pod: k8s_client.V1Pod) -> bool:
    """Check if pod status is not 'Pending'.

    Args:
        pod: Kubernetes pod.

    Returns:
        False if the pod status is 'Pending' else True.
    """
    return pod.status.phase != PodPhase.PENDING.value  # type: ignore[no-any-return]
sanitize_pod_name(pod_name)

Sanitize pod names so they conform to Kubernetes pod naming convention.

Parameters:

Name Type Description Default
pod_name str

Arbitrary input pod name.

required

Returns:

Type Description
str

Sanitized pod name.

Source code in zenml/integrations/kubernetes/orchestrators/kube_utils.py
def sanitize_pod_name(pod_name: str) -> str:
    """Sanitize pod names so they conform to Kubernetes pod naming convention.

    Args:
        pod_name: Arbitrary input pod name.

    Returns:
        Sanitized pod name.
    """
    pod_name = re.sub(r"[^a-z0-9-]", "-", pod_name.lower())
    pod_name = re.sub(r"^[-]+", "", pod_name)
    return re.sub(r"[-]+", "-", pod_name)
wait_pod(core_api, pod_name, namespace, exit_condition_lambda, timeout_sec=0, exponential_backoff=False, stream_logs=False)

Wait for a pod to meet an exit condition.

Parameters:

Name Type Description Default
core_api CoreV1Api

Client of CoreV1Api of Kubernetes API.

required
pod_name str

The name of the pod.

required
namespace str

The namespace of the pod.

required
exit_condition_lambda Callable[[kubernetes.client.models.v1_pod.V1Pod], bool]

A lambda which will be called periodically to wait for a pod to exit. The function returns True to exit.

required
timeout_sec int

Timeout in seconds to wait for pod to reach exit condition, or 0 to wait for an unlimited duration. Defaults to unlimited.

0
exponential_backoff bool

Whether to use exponential back off for polling. Defaults to False.

False
stream_logs bool

Whether to stream the pod logs to zenml.logger.info(). Defaults to False.

False

Exceptions:

Type Description
RuntimeError

when the function times out.

Returns:

Type Description
V1Pod

The pod object which meets the exit condition.

Source code in zenml/integrations/kubernetes/orchestrators/kube_utils.py
def wait_pod(
    core_api: k8s_client.CoreV1Api,
    pod_name: str,
    namespace: str,
    exit_condition_lambda: Callable[[k8s_client.V1Pod], bool],
    timeout_sec: int = 0,
    exponential_backoff: bool = False,
    stream_logs: bool = False,
) -> k8s_client.V1Pod:
    """Wait for a pod to meet an exit condition.

    Args:
        core_api: Client of `CoreV1Api` of Kubernetes API.
        pod_name: The name of the pod.
        namespace: The namespace of the pod.
        exit_condition_lambda: A lambda
            which will be called periodically to wait for a pod to exit. The
            function returns True to exit.
        timeout_sec: Timeout in seconds to wait for pod to reach exit
            condition, or 0 to wait for an unlimited duration.
            Defaults to unlimited.
        exponential_backoff: Whether to use exponential back off for polling.
            Defaults to False.
        stream_logs: Whether to stream the pod logs to
            `zenml.logger.info()`. Defaults to False.

    Raises:
        RuntimeError: when the function times out.

    Returns:
        The pod object which meets the exit condition.
    """
    start_time = datetime.datetime.utcnow()

    # Link to exponential back-off algorithm used here:
    # https://cloud.google.com/storage/docs/exponential-backoff
    backoff_interval = 1
    maximum_backoff = 32

    logged_lines = 0

    while True:
        resp = get_pod(core_api, pod_name, namespace)

        # Stream logs to `zenml.logger.info()`.
        # TODO: can we do this without parsing all logs every time?
        if stream_logs and pod_is_not_pending(resp):
            response = core_api.read_namespaced_pod_log(
                name=pod_name,
                namespace=namespace,
            )
            logs = response.splitlines()
            if len(logs) > logged_lines:
                for line in logs[logged_lines:]:
                    logger.info(line)
                logged_lines = len(logs)

        # Raise an error if the pod failed.
        if pod_failed(resp):
            raise RuntimeError(f"Pod `{namespace}:{pod_name}` failed.")

        # Check if pod is in desired state (e.g. finished / running / ...).
        if exit_condition_lambda(resp):
            return resp

        # Check if wait timed out.
        elapse_time = datetime.datetime.utcnow() - start_time
        if elapse_time.seconds >= timeout_sec and timeout_sec != 0:
            raise RuntimeError(
                f"Waiting for pod `{namespace}:{pod_name}` timed out after "
                f"{timeout_sec} seconds."
            )

        # Wait (using exponential backoff).
        time.sleep(backoff_interval)
        if exponential_backoff and backoff_interval < maximum_backoff:
            backoff_interval *= 2

kubernetes_orchestrator

Kubernetes-native orchestrator.

KubernetesOrchestrator (ContainerizedOrchestrator)

Orchestrator for running ZenML pipelines using native Kubernetes.

Source code in zenml/integrations/kubernetes/orchestrators/kubernetes_orchestrator.py
class KubernetesOrchestrator(ContainerizedOrchestrator):
    """Orchestrator for running ZenML pipelines using native Kubernetes."""

    _k8s_core_api: k8s_client.CoreV1Api = None
    _k8s_batch_api: k8s_client.BatchV1beta1Api = None
    _k8s_rbac_api: k8s_client.RbacAuthorizationV1Api = None

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        """Initialize the class and the Kubernetes clients.

        Args:
            *args: The positional arguments to pass to the Pydantic object.
            **kwargs: The keyword arguments to pass to the Pydantic object.
        """
        super().__init__(*args, **kwargs)
        self._initialize_k8s_clients()

    def _initialize_k8s_clients(self) -> None:
        """Initialize the Kubernetes clients."""
        kube_utils.load_kube_config(context=self.config.kubernetes_context)
        self._k8s_core_api = k8s_client.CoreV1Api()
        self._k8s_batch_api = k8s_client.BatchV1beta1Api()
        self._k8s_rbac_api = k8s_client.RbacAuthorizationV1Api()

    @property
    def config(self) -> KubernetesOrchestratorConfig:
        """Returns the `KubernetesOrchestratorConfig` config.

        Returns:
            The configuration.
        """
        return cast(KubernetesOrchestratorConfig, self._config)

    @property
    def settings_class(self) -> Optional[Type["BaseSettings"]]:
        """Settings class for the Kubernetes orchestrator.

        Returns:
            The settings class.
        """
        return KubernetesOrchestratorSettings

    def get_kubernetes_contexts(self) -> Tuple[List[str], str]:
        """Get list of configured Kubernetes contexts and the active context.

        Raises:
            RuntimeError: if the Kubernetes configuration cannot be loaded.

        Returns:
            context_name: List of configured Kubernetes contexts
            active_context_name: Name of the active Kubernetes context.
        """
        try:
            contexts, active_context = k8s_config.list_kube_config_contexts()
        except k8s_config.config_exception.ConfigException as e:
            raise RuntimeError(
                "Could not load the Kubernetes configuration"
            ) from e

        context_names = [c["name"] for c in contexts]
        active_context_name = active_context["name"]
        return context_names, active_context_name

    @property
    def validator(self) -> Optional[StackValidator]:
        """Defines the validator that checks whether the stack is valid.

        Returns:
            Stack validator.
        """

        def _validate_local_requirements(stack: "Stack") -> Tuple[bool, str]:
            """Validates that the stack contains no local components.

            Args:
                stack: The stack.

            Returns:
                Whether the stack is valid or not.
                An explanation why the stack is invalid, if applicable.
            """
            container_registry = stack.container_registry

            # should not happen, because the stack validation takes care of
            # this, but just in case
            assert container_registry is not None

            contexts, active_context = self.get_kubernetes_contexts()

            if self.config.kubernetes_context not in contexts:
                return False, (
                    f"Could not find a Kubernetes context named "
                    f"'{self.config.kubernetes_context}' in the local Kubernetes "
                    f"configuration. Please make sure that the Kubernetes "
                    f"cluster is running and that the kubeconfig file is "
                    f"configured correctly. To list all configured "
                    f"contexts, run:\n\n"
                    f"  `kubectl config get-contexts`\n"
                )
            if self.config.kubernetes_context != active_context:
                logger.warning(
                    f"The Kubernetes context '{self.config.kubernetes_context}' "
                    f"configured for the Kubernetes orchestrator is not "
                    f"the same as the active context in the local "
                    f"Kubernetes configuration. If this is not deliberate,"
                    f" you should update the orchestrator's "
                    f"`kubernetes_context` field by running:\n\n"
                    f"  `zenml orchestrator update {self.name} "
                    f"--kubernetes_context={active_context}`\n"
                    f"To list all configured contexts, run:\n\n"
                    f"  `kubectl config get-contexts`\n"
                    f"To set the active context to be the same as the one "
                    f"configured in the Kubernetes orchestrator and "
                    f"silence this warning, run:\n\n"
                    f"  `kubectl config use-context "
                    f"{self.config.kubernetes_context}`\n"
                )

            silence_local_validations_msg = (
                f"To silence this warning, set the "
                f"`skip_local_validations` attribute to True in the "
                f"orchestrator configuration by running:\n\n"
                f"  'zenml orchestrator update {self.name} "
                f"--skip_local_validations=True'\n"
            )

            if (
                not self.config.skip_local_validations
                and not self.config.is_local
            ):

                # if the orchestrator is not running in a local k3d cluster,
                # we cannot have any other local components in our stack,
                # because we cannot mount the local path into the container.
                # This may result in problems when running the pipeline, because
                # the local components will not be available inside the
                # kubernetes containers.

                # go through all stack components and identify those that
                # advertise a local path where they persist information that
                # they need to be available when running pipelines.
                for stack_comp in stack.components.values():
                    if stack_comp.local_path is None:
                        continue
                    return False, (
                        f"The Kubernetes orchestrator is configured to run "
                        f"pipelines in a remote Kubernetes cluster designated "
                        f"by the '{self.config.kubernetes_context}' configuration "
                        f"context, but the '{stack_comp.name}' "
                        f"{stack_comp.type.value} is a local stack component "
                        f"and will not be available in the Kubernetes pipeline "
                        f"step.\nPlease ensure that you always use non-local "
                        f"stack components with a remote Kubernetes orchestrator, "
                        f"otherwise you may run into pipeline execution "
                        f"problems. You should use a flavor of "
                        f"{stack_comp.type.value} other than "
                        f"'{stack_comp.flavor}'.\n"
                        + silence_local_validations_msg
                    )

                # if the orchestrator is remote, the container registry must
                # also be remote.
                if container_registry.config.is_local:
                    return False, (
                        f"The Kubernetes orchestrator is configured to run "
                        f"pipelines in a remote Kubernetes cluster designated "
                        f"by the '{self.config.kubernetes_context}' configuration "
                        f"context, but the '{container_registry.name}' "
                        f"container registry URI "
                        f"'{container_registry.config.uri}' "
                        f"points to a local container registry. Please ensure "
                        f"that you always use non-local stack components with "
                        f"a remote Kubernetes orchestrator, otherwise you will "
                        f"run into problems. You should use a flavor of "
                        f"container registry other than "
                        f"'{container_registry.flavor}'.\n"
                        + silence_local_validations_msg
                    )

            return True, ""

        return StackValidator(
            required_components={
                StackComponentType.CONTAINER_REGISTRY,
                StackComponentType.IMAGE_BUILDER,
            },
            custom_validation_function=_validate_local_requirements,
        )

    def prepare_or_run_pipeline(
        self,
        deployment: "PipelineDeploymentResponseModel",
        stack: "Stack",
    ) -> Any:
        """Runs the pipeline in Kubernetes.

        Args:
            deployment: The pipeline deployment to prepare or run.
            stack: The stack the pipeline will run on.

        Raises:
            RuntimeError: If trying to run from a Jupyter notebook.
        """
        # First check whether the code is running in a notebook.
        if Environment.in_notebook():
            raise RuntimeError(
                "The Kubernetes orchestrator cannot run pipelines in a notebook "
                "environment. The reason is that it is non-trivial to create "
                "a Docker image of a notebook. Please consider refactoring "
                "your notebook cells into separate scripts in a Python module "
                "and run the code outside of a notebook when using this "
                "orchestrator."
            )

        for step in deployment.step_configurations.values():
            if self.requires_resources_in_orchestration_environment(step):
                logger.warning(
                    "Specifying step resources is not yet supported for "
                    "the Kubernetes orchestrator, ignoring resource "
                    "configuration for step %s.",
                    step.config.name,
                )

        pipeline_name = deployment.pipeline_configuration.name
        orchestrator_run_name = get_orchestrator_run_name(pipeline_name)
        pod_name = kube_utils.sanitize_pod_name(orchestrator_run_name)

        assert stack.container_registry

        # Get Docker image for the orchestrator pod
        try:
            image = self.get_image(deployment=deployment)
        except KeyError:
            # If no generic pipeline image exists (which means all steps have
            # custom builds) we use a random step image as all of them include
            # dependencies for the active stack
            pipeline_step_name = next(iter(deployment.step_configurations))
            image = self.get_image(
                deployment=deployment, step_name=pipeline_step_name
            )

        # Build entrypoint command and args for the orchestrator pod.
        # This will internally also build the command/args for all step pods.
        command = (
            KubernetesOrchestratorEntrypointConfiguration.get_entrypoint_command()
        )
        args = KubernetesOrchestratorEntrypointConfiguration.get_entrypoint_arguments(
            run_name=orchestrator_run_name,
            deployment_id=deployment.id,
            kubernetes_namespace=self.config.kubernetes_namespace,
        )

        settings = cast(
            KubernetesOrchestratorSettings, self.get_settings(deployment)
        )

        # Authorize pod to run Kubernetes commands inside the cluster.
        service_account_name = "zenml-service-account"
        kube_utils.create_edit_service_account(
            core_api=self._k8s_core_api,
            rbac_api=self._k8s_rbac_api,
            service_account_name=service_account_name,
            namespace=self.config.kubernetes_namespace,
        )

        # Schedule as CRON job if CRON schedule is given.
        if deployment.schedule:
            if not deployment.schedule.cron_expression:
                raise RuntimeError(
                    "The Kubernetes orchestrator only supports scheduling via "
                    "CRON jobs, but the run was configured with a manual "
                    "schedule. Use `Schedule(cron_expression=...)` instead."
                )
            cron_expression = deployment.schedule.cron_expression
            cron_job_manifest = build_cron_job_manifest(
                cron_expression=cron_expression,
                run_name=orchestrator_run_name,
                pod_name=pod_name,
                pipeline_name=pipeline_name,
                image_name=image,
                command=command,
                args=args,
                service_account_name=service_account_name,
                settings=settings,
                mount_local_stores=self.config.is_local,
            )

            self._k8s_batch_api.create_namespaced_cron_job(
                body=cron_job_manifest,
                namespace=self.config.kubernetes_namespace,
            )
            logger.info(
                f"Scheduling Kubernetes run `{pod_name}` with CRON expression "
                f'`"{cron_expression}"`.'
            )
            return

        # Create and run the orchestrator pod.
        pod_manifest = build_pod_manifest(
            run_name=orchestrator_run_name,
            pod_name=pod_name,
            pipeline_name=pipeline_name,
            image_name=image,
            command=command,
            args=args,
            service_account_name=service_account_name,
            settings=settings,
            mount_local_stores=self.config.is_local,
        )

        self._k8s_core_api.create_namespaced_pod(
            namespace=self.config.kubernetes_namespace,
            body=pod_manifest,
        )

        # Wait for the orchestrator pod to finish and stream logs.
        if settings.synchronous:
            logger.info("Waiting for Kubernetes orchestrator pod...")
            kube_utils.wait_pod(
                core_api=self._k8s_core_api,
                pod_name=pod_name,
                namespace=self.config.kubernetes_namespace,
                exit_condition_lambda=kube_utils.pod_is_done,
                timeout_sec=settings.timeout,
                stream_logs=True,
            )
        else:
            logger.info(
                f"Orchestration started asynchronously in pod "
                f"`{self.config.kubernetes_namespace}:{pod_name}`. "
                f"Run the following command to inspect the logs: "
                f"`kubectl logs {pod_name} -n {self.config.kubernetes_namespace}`."
            )

    def get_orchestrator_run_id(self) -> str:
        """Returns the active orchestrator run id.

        Raises:
            RuntimeError: If the environment variable specifying the run id
                is not set.

        Returns:
            The orchestrator run id.
        """
        try:
            return os.environ[ENV_ZENML_KUBERNETES_RUN_ID]
        except KeyError:
            raise RuntimeError(
                "Unable to read run id from environment variable "
                f"{ENV_ZENML_KUBERNETES_RUN_ID}."
            )
config: KubernetesOrchestratorConfig property readonly

Returns the KubernetesOrchestratorConfig config.

Returns:

Type Description
KubernetesOrchestratorConfig

The configuration.

settings_class: Optional[Type[BaseSettings]] property readonly

Settings class for the Kubernetes orchestrator.

Returns:

Type Description
Optional[Type[BaseSettings]]

The settings class.

validator: Optional[zenml.stack.stack_validator.StackValidator] property readonly

Defines the validator that checks whether the stack is valid.

Returns:

Type Description
Optional[zenml.stack.stack_validator.StackValidator]

Stack validator.

__init__(self, *args, **kwargs) special

Initialize the class and the Kubernetes clients.

Parameters:

Name Type Description Default
*args Any

The positional arguments to pass to the Pydantic object.

()
**kwargs Any

The keyword arguments to pass to the Pydantic object.

{}
Source code in zenml/integrations/kubernetes/orchestrators/kubernetes_orchestrator.py
def __init__(self, *args: Any, **kwargs: Any) -> None:
    """Initialize the class and the Kubernetes clients.

    Args:
        *args: The positional arguments to pass to the Pydantic object.
        **kwargs: The keyword arguments to pass to the Pydantic object.
    """
    super().__init__(*args, **kwargs)
    self._initialize_k8s_clients()
get_kubernetes_contexts(self)

Get list of configured Kubernetes contexts and the active context.

Exceptions:

Type Description
RuntimeError

if the Kubernetes configuration cannot be loaded.

Returns:

Type Description
context_name

List of configured Kubernetes contexts active_context_name: Name of the active Kubernetes context.

Source code in zenml/integrations/kubernetes/orchestrators/kubernetes_orchestrator.py
def get_kubernetes_contexts(self) -> Tuple[List[str], str]:
    """Get list of configured Kubernetes contexts and the active context.

    Raises:
        RuntimeError: if the Kubernetes configuration cannot be loaded.

    Returns:
        context_name: List of configured Kubernetes contexts
        active_context_name: Name of the active Kubernetes context.
    """
    try:
        contexts, active_context = k8s_config.list_kube_config_contexts()
    except k8s_config.config_exception.ConfigException as e:
        raise RuntimeError(
            "Could not load the Kubernetes configuration"
        ) from e

    context_names = [c["name"] for c in contexts]
    active_context_name = active_context["name"]
    return context_names, active_context_name
get_orchestrator_run_id(self)

Returns the active orchestrator run id.

Exceptions:

Type Description
RuntimeError

If the environment variable specifying the run id is not set.

Returns:

Type Description
str

The orchestrator run id.

Source code in zenml/integrations/kubernetes/orchestrators/kubernetes_orchestrator.py
def get_orchestrator_run_id(self) -> str:
    """Returns the active orchestrator run id.

    Raises:
        RuntimeError: If the environment variable specifying the run id
            is not set.

    Returns:
        The orchestrator run id.
    """
    try:
        return os.environ[ENV_ZENML_KUBERNETES_RUN_ID]
    except KeyError:
        raise RuntimeError(
            "Unable to read run id from environment variable "
            f"{ENV_ZENML_KUBERNETES_RUN_ID}."
        )
prepare_or_run_pipeline(self, deployment, stack)

Runs the pipeline in Kubernetes.

Parameters:

Name Type Description Default
deployment PipelineDeploymentResponseModel

The pipeline deployment to prepare or run.

required
stack Stack

The stack the pipeline will run on.

required

Exceptions:

Type Description
RuntimeError

If trying to run from a Jupyter notebook.

Source code in zenml/integrations/kubernetes/orchestrators/kubernetes_orchestrator.py
def prepare_or_run_pipeline(
    self,
    deployment: "PipelineDeploymentResponseModel",
    stack: "Stack",
) -> Any:
    """Runs the pipeline in Kubernetes.

    Args:
        deployment: The pipeline deployment to prepare or run.
        stack: The stack the pipeline will run on.

    Raises:
        RuntimeError: If trying to run from a Jupyter notebook.
    """
    # First check whether the code is running in a notebook.
    if Environment.in_notebook():
        raise RuntimeError(
            "The Kubernetes orchestrator cannot run pipelines in a notebook "
            "environment. The reason is that it is non-trivial to create "
            "a Docker image of a notebook. Please consider refactoring "
            "your notebook cells into separate scripts in a Python module "
            "and run the code outside of a notebook when using this "
            "orchestrator."
        )

    for step in deployment.step_configurations.values():
        if self.requires_resources_in_orchestration_environment(step):
            logger.warning(
                "Specifying step resources is not yet supported for "
                "the Kubernetes orchestrator, ignoring resource "
                "configuration for step %s.",
                step.config.name,
            )

    pipeline_name = deployment.pipeline_configuration.name
    orchestrator_run_name = get_orchestrator_run_name(pipeline_name)
    pod_name = kube_utils.sanitize_pod_name(orchestrator_run_name)

    assert stack.container_registry

    # Get Docker image for the orchestrator pod
    try:
        image = self.get_image(deployment=deployment)
    except KeyError:
        # If no generic pipeline image exists (which means all steps have
        # custom builds) we use a random step image as all of them include
        # dependencies for the active stack
        pipeline_step_name = next(iter(deployment.step_configurations))
        image = self.get_image(
            deployment=deployment, step_name=pipeline_step_name
        )

    # Build entrypoint command and args for the orchestrator pod.
    # This will internally also build the command/args for all step pods.
    command = (
        KubernetesOrchestratorEntrypointConfiguration.get_entrypoint_command()
    )
    args = KubernetesOrchestratorEntrypointConfiguration.get_entrypoint_arguments(
        run_name=orchestrator_run_name,
        deployment_id=deployment.id,
        kubernetes_namespace=self.config.kubernetes_namespace,
    )

    settings = cast(
        KubernetesOrchestratorSettings, self.get_settings(deployment)
    )

    # Authorize pod to run Kubernetes commands inside the cluster.
    service_account_name = "zenml-service-account"
    kube_utils.create_edit_service_account(
        core_api=self._k8s_core_api,
        rbac_api=self._k8s_rbac_api,
        service_account_name=service_account_name,
        namespace=self.config.kubernetes_namespace,
    )

    # Schedule as CRON job if CRON schedule is given.
    if deployment.schedule:
        if not deployment.schedule.cron_expression:
            raise RuntimeError(
                "The Kubernetes orchestrator only supports scheduling via "
                "CRON jobs, but the run was configured with a manual "
                "schedule. Use `Schedule(cron_expression=...)` instead."
            )
        cron_expression = deployment.schedule.cron_expression
        cron_job_manifest = build_cron_job_manifest(
            cron_expression=cron_expression,
            run_name=orchestrator_run_name,
            pod_name=pod_name,
            pipeline_name=pipeline_name,
            image_name=image,
            command=command,
            args=args,
            service_account_name=service_account_name,
            settings=settings,
            mount_local_stores=self.config.is_local,
        )

        self._k8s_batch_api.create_namespaced_cron_job(
            body=cron_job_manifest,
            namespace=self.config.kubernetes_namespace,
        )
        logger.info(
            f"Scheduling Kubernetes run `{pod_name}` with CRON expression "
            f'`"{cron_expression}"`.'
        )
        return

    # Create and run the orchestrator pod.
    pod_manifest = build_pod_manifest(
        run_name=orchestrator_run_name,
        pod_name=pod_name,
        pipeline_name=pipeline_name,
        image_name=image,
        command=command,
        args=args,
        service_account_name=service_account_name,
        settings=settings,
        mount_local_stores=self.config.is_local,
    )

    self._k8s_core_api.create_namespaced_pod(
        namespace=self.config.kubernetes_namespace,
        body=pod_manifest,
    )

    # Wait for the orchestrator pod to finish and stream logs.
    if settings.synchronous:
        logger.info("Waiting for Kubernetes orchestrator pod...")
        kube_utils.wait_pod(
            core_api=self._k8s_core_api,
            pod_name=pod_name,
            namespace=self.config.kubernetes_namespace,
            exit_condition_lambda=kube_utils.pod_is_done,
            timeout_sec=settings.timeout,
            stream_logs=True,
        )
    else:
        logger.info(
            f"Orchestration started asynchronously in pod "
            f"`{self.config.kubernetes_namespace}:{pod_name}`. "
            f"Run the following command to inspect the logs: "
            f"`kubectl logs {pod_name} -n {self.config.kubernetes_namespace}`."
        )

kubernetes_orchestrator_entrypoint

Entrypoint of the Kubernetes master/orchestrator pod.

main()

Entrypoint of the k8s master/orchestrator pod.

Source code in zenml/integrations/kubernetes/orchestrators/kubernetes_orchestrator_entrypoint.py
def main() -> None:
    """Entrypoint of the k8s master/orchestrator pod."""
    # Log to the container's stdout so it can be streamed by the client.
    logger.info("Kubernetes orchestrator pod started.")

    # Parse / extract args.
    args = parse_args()

    # Get Kubernetes Core API for running kubectl commands later.
    kube_utils.load_kube_config()
    core_api = k8s_client.CoreV1Api()

    orchestrator_run_id = socket.gethostname()

    deployment_config = Client().get_deployment(args.deployment_id)

    pipeline_dag = {}
    step_name_to_pipeline_step_name = {}
    for (
        name_in_pipeline,
        step,
    ) in deployment_config.step_configurations.items():
        step_name_to_pipeline_step_name[step.config.name] = name_in_pipeline
        pipeline_dag[step.config.name] = step.spec.upstream_steps

    step_command = StepEntrypointConfiguration.get_entrypoint_command()

    active_stack = Client().active_stack
    mount_local_stores = active_stack.orchestrator.config.is_local

    def run_step_on_kubernetes(step_name: str) -> None:
        """Run a pipeline step in a separate Kubernetes pod.

        Args:
            step_name: Name of the step.
        """
        # Define Kubernetes pod name.
        pod_name = f"{orchestrator_run_id}-{step_name}"
        pod_name = kube_utils.sanitize_pod_name(pod_name)

        pipeline_step_name = step_name_to_pipeline_step_name[step_name]
        image = KubernetesOrchestrator.get_image(
            deployment=deployment_config, step_name=pipeline_step_name
        )
        step_args = StepEntrypointConfiguration.get_entrypoint_arguments(
            step_name=pipeline_step_name, deployment_id=deployment_config.id
        )

        step_config = deployment_config.step_configurations[
            pipeline_step_name
        ].config
        settings = KubernetesOrchestratorSettings.parse_obj(
            step_config.settings.get("orchestrator.kubernetes", {})
        )

        # Define Kubernetes pod manifest.
        pod_manifest = build_pod_manifest(
            pod_name=pod_name,
            run_name=args.run_name,
            pipeline_name=deployment_config.pipeline_configuration.name,
            image_name=image,
            command=step_command,
            args=step_args,
            env={ENV_ZENML_KUBERNETES_RUN_ID: orchestrator_run_id},
            settings=settings,
            mount_local_stores=mount_local_stores,
        )

        # Create and run pod.
        core_api.create_namespaced_pod(
            namespace=args.kubernetes_namespace,
            body=pod_manifest,
        )

        # Wait for pod to finish.
        logger.info(f"Waiting for pod of step `{step_name}` to start...")
        kube_utils.wait_pod(
            core_api=core_api,
            pod_name=pod_name,
            namespace=args.kubernetes_namespace,
            exit_condition_lambda=kube_utils.pod_is_done,
            stream_logs=True,
        )
        logger.info(f"Pod of step `{step_name}` completed.")

    ThreadedDagRunner(dag=pipeline_dag, run_fn=run_step_on_kubernetes).run()

    logger.info("Orchestration pod completed.")
parse_args()

Parse entrypoint arguments.

Returns:

Type Description
Namespace

Parsed args.

Source code in zenml/integrations/kubernetes/orchestrators/kubernetes_orchestrator_entrypoint.py
def parse_args() -> argparse.Namespace:
    """Parse entrypoint arguments.

    Returns:
        Parsed args.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--run_name", type=str, required=True)
    parser.add_argument("--deployment_id", type=str, required=True)
    parser.add_argument("--kubernetes_namespace", type=str, required=True)
    return parser.parse_args()

kubernetes_orchestrator_entrypoint_configuration

Entrypoint configuration for the Kubernetes master/orchestrator pod.

KubernetesOrchestratorEntrypointConfiguration

Entrypoint configuration for the k8s master/orchestrator pod.

Source code in zenml/integrations/kubernetes/orchestrators/kubernetes_orchestrator_entrypoint_configuration.py
class KubernetesOrchestratorEntrypointConfiguration:
    """Entrypoint configuration for the k8s master/orchestrator pod."""

    @classmethod
    def get_entrypoint_options(cls) -> Set[str]:
        """Gets all the options required for running this entrypoint.

        Returns:
            Entrypoint options.
        """
        options = {
            RUN_NAME_OPTION,
            DEPLOYMENT_ID_OPTION,
            NAMESPACE_OPTION,
        }
        return options

    @classmethod
    def get_entrypoint_command(cls) -> List[str]:
        """Returns a command that runs the entrypoint module.

        Returns:
            Entrypoint command.
        """
        command = [
            "python",
            "-m",
            "zenml.integrations.kubernetes.orchestrators.kubernetes_orchestrator_entrypoint",
        ]
        return command

    @classmethod
    def get_entrypoint_arguments(
        cls,
        run_name: str,
        deployment_id: "UUID",
        kubernetes_namespace: str,
    ) -> List[str]:
        """Gets all arguments that the entrypoint command should be called with.

        Args:
            run_name: Name of the ZenML run.
            deployment_id: ID of the deployment.
            kubernetes_namespace: Name of the Kubernetes namespace.

        Returns:
            List of entrypoint arguments.
        """
        args = [
            f"--{RUN_NAME_OPTION}",
            run_name,
            f"--{DEPLOYMENT_ID_OPTION}",
            str(deployment_id),
            f"--{NAMESPACE_OPTION}",
            kubernetes_namespace,
        ]

        return args
get_entrypoint_arguments(run_name, deployment_id, kubernetes_namespace) classmethod

Gets all arguments that the entrypoint command should be called with.

Parameters:

Name Type Description Default
run_name str

Name of the ZenML run.

required
deployment_id UUID

ID of the deployment.

required
kubernetes_namespace str

Name of the Kubernetes namespace.

required

Returns:

Type Description
List[str]

List of entrypoint arguments.

Source code in zenml/integrations/kubernetes/orchestrators/kubernetes_orchestrator_entrypoint_configuration.py
@classmethod
def get_entrypoint_arguments(
    cls,
    run_name: str,
    deployment_id: "UUID",
    kubernetes_namespace: str,
) -> List[str]:
    """Gets all arguments that the entrypoint command should be called with.

    Args:
        run_name: Name of the ZenML run.
        deployment_id: ID of the deployment.
        kubernetes_namespace: Name of the Kubernetes namespace.

    Returns:
        List of entrypoint arguments.
    """
    args = [
        f"--{RUN_NAME_OPTION}",
        run_name,
        f"--{DEPLOYMENT_ID_OPTION}",
        str(deployment_id),
        f"--{NAMESPACE_OPTION}",
        kubernetes_namespace,
    ]

    return args
get_entrypoint_command() classmethod

Returns a command that runs the entrypoint module.

Returns:

Type Description
List[str]

Entrypoint command.

Source code in zenml/integrations/kubernetes/orchestrators/kubernetes_orchestrator_entrypoint_configuration.py
@classmethod
def get_entrypoint_command(cls) -> List[str]:
    """Returns a command that runs the entrypoint module.

    Returns:
        Entrypoint command.
    """
    command = [
        "python",
        "-m",
        "zenml.integrations.kubernetes.orchestrators.kubernetes_orchestrator_entrypoint",
    ]
    return command
get_entrypoint_options() classmethod

Gets all the options required for running this entrypoint.

Returns:

Type Description
Set[str]

Entrypoint options.

Source code in zenml/integrations/kubernetes/orchestrators/kubernetes_orchestrator_entrypoint_configuration.py
@classmethod
def get_entrypoint_options(cls) -> Set[str]:
    """Gets all the options required for running this entrypoint.

    Returns:
        Entrypoint options.
    """
    options = {
        RUN_NAME_OPTION,
        DEPLOYMENT_ID_OPTION,
        NAMESPACE_OPTION,
    }
    return options

manifest_utils

Utility functions for building manifests for k8s pods.

add_local_stores_mount(pod_spec)

Makes changes in place to the configuration of the pod spec.

Configures mounted volumes for stack components that write to a local path.

Parameters:

Name Type Description Default
pod_spec V1PodSpec

The pod spec to update.

required
Source code in zenml/integrations/kubernetes/orchestrators/manifest_utils.py
def add_local_stores_mount(
    pod_spec: k8s_client.V1PodSpec,
) -> None:
    """Makes changes in place to the configuration of the pod spec.

    Configures mounted volumes for stack components that write to a local
    path.

    Args:
        pod_spec: The pod spec to update.
    """
    assert len(pod_spec.containers) == 1
    container_spec: k8s_client.V1Container = pod_spec.containers[0]

    stack = Client().active_stack

    stack.check_local_paths()

    local_stores_path = GlobalConfiguration().local_stores_path

    host_path = k8s_client.V1HostPathVolumeSource(
        path=local_stores_path, type="Directory"
    )

    pod_spec.volumes = pod_spec.volumes or []
    pod_spec.volumes.append(
        k8s_client.V1Volume(
            name="local-stores",
            host_path=host_path,
        )
    )
    container_spec.volume_mounts = container_spec.volume_mounts or []
    container_spec.volume_mounts.append(
        k8s_client.V1VolumeMount(
            name="local-stores",
            mount_path=local_stores_path,
        )
    )

    if sys.platform == "win32":
        # File permissions are not checked on Windows. This if clause
        # prevents mypy from complaining about unused 'type: ignore'
        # statements
        pass
    else:
        # Run KFP containers in the context of the local UID/GID
        # to ensure that the artifact and metadata stores can be shared
        # with the local pipeline runs.
        pod_spec.security_context = k8s_client.V1SecurityContext(
            run_as_user=os.getuid(),
            run_as_group=os.getgid(),
        )

    container_spec.env = container_spec.env or []
    container_spec.env.append(
        k8s_client.V1EnvVar(
            name=ENV_ZENML_LOCAL_STORES_PATH,
            value=local_stores_path,
        )
    )
add_pod_settings(pod_spec, settings)

Updates pod spec fields in place if passed in orchestrator settings.

Parameters:

Name Type Description Default
pod_spec V1PodSpec

Pod spec to update.

required
settings KubernetesPodSettings

Pod settings to apply.

required
Source code in zenml/integrations/kubernetes/orchestrators/manifest_utils.py
def add_pod_settings(
    pod_spec: k8s_client.V1PodSpec,
    settings: KubernetesPodSettings,
) -> None:
    """Updates pod `spec` fields in place if passed in orchestrator settings.

    Args:
        pod_spec: Pod spec to update.
        settings: Pod settings to apply.
    """
    if settings.node_selectors:
        pod_spec.node_selector = settings.node_selectors

    if settings.affinity:
        pod_spec.affinity = settings.affinity

    if settings.tolerations:
        pod_spec.tolerations = settings.tolerations
build_cluster_role_binding_manifest_for_service_account(name, role_name, service_account_name, namespace='default')

Build a manifest for a cluster role binding of a service account.

Parameters:

Name Type Description Default
name str

Name of the cluster role binding.

required
role_name str

Name of the role.

required
service_account_name str

Name of the service account.

required
namespace str

Kubernetes namespace. Defaults to "default".

'default'

Returns:

Type Description
Dict[str, Any]

Manifest for a cluster role binding of a service account.

Source code in zenml/integrations/kubernetes/orchestrators/manifest_utils.py
def build_cluster_role_binding_manifest_for_service_account(
    name: str,
    role_name: str,
    service_account_name: str,
    namespace: str = "default",
) -> Dict[str, Any]:
    """Build a manifest for a cluster role binding of a service account.

    Args:
        name: Name of the cluster role binding.
        role_name: Name of the role.
        service_account_name: Name of the service account.
        namespace: Kubernetes namespace. Defaults to "default".

    Returns:
        Manifest for a cluster role binding of a service account.
    """
    return {
        "apiVersion": "rbac.authorization.k8s.io/v1",
        "kind": "ClusterRoleBinding",
        "metadata": {"name": name},
        "subjects": [
            {
                "kind": "ServiceAccount",
                "name": service_account_name,
                "namespace": namespace,
            }
        ],
        "roleRef": {
            "kind": "ClusterRole",
            "name": role_name,
            "apiGroup": "rbac.authorization.k8s.io",
        },
    }
build_cron_job_manifest(cron_expression, pod_name, run_name, pipeline_name, image_name, command, args, settings, service_account_name=None, mount_local_stores=False)

Create a manifest for launching a pod as scheduled CRON job.

Parameters:

Name Type Description Default
cron_expression str

CRON job schedule expression, e.g. " * * *".

required
pod_name str

Name of the pod.

required
run_name str

Name of the ZenML run.

required
pipeline_name str

Name of the ZenML pipeline.

required
image_name str

Name of the Docker image.

required
command List[str]

Command to execute the entrypoint in the pod.

required
args List[str]

Arguments provided to the entrypoint command.

required
settings KubernetesOrchestratorSettings

KubernetesOrchestratorSettings object

required
service_account_name Optional[str]

Optional name of a service account. Can be used to assign certain roles to a pod, e.g., to allow it to run Kubernetes commands from within the cluster.

None
mount_local_stores bool

Whether to mount the local stores path inside the pod.

False

Returns:

Type Description
V1beta1CronJob

CRON job manifest.

Source code in zenml/integrations/kubernetes/orchestrators/manifest_utils.py
def build_cron_job_manifest(
    cron_expression: str,
    pod_name: str,
    run_name: str,
    pipeline_name: str,
    image_name: str,
    command: List[str],
    args: List[str],
    settings: KubernetesOrchestratorSettings,
    service_account_name: Optional[str] = None,
    mount_local_stores: bool = False,
) -> k8s_client.V1beta1CronJob:
    """Create a manifest for launching a pod as scheduled CRON job.

    Args:
        cron_expression: CRON job schedule expression, e.g. "* * * * *".
        pod_name: Name of the pod.
        run_name: Name of the ZenML run.
        pipeline_name: Name of the ZenML pipeline.
        image_name: Name of the Docker image.
        command: Command to execute the entrypoint in the pod.
        args: Arguments provided to the entrypoint command.
        settings: `KubernetesOrchestratorSettings` object
        service_account_name: Optional name of a service account.
            Can be used to assign certain roles to a pod, e.g., to allow it to
            run Kubernetes commands from within the cluster.
        mount_local_stores: Whether to mount the local stores path inside the
            pod.

    Returns:
        CRON job manifest.
    """
    pod_manifest = build_pod_manifest(
        pod_name=pod_name,
        run_name=run_name,
        pipeline_name=pipeline_name,
        image_name=image_name,
        command=command,
        args=args,
        settings=settings,
        service_account_name=service_account_name,
        mount_local_stores=mount_local_stores,
    )

    job_spec = k8s_client.V1beta1CronJobSpec(
        schedule=cron_expression,
        job_template=k8s_client.V1beta1JobTemplateSpec(
            metadata=pod_manifest["metadata"],
            spec=k8s_client.V1JobSpec(
                template=k8s_client.V1PodTemplateSpec(
                    metadata=pod_manifest["metadata"],
                    spec=pod_manifest["spec"],
                ),
            ),
        ),
    )

    job_manifest = k8s_client.V1beta1CronJob(
        kind="CronJob",
        api_version="batch/v1beta1",
        metadata=pod_manifest["metadata"],
        spec=job_spec,
    )

    return job_manifest
build_namespace_manifest(namespace)

Build the manifest for a new namespace.

Parameters:

Name Type Description Default
namespace str

Kubernetes namespace.

required

Returns:

Type Description
Dict[str, Any]

Manifest of the new namespace.

Source code in zenml/integrations/kubernetes/orchestrators/manifest_utils.py
def build_namespace_manifest(namespace: str) -> Dict[str, Any]:
    """Build the manifest for a new namespace.

    Args:
        namespace: Kubernetes namespace.

    Returns:
        Manifest of the new namespace.
    """
    return {
        "apiVersion": "v1",
        "kind": "Namespace",
        "metadata": {
            "name": namespace,
        },
    }
build_pod_manifest(pod_name, run_name, pipeline_name, image_name, command, args, settings, service_account_name=None, env=None, mount_local_stores=False)

Build a Kubernetes pod manifest for a ZenML run or step.

Parameters:

Name Type Description Default
pod_name str

Name of the pod.

required
run_name str

Name of the ZenML run.

required
pipeline_name str

Name of the ZenML pipeline.

required
image_name str

Name of the Docker image.

required
command List[str]

Command to execute the entrypoint in the pod.

required
args List[str]

Arguments provided to the entrypoint command.

required
settings KubernetesOrchestratorSettings

KubernetesOrchestratorSettings object

required
service_account_name Optional[str]

Optional name of a service account. Can be used to assign certain roles to a pod, e.g., to allow it to run Kubernetes commands from within the cluster.

None
env Optional[Dict[str, str]]

Environment variables to set.

None
mount_local_stores bool

Whether to mount the local stores path inside the pod.

False

Returns:

Type Description
V1Pod

Pod manifest.

Source code in zenml/integrations/kubernetes/orchestrators/manifest_utils.py
def build_pod_manifest(
    pod_name: str,
    run_name: str,
    pipeline_name: str,
    image_name: str,
    command: List[str],
    args: List[str],
    settings: KubernetesOrchestratorSettings,
    service_account_name: Optional[str] = None,
    env: Optional[Dict[str, str]] = None,
    mount_local_stores: bool = False,
) -> k8s_client.V1Pod:
    """Build a Kubernetes pod manifest for a ZenML run or step.

    Args:
        pod_name: Name of the pod.
        run_name: Name of the ZenML run.
        pipeline_name: Name of the ZenML pipeline.
        image_name: Name of the Docker image.
        command: Command to execute the entrypoint in the pod.
        args: Arguments provided to the entrypoint command.
        settings: `KubernetesOrchestratorSettings` object
        service_account_name: Optional name of a service account.
            Can be used to assign certain roles to a pod, e.g., to allow it to
            run Kubernetes commands from within the cluster.
        env: Environment variables to set.
        mount_local_stores: Whether to mount the local stores path inside the
            pod.

    Returns:
        Pod manifest.
    """
    env = env.copy() if env else {}
    env.setdefault(ENV_ZENML_ENABLE_REPO_INIT_WARNINGS, "False")

    container_spec = k8s_client.V1Container(
        name="main",
        image=image_name,
        command=command,
        args=args,
        env=[
            k8s_client.V1EnvVar(name=name, value=value)
            for name, value in env.items()
        ],
    )

    pod_spec = k8s_client.V1PodSpec(
        containers=[container_spec],
        restart_policy="Never",
    )

    if service_account_name is not None:
        pod_spec.service_account_name = service_account_name

    if settings.pod_settings:
        add_pod_settings(pod_spec, settings.pod_settings)

    pod_metadata = k8s_client.V1ObjectMeta(
        name=pod_name,
        labels={
            "run": run_name,
            "pipeline": pipeline_name,
        },
    )

    pod_manifest = k8s_client.V1Pod(
        kind="Pod",
        api_version="v1",
        metadata=pod_metadata,
        spec=pod_spec,
    )

    if mount_local_stores:
        add_local_stores_mount(pod_spec)

    return pod_manifest
build_service_account_manifest(name, namespace='default')

Build the manifest for a service account.

Parameters:

Name Type Description Default
name str

Name of the service account.

required
namespace str

Kubernetes namespace. Defaults to "default".

'default'

Returns:

Type Description
Dict[str, Any]

Manifest for a service account.

Source code in zenml/integrations/kubernetes/orchestrators/manifest_utils.py
def build_service_account_manifest(
    name: str, namespace: str = "default"
) -> Dict[str, Any]:
    """Build the manifest for a service account.

    Args:
        name: Name of the service account.
        namespace: Kubernetes namespace. Defaults to "default".

    Returns:
        Manifest for a service account.
    """
    return {
        "apiVersion": "v1",
        "metadata": {
            "name": name,
            "namespace": namespace,
        },
    }

pod_settings

Kubernetes pod settings.

KubernetesPodSettings (BaseSettings) pydantic-model

Kubernetes Pod settings.

Attributes:

Name Type Description
node_selectors Dict[str, str]

Node selectors to apply to the pod.

affinity Dict[str, Any]

Affinity to apply to the pod.

tolerations List[Dict[str, Any]]

Tolerations to apply to the pod.

resources Dict[str, Dict[str, str]]

Resource requests and limits for the pod.

Source code in zenml/integrations/kubernetes/pod_settings.py
class KubernetesPodSettings(BaseSettings):
    """Kubernetes Pod settings.

    Attributes:
        node_selectors: Node selectors to apply to the pod.
        affinity: Affinity to apply to the pod.
        tolerations: Tolerations to apply to the pod.
        resources: Resource requests and limits for the pod.
    """

    node_selectors: Dict[str, str] = {}
    affinity: Dict[str, Any] = {}
    tolerations: List[Dict[str, Any]] = []
    resources: Dict[str, Dict[str, str]] = {}

    @validator("affinity", pre=True)
    def _convert_affinity(
        cls, value: Union[Dict[str, Any], "V1Affinity"]
    ) -> Dict[str, Any]:
        """Converts Kubernetes affinity to a dict.

        Args:
            value: The affinity value.

        Returns:
            The converted value.
        """
        from kubernetes.client.models import V1Affinity

        if isinstance(value, V1Affinity):
            return serialization_utils.serialize_kubernetes_model(value)
        else:
            return value

    @validator("tolerations", pre=True)
    def _convert_tolerations(
        cls, value: List[Union[Dict[str, Any], "V1Toleration"]]
    ) -> List[Dict[str, Any]]:
        """Converts Kubernetes tolerations to dicts.

        Args:
            value: The tolerations list.

        Returns:
            The converted tolerations.
        """
        from kubernetes.client.models import V1Toleration

        result = []
        for element in value:
            if isinstance(element, V1Toleration):
                result.append(
                    serialization_utils.serialize_kubernetes_model(element)
                )
            else:
                result.append(element)

        return result

    @validator("resources", pre=True)
    def _convert_resources(
        cls, value: Union[Dict[str, Any], "V1ResourceRequirements"]
    ) -> Dict[str, Any]:
        """Converts Kubernetes resource requirements to a dict.

        Args:
            value: The resource value.

        Returns:
            The converted value.
        """
        from kubernetes.client.models import V1ResourceRequirements

        if isinstance(value, V1ResourceRequirements):
            return serialization_utils.serialize_kubernetes_model(value)
        else:
            return value

serialization_utils

Kubernetes serialization utils.

deserialize_kubernetes_model(data, class_name)

Deserializes a Kubernetes model.

Parameters:

Name Type Description Default
data Dict[str, Any]

The model data.

required
class_name str

Name of the Kubernetes model class.

required

Exceptions:

Type Description
KeyError

If the data contains values for an invalid attribute.

Returns:

Type Description
Any

The deserialized model.

Source code in zenml/integrations/kubernetes/serialization_utils.py
def deserialize_kubernetes_model(data: Dict[str, Any], class_name: str) -> Any:
    """Deserializes a Kubernetes model.

    Args:
        data: The model data.
        class_name: Name of the Kubernetes model class.

    Raises:
        KeyError: If the data contains values for an invalid attribute.

    Returns:
        The deserialized model.
    """
    model_class = get_model_class(class_name=class_name)
    assert hasattr(model_class, "openapi_types")
    assert hasattr(model_class, "attribute_map")
    # Mapping of the attribute name of the model class to the attribute type
    type_mapping = cast(Dict[str, str], model_class.openapi_types)
    reverse_attribute_mapping = cast(Dict[str, str], model_class.attribute_map)
    # Mapping of the serialized key to the attribute name of the model class
    attribute_mapping = {
        value: key for key, value in reverse_attribute_mapping.items()
    }

    deserialized_attributes: Dict[str, Any] = {}

    for key, value in data.items():
        if key not in attribute_mapping:
            raise KeyError(
                f"Got value for attribute {key} which is not one of the "
                f"available attributes {set(attribute_mapping)}."
            )

        attribute_name = attribute_mapping[key]
        attribute_class = type_mapping[attribute_name]

        if not value:
            deserialized_attributes[attribute_name] = value
        elif attribute_class.startswith("list["):
            match = re.fullmatch(r"list\[(.*)\]", attribute_class)
            assert match
            inner_class = match.group(1)
            deserialized_attributes[attribute_name] = _deserialize_list(
                value, class_name=inner_class
            )
        elif attribute_class.startswith("dict("):
            match = re.fullmatch(r"dict\(([^,]*), (.*)\)", attribute_class)
            assert match
            inner_class = match.group(1)
            deserialized_attributes[attribute_name] = _deserialize_dict(
                value, class_name=inner_class
            )
        elif is_model_class(attribute_class):
            deserialized_attributes[
                attribute_name
            ] = deserialize_kubernetes_model(value, attribute_class)
        else:
            deserialized_attributes[attribute_name] = value

    return model_class(**deserialized_attributes)

get_model_class(class_name)

Gets a Kubernetes model class.

Parameters:

Name Type Description Default
class_name str

Name of the class to get.

required

Exceptions:

Type Description
TypeError

If no Kubernetes model class exists for this name.

Returns:

Type Description
Type[Any]

The model class.

Source code in zenml/integrations/kubernetes/serialization_utils.py
def get_model_class(class_name: str) -> Type[Any]:
    """Gets a Kubernetes model class.

    Args:
        class_name: Name of the class to get.

    Raises:
        TypeError: If no Kubernetes model class exists for this name.

    Returns:
        The model class.
    """
    import kubernetes.client.models

    class_ = getattr(kubernetes.client.models, class_name, None)

    if not class_:
        raise TypeError(
            f"Unable to find kubernetes model class with name {class_name}."
        )

    assert isinstance(class_, type)
    return class_

is_model_class(class_name)

Checks whether the given class name is a Kubernetes model class.

Parameters:

Name Type Description Default
class_name str

Name of the class to check.

required

Returns:

Type Description
bool

If the given class name is a Kubernetes model class.

Source code in zenml/integrations/kubernetes/serialization_utils.py
def is_model_class(class_name: str) -> bool:
    """Checks whether the given class name is a Kubernetes model class.

    Args:
        class_name: Name of the class to check.

    Returns:
        If the given class name is a Kubernetes model class.
    """
    import kubernetes.client.models

    return hasattr(kubernetes.client.models, class_name)

serialize_kubernetes_model(model)

Serializes a Kubernetes model.

Parameters:

Name Type Description Default
model Any

The model to serialize.

required

Exceptions:

Type Description
TypeError

If the model is not a Kubernetes model.

Returns:

Type Description
Dict[str, Any]

The serialized model.

Source code in zenml/integrations/kubernetes/serialization_utils.py
def serialize_kubernetes_model(model: Any) -> Dict[str, Any]:
    """Serializes a Kubernetes model.

    Args:
        model: The model to serialize.

    Raises:
        TypeError: If the model is not a Kubernetes model.

    Returns:
        The serialized model.
    """
    if not is_model_class(model.__class__.__name__):
        raise TypeError(f"Unable to serialize non-kubernetes model {model}.")

    assert hasattr(model, "attribute_map")
    attribute_mapping = cast(Dict[str, str], model.attribute_map)

    model_attributes = {
        serialized_attribute_name: getattr(model, attribute_name)
        for attribute_name, serialized_attribute_name in attribute_mapping.items()
    }
    return _serialize_dict(model_attributes)