Kubernetes
zenml.integrations.kubernetes
special
Kubernetes integration for Kubernetes-native orchestration.
The Kubernetes integration sub-module powers an alternative to the local orchestrator. You can enable it by registering the Kubernetes orchestrator with the CLI tool.
KubernetesIntegration (Integration)
Definition of Kubernetes integration for ZenML.
Source code in zenml/integrations/kubernetes/__init__.py
class KubernetesIntegration(Integration):
"""Definition of Kubernetes integration for ZenML."""
NAME = KUBERNETES
REQUIREMENTS = ["kubernetes==18.20.0"]
@classmethod
def flavors(cls) -> List[Type[Flavor]]:
"""Declare the stack component flavors for the Kubernetes integration.
Returns:
List of new stack component flavors.
"""
from zenml.integrations.kubernetes.flavors import (
KubernetesOrchestratorFlavor,
)
return [KubernetesOrchestratorFlavor]
flavors()
classmethod
Declare the stack component flavors for the Kubernetes integration.
Returns:
Type | Description |
---|---|
List[Type[zenml.stack.flavor.Flavor]] |
List of new stack component flavors. |
Source code in zenml/integrations/kubernetes/__init__.py
@classmethod
def flavors(cls) -> List[Type[Flavor]]:
"""Declare the stack component flavors for the Kubernetes integration.
Returns:
List of new stack component flavors.
"""
from zenml.integrations.kubernetes.flavors import (
KubernetesOrchestratorFlavor,
)
return [KubernetesOrchestratorFlavor]
flavors
special
Kubernetes integration flavors.
kubernetes_orchestrator_flavor
Kubernetes orchestrator flavor.
KubernetesOrchestratorConfig (BaseOrchestratorConfig, KubernetesOrchestratorSettings)
pydantic-model
Configuration for the Kubernetes orchestrator.
Attributes:
Name | Type | Description |
---|---|---|
kubernetes_context |
Optional[str] |
Optional name of a Kubernetes context to run
pipelines in. If not set, the current active context will be used.
You can find the active context by running |
kubernetes_namespace |
str |
Name of the Kubernetes namespace to be used.
If not provided, |
skip_config_loading |
bool |
If |
Source code in zenml/integrations/kubernetes/flavors/kubernetes_orchestrator_flavor.py
class KubernetesOrchestratorConfig( # type: ignore[misc] # https://github.com/pydantic/pydantic/issues/4173
BaseOrchestratorConfig, KubernetesOrchestratorSettings
):
"""Configuration for the Kubernetes orchestrator.
Attributes:
kubernetes_context: Optional name of a Kubernetes context to run
pipelines in. If not set, the current active context will be used.
You can find the active context by running `kubectl config
current-context`.
kubernetes_namespace: Name of the Kubernetes namespace to be used.
If not provided, `zenml` namespace will be used.
skip_config_loading: If `True`, don't load the Kubernetes context and
clients. This is only useful for unit testing.
"""
kubernetes_context: Optional[str] = None # TODO: Potential setting
kubernetes_namespace: str = "zenml"
skip_config_loading: bool = False # TODO: Remove?
@property
def is_remote(self) -> bool:
"""Checks if this stack component is running remotely.
This designation is used to determine if the stack component can be
used with a local ZenML database or if it requires a remote ZenML
server.
Returns:
True if this config is for a remote component, False otherwise.
"""
return True
is_remote: bool
property
readonly
Checks if this stack component is running remotely.
This designation is used to determine if the stack component can be used with a local ZenML database or if it requires a remote ZenML server.
Returns:
Type | Description |
---|---|
bool |
True if this config is for a remote component, False otherwise. |
KubernetesOrchestratorFlavor (BaseOrchestratorFlavor)
Kubernetes orchestrator flavor.
Source code in zenml/integrations/kubernetes/flavors/kubernetes_orchestrator_flavor.py
class KubernetesOrchestratorFlavor(BaseOrchestratorFlavor):
"""Kubernetes orchestrator flavor."""
@property
def name(self) -> str:
"""Name of the flavor.
Returns:
The name of the flavor.
"""
return KUBERNETES_ORCHESTRATOR_FLAVOR
@property
def config_class(self) -> Type[KubernetesOrchestratorConfig]:
"""Returns `KubernetesOrchestratorConfig` config class.
Returns:
The config class.
"""
return KubernetesOrchestratorConfig
@property
def implementation_class(self) -> Type["KubernetesOrchestrator"]:
"""Implementation class for this flavor.
Returns:
The implementation class.
"""
from zenml.integrations.kubernetes.orchestrators import (
KubernetesOrchestrator,
)
return KubernetesOrchestrator
config_class: Type[zenml.integrations.kubernetes.flavors.kubernetes_orchestrator_flavor.KubernetesOrchestratorConfig]
property
readonly
Returns KubernetesOrchestratorConfig
config class.
Returns:
Type | Description |
---|---|
Type[zenml.integrations.kubernetes.flavors.kubernetes_orchestrator_flavor.KubernetesOrchestratorConfig] |
The config class. |
implementation_class: Type[KubernetesOrchestrator]
property
readonly
Implementation class for this flavor.
Returns:
Type | Description |
---|---|
Type[KubernetesOrchestrator] |
The implementation class. |
name: str
property
readonly
Name of the flavor.
Returns:
Type | Description |
---|---|
str |
The name of the flavor. |
KubernetesOrchestratorSettings (BaseSettings)
pydantic-model
Settings for the Kubernetes orchestrator.
Attributes:
Name | Type | Description |
---|---|---|
synchronous |
bool |
If |
timeout |
int |
How many seconds to wait for synchronous runs. |
pod_settings |
Optional[zenml.integrations.kubernetes.pod_settings.KubernetesPodSettings] |
Pod settings to apply. |
Source code in zenml/integrations/kubernetes/flavors/kubernetes_orchestrator_flavor.py
class KubernetesOrchestratorSettings(BaseSettings):
"""Settings for the Kubernetes orchestrator.
Attributes:
synchronous: If `True`, running a pipeline using this orchestrator will
block until all steps finished running on Kubernetes.
timeout: How many seconds to wait for synchronous runs. `0` means
to wait for an unlimited duration.
pod_settings: Pod settings to apply.
"""
synchronous: bool = False
timeout: int = 0
pod_settings: Optional[KubernetesPodSettings] = None
orchestrators
special
Kubernetes-native orchestration.
dag_runner
DAG (Directed Acyclic Graph) Runners.
NodeStatus (Enum)
Status of the execution of a node.
Source code in zenml/integrations/kubernetes/orchestrators/dag_runner.py
class NodeStatus(Enum):
"""Status of the execution of a node."""
WAITING = "Waiting"
RUNNING = "Running"
COMPLETED = "Completed"
ThreadedDagRunner
Multi-threaded DAG Runner.
This class expects a DAG of strings in adjacency list representation, as
well as a custom run_fn
as input, then calls run_fn(node)
for each
string node in the DAG.
Steps that can be executed in parallel will be started in separate threads.
Source code in zenml/integrations/kubernetes/orchestrators/dag_runner.py
class ThreadedDagRunner:
"""Multi-threaded DAG Runner.
This class expects a DAG of strings in adjacency list representation, as
well as a custom `run_fn` as input, then calls `run_fn(node)` for each
string node in the DAG.
Steps that can be executed in parallel will be started in separate threads.
"""
def __init__(
self, dag: Dict[str, List[str]], run_fn: Callable[[str], Any]
) -> None:
"""Define attributes and initialize all nodes in waiting state.
Args:
dag: Adjacency list representation of a DAG.
E.g.: [(1->2), (1->3), (2->4), (3->4)] should be represented as
`dag={2: [1], 3: [1], 4: [2, 3]}`
run_fn: A function `run_fn(node)` that runs a single node
"""
self.dag = dag
self.reversed_dag = reverse_dag(dag)
self.run_fn = run_fn
self.nodes = dag.keys()
self.node_states = {node: NodeStatus.WAITING for node in self.nodes}
self._lock = threading.Lock()
def _can_run(self, node: str) -> bool:
"""Determine whether a node is ready to be run.
This is the case if the node has not run yet and all of its upstream
node have already completed.
Args:
node: The node.
Returns:
True if the node can run else False.
"""
# Check that node has not run yet.
if not self.node_states[node] == NodeStatus.WAITING:
return False
# Check that all upstream nodes of this node have already completed.
for upstream_node in self.dag[node]:
if not self.node_states[upstream_node] == NodeStatus.COMPLETED:
return False
return True
def _run_node(self, node: str) -> None:
"""Run a single node.
Calls the user-defined run_fn, then calls `self._finish_node`.
Args:
node: The node.
"""
self.run_fn(node)
self._finish_node(node)
def _run_node_in_thread(self, node: str) -> threading.Thread:
"""Run a single node in a separate thread.
First updates the node status to running.
Then calls self._run_node() in a new thread and returns the thread.
Args:
node: The node.
Returns:
The thread in which the node was run.
"""
# Update node status to running.
assert self.node_states[node] == NodeStatus.WAITING
with self._lock:
self.node_states[node] = NodeStatus.RUNNING
# Run node in new thread.
thread = threading.Thread(target=self._run_node, args=(node,))
thread.start()
return thread
def _finish_node(self, node: str) -> None:
"""Finish a node run.
First updates the node status to completed.
Then starts all other nodes that can now be run and waits for them.
Args:
node: The node.
"""
# Update node status to completed.
assert self.node_states[node] == NodeStatus.RUNNING
with self._lock:
self.node_states[node] = NodeStatus.COMPLETED
# Run downstream nodes.
threads = []
for downstram_node in self.reversed_dag[node]:
if self._can_run(downstram_node):
thread = self._run_node_in_thread(downstram_node)
threads.append(thread)
# Wait for all downstream nodes to complete.
for thread in threads:
thread.join()
def run(self) -> None:
"""Call `self.run_fn` on all nodes in `self.dag`.
The order of execution is determined using topological sort.
Each node is run in a separate thread to enable parallelism.
"""
# Run all nodes that can be started immediately.
# These will, in turn, start other nodes once all of their respective
# upstream nodes have completed.
threads = []
for node in self.nodes:
if self._can_run(node):
thread = self._run_node_in_thread(node)
threads.append(thread)
# Wait till all nodes have completed.
for thread in threads:
thread.join()
# Make sure all nodes were run, otherwise print a warning.
for node in self.nodes:
if self.node_states[node] == NodeStatus.WAITING:
upstream_nodes = self.dag[node]
logger.warning(
f"Node `{node}` was never run, because it was still"
f" waiting for the following nodes: `{upstream_nodes}`."
)
__init__(self, dag, run_fn)
special
Define attributes and initialize all nodes in waiting state.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
dag |
Dict[str, List[str]] |
Adjacency list representation of a DAG.
E.g.: [(1->2), (1->3), (2->4), (3->4)] should be represented as
|
required |
run_fn |
Callable[[str], Any] |
A function |
required |
Source code in zenml/integrations/kubernetes/orchestrators/dag_runner.py
def __init__(
self, dag: Dict[str, List[str]], run_fn: Callable[[str], Any]
) -> None:
"""Define attributes and initialize all nodes in waiting state.
Args:
dag: Adjacency list representation of a DAG.
E.g.: [(1->2), (1->3), (2->4), (3->4)] should be represented as
`dag={2: [1], 3: [1], 4: [2, 3]}`
run_fn: A function `run_fn(node)` that runs a single node
"""
self.dag = dag
self.reversed_dag = reverse_dag(dag)
self.run_fn = run_fn
self.nodes = dag.keys()
self.node_states = {node: NodeStatus.WAITING for node in self.nodes}
self._lock = threading.Lock()
run(self)
Call self.run_fn
on all nodes in self.dag
.
The order of execution is determined using topological sort. Each node is run in a separate thread to enable parallelism.
Source code in zenml/integrations/kubernetes/orchestrators/dag_runner.py
def run(self) -> None:
"""Call `self.run_fn` on all nodes in `self.dag`.
The order of execution is determined using topological sort.
Each node is run in a separate thread to enable parallelism.
"""
# Run all nodes that can be started immediately.
# These will, in turn, start other nodes once all of their respective
# upstream nodes have completed.
threads = []
for node in self.nodes:
if self._can_run(node):
thread = self._run_node_in_thread(node)
threads.append(thread)
# Wait till all nodes have completed.
for thread in threads:
thread.join()
# Make sure all nodes were run, otherwise print a warning.
for node in self.nodes:
if self.node_states[node] == NodeStatus.WAITING:
upstream_nodes = self.dag[node]
logger.warning(
f"Node `{node}` was never run, because it was still"
f" waiting for the following nodes: `{upstream_nodes}`."
)
reverse_dag(dag)
Reverse a DAG.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
dag |
Dict[str, List[str]] |
Adjacency list representation of a DAG. |
required |
Returns:
Type | Description |
---|---|
Dict[str, List[str]] |
Adjacency list representation of the reversed DAG. |
Source code in zenml/integrations/kubernetes/orchestrators/dag_runner.py
def reverse_dag(dag: Dict[str, List[str]]) -> Dict[str, List[str]]:
"""Reverse a DAG.
Args:
dag: Adjacency list representation of a DAG.
Returns:
Adjacency list representation of the reversed DAG.
"""
reversed_dag = defaultdict(list)
# Reverse all edges in the graph.
for node, upstream_nodes in dag.items():
for upstream_node in upstream_nodes:
reversed_dag[upstream_node].append(node)
# Add nodes without incoming edges back in.
for node in dag:
if node not in reversed_dag:
reversed_dag[node] = []
return reversed_dag
kube_utils
Utilities for Kubernetes related functions.
Internal interface: no backwards compatibility guarantees. Adjusted from https://github.com/tensorflow/tfx/blob/master/tfx/utils/kube_utils.py.
PodPhase (Enum)
Phase of the Kubernetes pod.
Pod phases are defined in https://kubernetes.io/docs/concepts/workloads/pods/pod-lifecycle/#pod-phase.
Source code in zenml/integrations/kubernetes/orchestrators/kube_utils.py
class PodPhase(enum.Enum):
"""Phase of the Kubernetes pod.
Pod phases are defined in
https://kubernetes.io/docs/concepts/workloads/pods/pod-lifecycle/#pod-phase.
"""
PENDING = "Pending"
RUNNING = "Running"
SUCCEEDED = "Succeeded"
FAILED = "Failed"
UNKNOWN = "Unknown"
create_edit_service_account(core_api, rbac_api, service_account_name, namespace, cluster_role_binding_name='zenml-edit')
Create a new Kubernetes service account with "edit" rights.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
core_api |
CoreV1Api |
Client of Core V1 API of Kubernetes API. |
required |
rbac_api |
RbacAuthorizationV1Api |
Client of Rbac Authorization V1 API of Kubernetes API. |
required |
service_account_name |
str |
Name of the service account. |
required |
namespace |
str |
Kubernetes namespace. Defaults to "default". |
required |
cluster_role_binding_name |
str |
Name of the cluster role binding. Defaults to "zenml-edit". |
'zenml-edit' |
Source code in zenml/integrations/kubernetes/orchestrators/kube_utils.py
def create_edit_service_account(
core_api: k8s_client.CoreV1Api,
rbac_api: k8s_client.RbacAuthorizationV1Api,
service_account_name: str,
namespace: str,
cluster_role_binding_name: str = "zenml-edit",
) -> None:
"""Create a new Kubernetes service account with "edit" rights.
Args:
core_api: Client of Core V1 API of Kubernetes API.
rbac_api: Client of Rbac Authorization V1 API of Kubernetes API.
service_account_name: Name of the service account.
namespace: Kubernetes namespace. Defaults to "default".
cluster_role_binding_name: Name of the cluster role binding.
Defaults to "zenml-edit".
"""
crb_manifest = build_cluster_role_binding_manifest_for_service_account(
name=cluster_role_binding_name,
role_name="edit",
service_account_name=service_account_name,
namespace=namespace,
)
_if_not_exists(rbac_api.create_cluster_role_binding)(body=crb_manifest)
sa_manifest = build_service_account_manifest(
name=service_account_name, namespace=namespace
)
_if_not_exists(core_api.create_namespaced_service_account)(
namespace=namespace,
body=sa_manifest,
)
create_mysql_deployment(core_api, apps_api, deployment_name, namespace, storage_capacity='10Gi', volume_name='mysql-pv-volume', volume_claim_name='mysql-pv-claim')
Create a Kubernetes deployment with a MySQL database running on it.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
core_api |
CoreV1Api |
Client of Core V1 API of Kubernetes API. |
required |
apps_api |
AppsV1Api |
Client of Apps V1 API of Kubernetes API. |
required |
namespace |
str |
Kubernetes namespace. Defaults to "default". |
required |
storage_capacity |
str |
Storage capacity of the database.
Defaults to |
'10Gi' |
deployment_name |
str |
Name of the deployment. Defaults to "mysql". |
required |
volume_name |
str |
Name of the persistent volume.
Defaults to |
'mysql-pv-volume' |
volume_claim_name |
str |
Name of the persistent volume claim.
Defaults to |
'mysql-pv-claim' |
Source code in zenml/integrations/kubernetes/orchestrators/kube_utils.py
def create_mysql_deployment(
core_api: k8s_client.CoreV1Api,
apps_api: k8s_client.AppsV1Api,
deployment_name: str,
namespace: str,
storage_capacity: str = "10Gi",
volume_name: str = "mysql-pv-volume",
volume_claim_name: str = "mysql-pv-claim",
) -> None:
"""Create a Kubernetes deployment with a MySQL database running on it.
Args:
core_api: Client of Core V1 API of Kubernetes API.
apps_api: Client of Apps V1 API of Kubernetes API.
namespace: Kubernetes namespace. Defaults to "default".
storage_capacity: Storage capacity of the database.
Defaults to `"10Gi"`.
deployment_name: Name of the deployment. Defaults to "mysql".
volume_name: Name of the persistent volume.
Defaults to `"mysql-pv-volume"`.
volume_claim_name: Name of the persistent volume claim.
Defaults to `"mysql-pv-claim"`.
"""
pvc_manifest = build_persistent_volume_claim_manifest(
name=volume_claim_name,
namespace=namespace,
storage_request=storage_capacity,
)
_if_not_exists(core_api.create_namespaced_persistent_volume_claim)(
namespace=namespace,
body=pvc_manifest,
)
pv_manifest = build_persistent_volume_manifest(
name=volume_name, storage_capacity=storage_capacity
)
_if_not_exists(core_api.create_persistent_volume)(body=pv_manifest)
deployment_manifest = build_mysql_deployment_manifest(
name=deployment_name,
namespace=namespace,
pv_claim_name=volume_claim_name,
)
_if_not_exists(apps_api.create_namespaced_deployment)(
body=deployment_manifest, namespace=namespace
)
service_manifest = build_mysql_service_manifest(
name=deployment_name, namespace=namespace
)
_if_not_exists(core_api.create_namespaced_service)(
namespace=namespace, body=service_manifest
)
create_namespace(core_api, namespace)
Create a Kubernetes namespace.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
core_api |
CoreV1Api |
Client of Core V1 API of Kubernetes API. |
required |
namespace |
str |
Kubernetes namespace. Defaults to "default". |
required |
Source code in zenml/integrations/kubernetes/orchestrators/kube_utils.py
def create_namespace(core_api: k8s_client.CoreV1Api, namespace: str) -> None:
"""Create a Kubernetes namespace.
Args:
core_api: Client of Core V1 API of Kubernetes API.
namespace: Kubernetes namespace. Defaults to "default".
"""
manifest = build_namespace_manifest(namespace)
_if_not_exists(core_api.create_namespace)(body=manifest)
delete_deployment(apps_api, deployment_name, namespace)
Delete a Kubernetes deployment.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
apps_api |
AppsV1Api |
Client of Apps V1 API of Kubernetes API. |
required |
deployment_name |
str |
Name of the deployment to be deleted. |
required |
namespace |
str |
Kubernetes namespace containing the deployment. |
required |
Source code in zenml/integrations/kubernetes/orchestrators/kube_utils.py
def delete_deployment(
apps_api: k8s_client.AppsV1Api, deployment_name: str, namespace: str
) -> None:
"""Delete a Kubernetes deployment.
Args:
apps_api: Client of Apps V1 API of Kubernetes API.
deployment_name: Name of the deployment to be deleted.
namespace: Kubernetes namespace containing the deployment.
"""
options = k8s_client.V1DeleteOptions()
apps_api.delete_namespaced_deployment(
name=deployment_name,
namespace=namespace,
body=options,
propagation_policy="Foreground",
)
get_pod(core_api, pod_name, namespace)
Get a pod from Kubernetes metadata API.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
core_api |
CoreV1Api |
Client of |
required |
pod_name |
str |
The name of the pod. |
required |
namespace |
str |
The namespace of the pod. |
required |
Exceptions:
Type | Description |
---|---|
RuntimeError |
When it sees unexpected errors from Kubernetes API. |
Returns:
Type | Description |
---|---|
Optional[kubernetes.client.models.v1_pod.V1Pod] |
The found pod object. None if it's not found. |
Source code in zenml/integrations/kubernetes/orchestrators/kube_utils.py
def get_pod(
core_api: k8s_client.CoreV1Api, pod_name: str, namespace: str
) -> Optional[k8s_client.V1Pod]:
"""Get a pod from Kubernetes metadata API.
Args:
core_api: Client of `CoreV1Api` of Kubernetes API.
pod_name: The name of the pod.
namespace: The namespace of the pod.
Raises:
RuntimeError: When it sees unexpected errors from Kubernetes API.
Returns:
The found pod object. None if it's not found.
"""
try:
return core_api.read_namespaced_pod(name=pod_name, namespace=namespace)
except k8s_client.rest.ApiException as e:
if e.status == 404:
return None
raise RuntimeError from e
is_inside_kubernetes()
Check whether we are inside a Kubernetes cluster or on a remote host.
Returns:
Type | Description |
---|---|
bool |
True if inside a Kubernetes cluster, else False. |
Source code in zenml/integrations/kubernetes/orchestrators/kube_utils.py
def is_inside_kubernetes() -> bool:
"""Check whether we are inside a Kubernetes cluster or on a remote host.
Returns:
True if inside a Kubernetes cluster, else False.
"""
try:
k8s_config.load_incluster_config()
return True
except k8s_config.ConfigException:
return False
load_kube_config(context=None)
Load the Kubernetes client config.
Depending on the environment (whether it is inside the running Kubernetes cluster or remote host), different location will be searched for the config file.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
context |
Optional[str] |
Name of the Kubernetes context. If not provided, uses the currently active context. |
None |
Source code in zenml/integrations/kubernetes/orchestrators/kube_utils.py
def load_kube_config(context: Optional[str] = None) -> None:
"""Load the Kubernetes client config.
Depending on the environment (whether it is inside the running Kubernetes
cluster or remote host), different location will be searched for the config
file.
Args:
context: Name of the Kubernetes context. If not provided, uses the
currently active context.
"""
try:
k8s_config.load_incluster_config()
except k8s_config.ConfigException:
k8s_config.load_kube_config(context=context)
pod_failed(pod)
Check if pod status is 'Failed'.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
pod |
V1Pod |
Kubernetes pod. |
required |
Returns:
Type | Description |
---|---|
bool |
True if pod status is 'Failed' else False. |
Source code in zenml/integrations/kubernetes/orchestrators/kube_utils.py
def pod_failed(pod: k8s_client.V1Pod) -> bool:
"""Check if pod status is 'Failed'.
Args:
pod: Kubernetes pod.
Returns:
True if pod status is 'Failed' else False.
"""
return pod.status.phase == PodPhase.FAILED.value # type: ignore[no-any-return]
pod_is_done(pod)
Check if pod status is 'Succeeded'.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
pod |
V1Pod |
Kubernetes pod. |
required |
Returns:
Type | Description |
---|---|
bool |
True if pod status is 'Succeeded' else False. |
Source code in zenml/integrations/kubernetes/orchestrators/kube_utils.py
def pod_is_done(pod: k8s_client.V1Pod) -> bool:
"""Check if pod status is 'Succeeded'.
Args:
pod: Kubernetes pod.
Returns:
True if pod status is 'Succeeded' else False.
"""
return pod.status.phase == PodPhase.SUCCEEDED.value # type: ignore[no-any-return]
pod_is_not_pending(pod)
Check if pod status is not 'Pending'.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
pod |
V1Pod |
Kubernetes pod. |
required |
Returns:
Type | Description |
---|---|
bool |
False if the pod status is 'Pending' else True. |
Source code in zenml/integrations/kubernetes/orchestrators/kube_utils.py
def pod_is_not_pending(pod: k8s_client.V1Pod) -> bool:
"""Check if pod status is not 'Pending'.
Args:
pod: Kubernetes pod.
Returns:
False if the pod status is 'Pending' else True.
"""
return pod.status.phase != PodPhase.PENDING.value # type: ignore[no-any-return]
sanitize_pod_name(pod_name)
Sanitize pod names so they conform to Kubernetes pod naming convention.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
pod_name |
str |
Arbitrary input pod name. |
required |
Returns:
Type | Description |
---|---|
str |
Sanitized pod name. |
Source code in zenml/integrations/kubernetes/orchestrators/kube_utils.py
def sanitize_pod_name(pod_name: str) -> str:
"""Sanitize pod names so they conform to Kubernetes pod naming convention.
Args:
pod_name: Arbitrary input pod name.
Returns:
Sanitized pod name.
"""
pod_name = re.sub(r"[^a-z0-9-]", "-", pod_name.lower())
pod_name = re.sub(r"^[-]+", "", pod_name)
return re.sub(r"[-]+", "-", pod_name)
wait_pod(core_api, pod_name, namespace, exit_condition_lambda, timeout_sec=0, exponential_backoff=False, stream_logs=False)
Wait for a pod to meet an exit condition.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
core_api |
CoreV1Api |
Client of |
required |
pod_name |
str |
The name of the pod. |
required |
namespace |
str |
The namespace of the pod. |
required |
exit_condition_lambda |
Callable[[kubernetes.client.models.v1_pod.V1Pod], bool] |
A lambda which will be called periodically to wait for a pod to exit. The function returns True to exit. |
required |
timeout_sec |
int |
Timeout in seconds to wait for pod to reach exit condition, or 0 to wait for an unlimited duration. Defaults to unlimited. |
0 |
exponential_backoff |
bool |
Whether to use exponential back off for polling. Defaults to False. |
False |
stream_logs |
bool |
Whether to stream the pod logs to
|
False |
Exceptions:
Type | Description |
---|---|
RuntimeError |
when the function times out. |
Returns:
Type | Description |
---|---|
V1Pod |
The pod object which meets the exit condition. |
Source code in zenml/integrations/kubernetes/orchestrators/kube_utils.py
def wait_pod(
core_api: k8s_client.CoreV1Api,
pod_name: str,
namespace: str,
exit_condition_lambda: Callable[[k8s_client.V1Pod], bool],
timeout_sec: int = 0,
exponential_backoff: bool = False,
stream_logs: bool = False,
) -> k8s_client.V1Pod:
"""Wait for a pod to meet an exit condition.
Args:
core_api: Client of `CoreV1Api` of Kubernetes API.
pod_name: The name of the pod.
namespace: The namespace of the pod.
exit_condition_lambda: A lambda
which will be called periodically to wait for a pod to exit. The
function returns True to exit.
timeout_sec: Timeout in seconds to wait for pod to reach exit
condition, or 0 to wait for an unlimited duration.
Defaults to unlimited.
exponential_backoff: Whether to use exponential back off for polling.
Defaults to False.
stream_logs: Whether to stream the pod logs to
`zenml.logger.info()`. Defaults to False.
Raises:
RuntimeError: when the function times out.
Returns:
The pod object which meets the exit condition.
"""
start_time = datetime.datetime.utcnow()
# Link to exponential back-off algorithm used here:
# https://cloud.google.com/storage/docs/exponential-backoff
backoff_interval = 1
maximum_backoff = 32
logged_lines = 0
while True:
resp = get_pod(core_api, pod_name, namespace)
# Stream logs to `zenml.logger.info()`.
# TODO: can we do this without parsing all logs every time?
if stream_logs and pod_is_not_pending(resp):
response = core_api.read_namespaced_pod_log(
name=pod_name,
namespace=namespace,
)
logs = response.splitlines()
if len(logs) > logged_lines:
for line in logs[logged_lines:]:
logger.info(line)
logged_lines = len(logs)
# Raise an error if the pod failed.
if pod_failed(resp):
raise RuntimeError(f"Pod `{namespace}:{pod_name}` failed.")
# Check if pod is in desired state (e.g. finished / running / ...).
if exit_condition_lambda(resp):
return resp
# Check if wait timed out.
elapse_time = datetime.datetime.utcnow() - start_time
if elapse_time.seconds >= timeout_sec and timeout_sec != 0:
raise RuntimeError(
f"Waiting for pod `{namespace}:{pod_name}` timed out after "
f"{timeout_sec} seconds."
)
# Wait (using exponential backoff).
time.sleep(backoff_interval)
if exponential_backoff and backoff_interval < maximum_backoff:
backoff_interval *= 2
kubernetes_orchestrator
Kubernetes-native orchestrator.
KubernetesOrchestrator (BaseOrchestrator)
Orchestrator for running ZenML pipelines using native Kubernetes.
Source code in zenml/integrations/kubernetes/orchestrators/kubernetes_orchestrator.py
class KubernetesOrchestrator(BaseOrchestrator):
"""Orchestrator for running ZenML pipelines using native Kubernetes."""
_k8s_core_api: k8s_client.CoreV1Api = None
_k8s_batch_api: k8s_client.BatchV1beta1Api = None
_k8s_rbac_api: k8s_client.RbacAuthorizationV1Api = None
def __init__(self, *args: Any, **kwargs: Any) -> None:
"""Initialize the class and the Kubernetes clients.
Args:
*args: The positional arguments to pass to the Pydantic object.
**kwargs: The keyword arguments to pass to the Pydantic object.
"""
super().__init__(*args, **kwargs)
self._initialize_k8s_clients()
def _initialize_k8s_clients(self) -> None:
"""Initialize the Kubernetes clients."""
if self.config.skip_config_loading:
return
kube_utils.load_kube_config(context=self.config.kubernetes_context)
self._k8s_core_api = k8s_client.CoreV1Api()
self._k8s_batch_api = k8s_client.BatchV1beta1Api()
self._k8s_rbac_api = k8s_client.RbacAuthorizationV1Api()
@property
def config(self) -> KubernetesOrchestratorConfig:
"""Returns the `KubernetesOrchestratorConfig` config.
Returns:
The configuration.
"""
return cast(KubernetesOrchestratorConfig, self._config)
@property
def settings_class(self) -> Optional[Type["BaseSettings"]]:
"""Settings class for the Kubernetes orchestrator.
Returns:
The settings class.
"""
return KubernetesOrchestratorSettings
def get_kubernetes_contexts(self) -> Tuple[List[str], str]:
"""Get list of configured Kubernetes contexts and the active context.
Raises:
RuntimeError: if the Kubernetes configuration cannot be loaded.
Returns:
context_name: List of configured Kubernetes contexts
active_context_name: Name of the active Kubernetes context.
"""
try:
contexts, active_context = k8s_config.list_kube_config_contexts()
except k8s_config.config_exception.ConfigException as e:
raise RuntimeError(
"Could not load the Kubernetes configuration"
) from e
context_names = [c["name"] for c in contexts]
active_context_name = active_context["name"]
return context_names, active_context_name
@property
def validator(self) -> Optional[StackValidator]:
"""Defines the validator that checks whether the stack is valid.
Returns:
Stack validator.
"""
def _validate_local_requirements(stack: "Stack") -> Tuple[bool, str]:
"""Validates that the stack contains no local components.
Args:
stack: The stack.
Returns:
Whether the stack is valid or not.
An explanation why the stack is invalid, if applicable.
"""
container_registry = stack.container_registry
# should not happen, because the stack validation takes care of
# this, but just in case
assert container_registry is not None
if not self.config.skip_config_loading:
contexts, active_context = self.get_kubernetes_contexts()
if self.config.kubernetes_context not in contexts:
return False, (
f"Could not find a Kubernetes context named "
f"'{self.config.kubernetes_context}' in the local Kubernetes "
f"configuration. Please make sure that the Kubernetes "
f"cluster is running and that the kubeconfig file is "
f"configured correctly. To list all configured "
f"contexts, run:\n\n"
f" `kubectl config get-contexts`\n"
)
if self.config.kubernetes_context != active_context:
logger.warning(
f"The Kubernetes context '{self.config.kubernetes_context}' "
f"configured for the Kubernetes orchestrator is not "
f"the same as the active context in the local "
f"Kubernetes configuration. If this is not deliberate,"
f" you should update the orchestrator's "
f"`kubernetes_context` field by running:\n\n"
f" `zenml orchestrator update {self.name} "
f"--kubernetes_context={active_context}`\n"
f"To list all configured contexts, run:\n\n"
f" `kubectl config get-contexts`\n"
f"To set the active context to be the same as the one "
f"configured in the Kubernetes orchestrator and "
f"silence this warning, run:\n\n"
f" `kubectl config use-context "
f"{self.config.kubernetes_context}`\n"
)
# Check that all stack components are non-local.
for stack_component in stack.components.values():
if stack_component.local_path is not None:
return False, (
f"The Kubernetes orchestrator currently only supports "
f"remote stacks, but the '{stack_component.name}' "
f"{stack_component.type.value} is a local component. "
f"Please make sure to only use non-local stack "
f"components with a Kubernetes orchestrator."
)
# if the orchestrator is remote, the container registry must
# also be remote.
if container_registry.config.is_local:
return False, (
f"The Kubernetes orchestrator requires a remote container "
f"registry, but the '{container_registry.name}' container "
f"registry of your active stack points to a local URI "
f"'{container_registry.config.uri}'. Please make sure "
f"stacks with a Kubernetes orchestrator always contain "
f"remote container registries."
)
return True, ""
return StackValidator(
required_components={StackComponentType.CONTAINER_REGISTRY},
custom_validation_function=_validate_local_requirements,
)
def prepare_pipeline_deployment(
self,
deployment: "PipelineDeployment",
stack: "Stack",
) -> None:
"""Build a Docker image and push it to the container registry.
Args:
deployment: The pipeline deployment configuration.
stack: The stack on which the pipeline will be deployed.
"""
docker_image_builder = PipelineDockerImageBuilder()
repo_digest = docker_image_builder.build_and_push_docker_image(
deployment=deployment, stack=stack
)
deployment.add_extra(ORCHESTRATOR_DOCKER_IMAGE_KEY, repo_digest)
def prepare_or_run_pipeline(
self,
deployment: "PipelineDeployment",
stack: "Stack",
) -> Any:
"""Runs the pipeline in Kubernetes.
Args:
deployment: The pipeline deployment to prepare or run.
stack: The stack the pipeline will run on.
Raises:
RuntimeError: If trying to run from a Jupyter notebook.
"""
# First check whether the code is running in a notebook.
if Environment.in_notebook():
raise RuntimeError(
"The Kubernetes orchestrator cannot run pipelines in a notebook "
"environment. The reason is that it is non-trivial to create "
"a Docker image of a notebook. Please consider refactoring "
"your notebook cells into separate scripts in a Python module "
"and run the code outside of a notebook when using this "
"orchestrator."
)
for step in deployment.steps.values():
if self.requires_resources_in_orchestration_environment(step):
logger.warning(
"Specifying step resources is not yet supported for "
"the Kubernetes orchestrator, ignoring resource "
"configuration for step %s.",
step.config.name,
)
pipeline_name = deployment.pipeline.name
orchestrator_run_name = get_orchestrator_run_name(pipeline_name)
pod_name = kube_utils.sanitize_pod_name(orchestrator_run_name)
# Get Docker image name (for all pods).
image_name = deployment.pipeline.extra[ORCHESTRATOR_DOCKER_IMAGE_KEY]
# Build entrypoint command and args for the orchestrator pod.
# This will internally also build the command/args for all step pods.
command = (
KubernetesOrchestratorEntrypointConfiguration.get_entrypoint_command()
)
args = KubernetesOrchestratorEntrypointConfiguration.get_entrypoint_arguments(
run_name=orchestrator_run_name,
image_name=image_name,
kubernetes_namespace=self.config.kubernetes_namespace,
)
settings = cast(
KubernetesOrchestratorSettings, self.get_settings(deployment)
)
# Authorize pod to run Kubernetes commands inside the cluster.
service_account_name = "zenml-service-account"
kube_utils.create_edit_service_account(
core_api=self._k8s_core_api,
rbac_api=self._k8s_rbac_api,
service_account_name=service_account_name,
namespace=self.config.kubernetes_namespace,
)
# Schedule as CRON job if CRON schedule is given.
if deployment.schedule:
if not deployment.schedule.cron_expression:
raise RuntimeError(
"The Kubernetes orchestrator only supports scheduling via "
"CRON jobs, but the run was configured with a manual "
"schedule. Use `Schedule(cron_expression=...)` instead."
)
cron_expression = deployment.schedule.cron_expression
cron_job_manifest = build_cron_job_manifest(
cron_expression=cron_expression,
run_name=orchestrator_run_name,
pod_name=pod_name,
pipeline_name=pipeline_name,
image_name=image_name,
command=command,
args=args,
service_account_name=service_account_name,
settings=settings,
)
self._k8s_batch_api.create_namespaced_cron_job(
body=cron_job_manifest,
namespace=self.config.kubernetes_namespace,
)
logger.info(
f"Scheduling Kubernetes run `{pod_name}` with CRON expression "
f'`"{cron_expression}"`.'
)
return
# Create and run the orchestrator pod.
pod_manifest = build_pod_manifest(
run_name=orchestrator_run_name,
pod_name=pod_name,
pipeline_name=pipeline_name,
image_name=image_name,
command=command,
args=args,
service_account_name=service_account_name,
settings=settings,
)
self._k8s_core_api.create_namespaced_pod(
namespace=self.config.kubernetes_namespace,
body=pod_manifest,
)
# Wait for the orchestrator pod to finish and stream logs.
if settings.synchronous:
logger.info("Waiting for Kubernetes orchestrator pod...")
kube_utils.wait_pod(
core_api=self._k8s_core_api,
pod_name=pod_name,
namespace=self.config.kubernetes_namespace,
exit_condition_lambda=kube_utils.pod_is_done,
timeout_sec=settings.timeout,
stream_logs=True,
)
else:
logger.info(
f"Orchestration started asynchronously in pod "
f"`{self.config.kubernetes_namespace}:{pod_name}`. "
f"Run the following command to inspect the logs: "
f"`kubectl logs {pod_name} -n {self.config.kubernetes_namespace}`."
)
def get_orchestrator_run_id(self) -> str:
"""Returns the active orchestrator run id.
Raises:
RuntimeError: If the environment variable specifying the run id
is not set.
Returns:
The orchestrator run id.
"""
try:
return os.environ[ENV_ZENML_KUBERNETES_RUN_ID]
except KeyError:
raise RuntimeError(
"Unable to read run id from environment variable "
f"{ENV_ZENML_KUBERNETES_RUN_ID}."
)
config: KubernetesOrchestratorConfig
property
readonly
Returns the KubernetesOrchestratorConfig
config.
Returns:
Type | Description |
---|---|
KubernetesOrchestratorConfig |
The configuration. |
settings_class: Optional[Type[BaseSettings]]
property
readonly
Settings class for the Kubernetes orchestrator.
Returns:
Type | Description |
---|---|
Optional[Type[BaseSettings]] |
The settings class. |
validator: Optional[zenml.stack.stack_validator.StackValidator]
property
readonly
Defines the validator that checks whether the stack is valid.
Returns:
Type | Description |
---|---|
Optional[zenml.stack.stack_validator.StackValidator] |
Stack validator. |
__init__(self, *args, **kwargs)
special
Initialize the class and the Kubernetes clients.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
*args |
Any |
The positional arguments to pass to the Pydantic object. |
() |
**kwargs |
Any |
The keyword arguments to pass to the Pydantic object. |
{} |
Source code in zenml/integrations/kubernetes/orchestrators/kubernetes_orchestrator.py
def __init__(self, *args: Any, **kwargs: Any) -> None:
"""Initialize the class and the Kubernetes clients.
Args:
*args: The positional arguments to pass to the Pydantic object.
**kwargs: The keyword arguments to pass to the Pydantic object.
"""
super().__init__(*args, **kwargs)
self._initialize_k8s_clients()
get_kubernetes_contexts(self)
Get list of configured Kubernetes contexts and the active context.
Exceptions:
Type | Description |
---|---|
RuntimeError |
if the Kubernetes configuration cannot be loaded. |
Returns:
Type | Description |
---|---|
context_name |
List of configured Kubernetes contexts active_context_name: Name of the active Kubernetes context. |
Source code in zenml/integrations/kubernetes/orchestrators/kubernetes_orchestrator.py
def get_kubernetes_contexts(self) -> Tuple[List[str], str]:
"""Get list of configured Kubernetes contexts and the active context.
Raises:
RuntimeError: if the Kubernetes configuration cannot be loaded.
Returns:
context_name: List of configured Kubernetes contexts
active_context_name: Name of the active Kubernetes context.
"""
try:
contexts, active_context = k8s_config.list_kube_config_contexts()
except k8s_config.config_exception.ConfigException as e:
raise RuntimeError(
"Could not load the Kubernetes configuration"
) from e
context_names = [c["name"] for c in contexts]
active_context_name = active_context["name"]
return context_names, active_context_name
get_orchestrator_run_id(self)
Returns the active orchestrator run id.
Exceptions:
Type | Description |
---|---|
RuntimeError |
If the environment variable specifying the run id is not set. |
Returns:
Type | Description |
---|---|
str |
The orchestrator run id. |
Source code in zenml/integrations/kubernetes/orchestrators/kubernetes_orchestrator.py
def get_orchestrator_run_id(self) -> str:
"""Returns the active orchestrator run id.
Raises:
RuntimeError: If the environment variable specifying the run id
is not set.
Returns:
The orchestrator run id.
"""
try:
return os.environ[ENV_ZENML_KUBERNETES_RUN_ID]
except KeyError:
raise RuntimeError(
"Unable to read run id from environment variable "
f"{ENV_ZENML_KUBERNETES_RUN_ID}."
)
prepare_or_run_pipeline(self, deployment, stack)
Runs the pipeline in Kubernetes.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
deployment |
PipelineDeployment |
The pipeline deployment to prepare or run. |
required |
stack |
Stack |
The stack the pipeline will run on. |
required |
Exceptions:
Type | Description |
---|---|
RuntimeError |
If trying to run from a Jupyter notebook. |
Source code in zenml/integrations/kubernetes/orchestrators/kubernetes_orchestrator.py
def prepare_or_run_pipeline(
self,
deployment: "PipelineDeployment",
stack: "Stack",
) -> Any:
"""Runs the pipeline in Kubernetes.
Args:
deployment: The pipeline deployment to prepare or run.
stack: The stack the pipeline will run on.
Raises:
RuntimeError: If trying to run from a Jupyter notebook.
"""
# First check whether the code is running in a notebook.
if Environment.in_notebook():
raise RuntimeError(
"The Kubernetes orchestrator cannot run pipelines in a notebook "
"environment. The reason is that it is non-trivial to create "
"a Docker image of a notebook. Please consider refactoring "
"your notebook cells into separate scripts in a Python module "
"and run the code outside of a notebook when using this "
"orchestrator."
)
for step in deployment.steps.values():
if self.requires_resources_in_orchestration_environment(step):
logger.warning(
"Specifying step resources is not yet supported for "
"the Kubernetes orchestrator, ignoring resource "
"configuration for step %s.",
step.config.name,
)
pipeline_name = deployment.pipeline.name
orchestrator_run_name = get_orchestrator_run_name(pipeline_name)
pod_name = kube_utils.sanitize_pod_name(orchestrator_run_name)
# Get Docker image name (for all pods).
image_name = deployment.pipeline.extra[ORCHESTRATOR_DOCKER_IMAGE_KEY]
# Build entrypoint command and args for the orchestrator pod.
# This will internally also build the command/args for all step pods.
command = (
KubernetesOrchestratorEntrypointConfiguration.get_entrypoint_command()
)
args = KubernetesOrchestratorEntrypointConfiguration.get_entrypoint_arguments(
run_name=orchestrator_run_name,
image_name=image_name,
kubernetes_namespace=self.config.kubernetes_namespace,
)
settings = cast(
KubernetesOrchestratorSettings, self.get_settings(deployment)
)
# Authorize pod to run Kubernetes commands inside the cluster.
service_account_name = "zenml-service-account"
kube_utils.create_edit_service_account(
core_api=self._k8s_core_api,
rbac_api=self._k8s_rbac_api,
service_account_name=service_account_name,
namespace=self.config.kubernetes_namespace,
)
# Schedule as CRON job if CRON schedule is given.
if deployment.schedule:
if not deployment.schedule.cron_expression:
raise RuntimeError(
"The Kubernetes orchestrator only supports scheduling via "
"CRON jobs, but the run was configured with a manual "
"schedule. Use `Schedule(cron_expression=...)` instead."
)
cron_expression = deployment.schedule.cron_expression
cron_job_manifest = build_cron_job_manifest(
cron_expression=cron_expression,
run_name=orchestrator_run_name,
pod_name=pod_name,
pipeline_name=pipeline_name,
image_name=image_name,
command=command,
args=args,
service_account_name=service_account_name,
settings=settings,
)
self._k8s_batch_api.create_namespaced_cron_job(
body=cron_job_manifest,
namespace=self.config.kubernetes_namespace,
)
logger.info(
f"Scheduling Kubernetes run `{pod_name}` with CRON expression "
f'`"{cron_expression}"`.'
)
return
# Create and run the orchestrator pod.
pod_manifest = build_pod_manifest(
run_name=orchestrator_run_name,
pod_name=pod_name,
pipeline_name=pipeline_name,
image_name=image_name,
command=command,
args=args,
service_account_name=service_account_name,
settings=settings,
)
self._k8s_core_api.create_namespaced_pod(
namespace=self.config.kubernetes_namespace,
body=pod_manifest,
)
# Wait for the orchestrator pod to finish and stream logs.
if settings.synchronous:
logger.info("Waiting for Kubernetes orchestrator pod...")
kube_utils.wait_pod(
core_api=self._k8s_core_api,
pod_name=pod_name,
namespace=self.config.kubernetes_namespace,
exit_condition_lambda=kube_utils.pod_is_done,
timeout_sec=settings.timeout,
stream_logs=True,
)
else:
logger.info(
f"Orchestration started asynchronously in pod "
f"`{self.config.kubernetes_namespace}:{pod_name}`. "
f"Run the following command to inspect the logs: "
f"`kubectl logs {pod_name} -n {self.config.kubernetes_namespace}`."
)
prepare_pipeline_deployment(self, deployment, stack)
Build a Docker image and push it to the container registry.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
deployment |
PipelineDeployment |
The pipeline deployment configuration. |
required |
stack |
Stack |
The stack on which the pipeline will be deployed. |
required |
Source code in zenml/integrations/kubernetes/orchestrators/kubernetes_orchestrator.py
def prepare_pipeline_deployment(
self,
deployment: "PipelineDeployment",
stack: "Stack",
) -> None:
"""Build a Docker image and push it to the container registry.
Args:
deployment: The pipeline deployment configuration.
stack: The stack on which the pipeline will be deployed.
"""
docker_image_builder = PipelineDockerImageBuilder()
repo_digest = docker_image_builder.build_and_push_docker_image(
deployment=deployment, stack=stack
)
deployment.add_extra(ORCHESTRATOR_DOCKER_IMAGE_KEY, repo_digest)
kubernetes_orchestrator_entrypoint
Entrypoint of the Kubernetes master/orchestrator pod.
main()
Entrypoint of the k8s master/orchestrator pod.
Source code in zenml/integrations/kubernetes/orchestrators/kubernetes_orchestrator_entrypoint.py
def main() -> None:
"""Entrypoint of the k8s master/orchestrator pod."""
# Log to the container's stdout so it can be streamed by the client.
logger.info("Kubernetes orchestrator pod started.")
# Parse / extract args.
args = parse_args()
# Get Kubernetes Core API for running kubectl commands later.
kube_utils.load_kube_config()
core_api = k8s_client.CoreV1Api()
orchestrator_run_id = socket.gethostname()
config_dict = yaml_utils.read_yaml(DOCKER_IMAGE_DEPLOYMENT_CONFIG_FILE)
deployment_config = PipelineDeployment.parse_obj(config_dict)
pipeline_dag = {}
step_name_to_pipeline_step_name = {}
for name_in_pipeline, step in deployment_config.steps.items():
step_name_to_pipeline_step_name[step.config.name] = name_in_pipeline
pipeline_dag[step.config.name] = step.spec.upstream_steps
step_command = StepEntrypointConfiguration.get_entrypoint_command()
def run_step_on_kubernetes(step_name: str) -> None:
"""Run a pipeline step in a separate Kubernetes pod.
Args:
step_name: Name of the step.
"""
# Define Kubernetes pod name.
pod_name = f"{orchestrator_run_id}-{step_name}"
pod_name = kube_utils.sanitize_pod_name(pod_name)
pipeline_step_name = step_name_to_pipeline_step_name[step_name]
step_args = StepEntrypointConfiguration.get_entrypoint_arguments(
step_name=pipeline_step_name
)
step_config = deployment_config.steps[pipeline_step_name].config
settings = KubernetesOrchestratorSettings.parse_obj(
step_config.settings.get("orchestrator.kubernetes", {})
)
# Define Kubernetes pod manifest.
pod_manifest = build_pod_manifest(
pod_name=pod_name,
run_name=args.run_name,
pipeline_name=deployment_config.pipeline.name,
image_name=args.image_name,
command=step_command,
args=step_args,
env={ENV_ZENML_KUBERNETES_RUN_ID: orchestrator_run_id},
settings=settings,
)
# Create and run pod.
core_api.create_namespaced_pod(
namespace=args.kubernetes_namespace,
body=pod_manifest,
)
# Wait for pod to finish.
logger.info(f"Waiting for pod of step `{step_name}` to start...")
kube_utils.wait_pod(
core_api=core_api,
pod_name=pod_name,
namespace=args.kubernetes_namespace,
exit_condition_lambda=kube_utils.pod_is_done,
stream_logs=True,
)
logger.info(f"Pod of step `{step_name}` completed.")
ThreadedDagRunner(dag=pipeline_dag, run_fn=run_step_on_kubernetes).run()
logger.info("Orchestration pod completed.")
parse_args()
Parse entrypoint arguments.
Returns:
Type | Description |
---|---|
Namespace |
Parsed args. |
Source code in zenml/integrations/kubernetes/orchestrators/kubernetes_orchestrator_entrypoint.py
def parse_args() -> argparse.Namespace:
"""Parse entrypoint arguments.
Returns:
Parsed args.
"""
parser = argparse.ArgumentParser()
parser.add_argument("--run_name", type=str, required=True)
parser.add_argument("--image_name", type=str, required=True)
parser.add_argument("--kubernetes_namespace", type=str, required=True)
return parser.parse_args()
kubernetes_orchestrator_entrypoint_configuration
Entrypoint configuration for the Kubernetes master/orchestrator pod.
KubernetesOrchestratorEntrypointConfiguration
Entrypoint configuration for the k8s master/orchestrator pod.
Source code in zenml/integrations/kubernetes/orchestrators/kubernetes_orchestrator_entrypoint_configuration.py
class KubernetesOrchestratorEntrypointConfiguration:
"""Entrypoint configuration for the k8s master/orchestrator pod."""
@classmethod
def get_entrypoint_options(cls) -> Set[str]:
"""Gets all the options required for running this entrypoint.
Returns:
Entrypoint options.
"""
options = {
RUN_NAME_OPTION,
IMAGE_NAME_OPTION,
NAMESPACE_OPTION,
}
return options
@classmethod
def get_entrypoint_command(cls) -> List[str]:
"""Returns a command that runs the entrypoint module.
Returns:
Entrypoint command.
"""
command = [
"python",
"-m",
"zenml.integrations.kubernetes.orchestrators.kubernetes_orchestrator_entrypoint",
]
return command
@classmethod
def get_entrypoint_arguments(
cls,
run_name: str,
image_name: str,
kubernetes_namespace: str,
) -> List[str]:
"""Gets all arguments that the entrypoint command should be called with.
Args:
run_name: Name of the ZenML run.
image_name: Name of the Docker image.
kubernetes_namespace: Name of the Kubernetes namespace.
Returns:
List of entrypoint arguments.
"""
args = [
f"--{RUN_NAME_OPTION}",
run_name,
f"--{IMAGE_NAME_OPTION}",
image_name,
f"--{NAMESPACE_OPTION}",
kubernetes_namespace,
]
return args
get_entrypoint_arguments(run_name, image_name, kubernetes_namespace)
classmethod
Gets all arguments that the entrypoint command should be called with.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
run_name |
str |
Name of the ZenML run. |
required |
image_name |
str |
Name of the Docker image. |
required |
kubernetes_namespace |
str |
Name of the Kubernetes namespace. |
required |
Returns:
Type | Description |
---|---|
List[str] |
List of entrypoint arguments. |
Source code in zenml/integrations/kubernetes/orchestrators/kubernetes_orchestrator_entrypoint_configuration.py
@classmethod
def get_entrypoint_arguments(
cls,
run_name: str,
image_name: str,
kubernetes_namespace: str,
) -> List[str]:
"""Gets all arguments that the entrypoint command should be called with.
Args:
run_name: Name of the ZenML run.
image_name: Name of the Docker image.
kubernetes_namespace: Name of the Kubernetes namespace.
Returns:
List of entrypoint arguments.
"""
args = [
f"--{RUN_NAME_OPTION}",
run_name,
f"--{IMAGE_NAME_OPTION}",
image_name,
f"--{NAMESPACE_OPTION}",
kubernetes_namespace,
]
return args
get_entrypoint_command()
classmethod
Returns a command that runs the entrypoint module.
Returns:
Type | Description |
---|---|
List[str] |
Entrypoint command. |
Source code in zenml/integrations/kubernetes/orchestrators/kubernetes_orchestrator_entrypoint_configuration.py
@classmethod
def get_entrypoint_command(cls) -> List[str]:
"""Returns a command that runs the entrypoint module.
Returns:
Entrypoint command.
"""
command = [
"python",
"-m",
"zenml.integrations.kubernetes.orchestrators.kubernetes_orchestrator_entrypoint",
]
return command
get_entrypoint_options()
classmethod
Gets all the options required for running this entrypoint.
Returns:
Type | Description |
---|---|
Set[str] |
Entrypoint options. |
Source code in zenml/integrations/kubernetes/orchestrators/kubernetes_orchestrator_entrypoint_configuration.py
@classmethod
def get_entrypoint_options(cls) -> Set[str]:
"""Gets all the options required for running this entrypoint.
Returns:
Entrypoint options.
"""
options = {
RUN_NAME_OPTION,
IMAGE_NAME_OPTION,
NAMESPACE_OPTION,
}
return options
manifest_utils
Utility functions for building manifests for k8s pods.
add_pod_settings(settings)
Updates spec
fields in pod if passed in orchestrator settings.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
settings |
KubernetesPodSettings |
Pod settings to apply. |
required |
Returns:
Type | Description |
---|---|
Dict[str, Any] |
Dictionary with additional fields for the pod |
Source code in zenml/integrations/kubernetes/orchestrators/manifest_utils.py
def add_pod_settings(
settings: KubernetesPodSettings,
) -> Dict[str, Any]:
"""Updates `spec` fields in pod if passed in orchestrator settings.
Args:
settings: Pod settings to apply.
Returns:
Dictionary with additional fields for the pod
"""
spec: Dict[str, Any] = {}
if settings.node_selectors:
spec["nodeSelector"] = settings.node_selectors
if settings.affinity:
spec["affinity"] = settings.affinity
if settings.tolerations:
spec["tolerations"] = settings.tolerations
return spec
build_cluster_role_binding_manifest_for_service_account(name, role_name, service_account_name, namespace='default')
Build a manifest for a cluster role binding of a service account.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
name |
str |
Name of the cluster role binding. |
required |
role_name |
str |
Name of the role. |
required |
service_account_name |
str |
Name of the service account. |
required |
namespace |
str |
Kubernetes namespace. Defaults to "default". |
'default' |
Returns:
Type | Description |
---|---|
Dict[str, Any] |
Manifest for a cluster role binding of a service account. |
Source code in zenml/integrations/kubernetes/orchestrators/manifest_utils.py
def build_cluster_role_binding_manifest_for_service_account(
name: str,
role_name: str,
service_account_name: str,
namespace: str = "default",
) -> Dict[str, Any]:
"""Build a manifest for a cluster role binding of a service account.
Args:
name: Name of the cluster role binding.
role_name: Name of the role.
service_account_name: Name of the service account.
namespace: Kubernetes namespace. Defaults to "default".
Returns:
Manifest for a cluster role binding of a service account.
"""
return {
"apiVersion": "rbac.authorization.k8s.io/v1",
"kind": "ClusterRoleBinding",
"metadata": {"name": name},
"subjects": [
{
"kind": "ServiceAccount",
"name": service_account_name,
"namespace": namespace,
}
],
"roleRef": {
"kind": "ClusterRole",
"name": role_name,
"apiGroup": "rbac.authorization.k8s.io",
},
}
build_cron_job_manifest(cron_expression, pod_name, run_name, pipeline_name, image_name, command, args, settings, service_account_name=None)
Create a manifest for launching a pod as scheduled CRON job.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
cron_expression |
str |
CRON job schedule expression, e.g. " * * *". |
required |
pod_name |
str |
Name of the pod. |
required |
run_name |
str |
Name of the ZenML run. |
required |
pipeline_name |
str |
Name of the ZenML pipeline. |
required |
image_name |
str |
Name of the Docker image. |
required |
command |
List[str] |
Command to execute the entrypoint in the pod. |
required |
args |
List[str] |
Arguments provided to the entrypoint command. |
required |
settings |
KubernetesOrchestratorSettings |
|
required |
service_account_name |
Optional[str] |
Optional name of a service account. Can be used to assign certain roles to a pod, e.g., to allow it to run Kubernetes commands from within the cluster. |
None |
Returns:
Type | Description |
---|---|
Dict[str, Any] |
CRON job manifest. |
Source code in zenml/integrations/kubernetes/orchestrators/manifest_utils.py
def build_cron_job_manifest(
cron_expression: str,
pod_name: str,
run_name: str,
pipeline_name: str,
image_name: str,
command: List[str],
args: List[str],
settings: KubernetesOrchestratorSettings,
service_account_name: Optional[str] = None,
) -> Dict[str, Any]:
"""Create a manifest for launching a pod as scheduled CRON job.
Args:
cron_expression: CRON job schedule expression, e.g. "* * * * *".
pod_name: Name of the pod.
run_name: Name of the ZenML run.
pipeline_name: Name of the ZenML pipeline.
image_name: Name of the Docker image.
command: Command to execute the entrypoint in the pod.
args: Arguments provided to the entrypoint command.
settings: `KubernetesOrchestratorSettings` object
service_account_name: Optional name of a service account.
Can be used to assign certain roles to a pod, e.g., to allow it to
run Kubernetes commands from within the cluster.
Returns:
CRON job manifest.
"""
pod_manifest = build_pod_manifest(
pod_name=pod_name,
run_name=run_name,
pipeline_name=pipeline_name,
image_name=image_name,
command=command,
args=args,
settings=settings,
service_account_name=service_account_name,
)
return {
"apiVersion": "batch/v1beta1",
"kind": "CronJob",
"metadata": pod_manifest["metadata"],
"spec": {
"schedule": cron_expression,
"jobTemplate": {
"metadata": pod_manifest["metadata"],
"spec": {"template": {"spec": pod_manifest["spec"]}},
},
},
}
build_mysql_deployment_manifest(name='mysql', namespace='default', port=3306, pv_claim_name='mysql-pv-claim')
Build a manifest for deploying a MySQL database.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
name |
str |
Name of the deployment. Defaults to "mysql". |
'mysql' |
namespace |
str |
Kubernetes namespace. Defaults to "default". |
'default' |
port |
int |
Port where MySQL is running. Defaults to 3306. |
3306 |
pv_claim_name |
str |
Name of the required persistent volume claim.
Defaults to |
'mysql-pv-claim' |
Returns:
Type | Description |
---|---|
Dict[str, Any] |
Manifest for deploying a MySQL database. |
Source code in zenml/integrations/kubernetes/orchestrators/manifest_utils.py
def build_mysql_deployment_manifest(
name: str = "mysql",
namespace: str = "default",
port: int = 3306,
pv_claim_name: str = "mysql-pv-claim",
) -> Dict[str, Any]:
"""Build a manifest for deploying a MySQL database.
Args:
name: Name of the deployment. Defaults to "mysql".
namespace: Kubernetes namespace. Defaults to "default".
port: Port where MySQL is running. Defaults to 3306.
pv_claim_name: Name of the required persistent volume claim.
Defaults to `"mysql-pv-claim"`.
Returns:
Manifest for deploying a MySQL database.
"""
return {
"apiVersion": "apps/v1",
"kind": "Deployment",
"metadata": {"name": name, "namespace": namespace},
"spec": {
"selector": {
"matchLabels": {
"app": name,
},
},
"strategy": {
"type": "Recreate",
},
"template": {
"metadata": {
"labels": {"app": name},
},
"spec": {
"containers": [
{
"image": "gcr.io/ml-pipeline/mysql:5.6",
"name": name,
"env": [
{
"name": "MYSQL_ALLOW_EMPTY_PASSWORD",
"value": '"true"',
}
],
"ports": [{"containerPort": port, "name": name}],
"volumeMounts": [
{
"name": "mysql-persistent-storage",
"mountPath": "/var/lib/mysql",
}
],
}
],
"volumes": [
{
"name": "mysql-persistent-storage",
"persistentVolumeClaim": {
"claimName": pv_claim_name
},
}
],
},
},
},
}
build_mysql_service_manifest(name='mysql', namespace='default', port=3306)
Build a manifest for a service relating to a deployed MySQL database.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
name |
str |
Name of the service. Defaults to "mysql". |
'mysql' |
namespace |
str |
Kubernetes namespace. Defaults to "default". |
'default' |
port |
int |
Port where MySQL is running. Defaults to 3306. |
3306 |
Returns:
Type | Description |
---|---|
Dict[str, Any] |
Manifest for the MySQL service. |
Source code in zenml/integrations/kubernetes/orchestrators/manifest_utils.py
def build_mysql_service_manifest(
name: str = "mysql",
namespace: str = "default",
port: int = 3306,
) -> Dict[str, Any]:
"""Build a manifest for a service relating to a deployed MySQL database.
Args:
name: Name of the service. Defaults to "mysql".
namespace: Kubernetes namespace. Defaults to "default".
port: Port where MySQL is running. Defaults to 3306.
Returns:
Manifest for the MySQL service.
"""
return {
"apiVersion": "v1",
"kind": "Service",
"metadata": {
"name": name,
"namespace": namespace,
},
"spec": {
"selector": {"app": "mysql"},
"clusterIP": "None",
"ports": [{"port": port}],
},
}
build_namespace_manifest(namespace)
Build the manifest for a new namespace.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
namespace |
str |
Kubernetes namespace. |
required |
Returns:
Type | Description |
---|---|
Dict[str, Any] |
Manifest of the new namespace. |
Source code in zenml/integrations/kubernetes/orchestrators/manifest_utils.py
def build_namespace_manifest(namespace: str) -> Dict[str, Any]:
"""Build the manifest for a new namespace.
Args:
namespace: Kubernetes namespace.
Returns:
Manifest of the new namespace.
"""
return {
"apiVersion": "v1",
"kind": "Namespace",
"metadata": {
"name": namespace,
},
}
build_persistent_volume_claim_manifest(name, namespace='default', storage_request='10Gi')
Build a manifest for a persistent volume claim.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
name |
str |
Name of the persistent volume claim. |
required |
namespace |
str |
Kubernetes namespace. Defaults to "default". |
'default' |
storage_request |
str |
Size of the storage to request. Defaults to |
'10Gi' |
Returns:
Type | Description |
---|---|
Dict[str, Any] |
Manifest for a persistent volume claim. |
Source code in zenml/integrations/kubernetes/orchestrators/manifest_utils.py
def build_persistent_volume_claim_manifest(
name: str,
namespace: str = "default",
storage_request: str = "10Gi",
) -> Dict[str, Any]:
"""Build a manifest for a persistent volume claim.
Args:
name: Name of the persistent volume claim.
namespace: Kubernetes namespace. Defaults to "default".
storage_request: Size of the storage to request. Defaults to `"10Gi"`.
Returns:
Manifest for a persistent volume claim.
"""
return {
"apiVersion": "v1",
"kind": "PersistentVolumeClaim",
"metadata": {
"name": name,
"namespace": namespace,
},
"spec": {
"storageClassName": "manual",
"accessModes": ["ReadWriteOnce"],
"resources": {
"requests": {
"storage": storage_request,
}
},
},
}
build_persistent_volume_manifest(name, namespace='default', storage_capacity='10Gi', path='/mnt/data')
Build a manifest for a persistent volume.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
name |
str |
Name of the persistent volume. |
required |
namespace |
str |
Kubernetes namespace. Defaults to "default". |
'default' |
storage_capacity |
str |
Storage capacity of the volume. Defaults to |
'10Gi' |
path |
str |
Path where the volume is mounted. Defaults to |
'/mnt/data' |
Returns:
Type | Description |
---|---|
Dict[str, Any] |
Manifest for a persistent volume. |
Source code in zenml/integrations/kubernetes/orchestrators/manifest_utils.py
def build_persistent_volume_manifest(
name: str,
namespace: str = "default",
storage_capacity: str = "10Gi",
path: str = "/mnt/data",
) -> Dict[str, Any]:
"""Build a manifest for a persistent volume.
Args:
name: Name of the persistent volume.
namespace: Kubernetes namespace. Defaults to "default".
storage_capacity: Storage capacity of the volume. Defaults to `"10Gi"`.
path: Path where the volume is mounted. Defaults to `"/mnt/data"`.
Returns:
Manifest for a persistent volume.
"""
return {
"apiVersion": "v1",
"kind": "PersistentVolume",
"metadata": {
"name": name,
"namespace": namespace,
"labels": {"type": "local"},
},
"spec": {
"storageClassName": "manual",
"capacity": {"storage": storage_capacity},
"accessModes": ["ReadWriteOnce"],
"hostPath": {"path": path},
},
}
build_pod_manifest(pod_name, run_name, pipeline_name, image_name, command, args, settings, service_account_name=None, env=None)
Build a Kubernetes pod manifest for a ZenML run or step.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
pod_name |
str |
Name of the pod. |
required |
run_name |
str |
Name of the ZenML run. |
required |
pipeline_name |
str |
Name of the ZenML pipeline. |
required |
image_name |
str |
Name of the Docker image. |
required |
command |
List[str] |
Command to execute the entrypoint in the pod. |
required |
args |
List[str] |
Arguments provided to the entrypoint command. |
required |
settings |
KubernetesOrchestratorSettings |
|
required |
service_account_name |
Optional[str] |
Optional name of a service account. Can be used to assign certain roles to a pod, e.g., to allow it to run Kubernetes commands from within the cluster. |
None |
env |
Optional[Dict[str, str]] |
Environment variables to set. |
None |
Returns:
Type | Description |
---|---|
Dict[str, Any] |
Pod manifest. |
Source code in zenml/integrations/kubernetes/orchestrators/manifest_utils.py
def build_pod_manifest(
pod_name: str,
run_name: str,
pipeline_name: str,
image_name: str,
command: List[str],
args: List[str],
settings: KubernetesOrchestratorSettings,
service_account_name: Optional[str] = None,
env: Optional[Dict[str, str]] = None,
) -> Dict[str, Any]:
"""Build a Kubernetes pod manifest for a ZenML run or step.
Args:
pod_name: Name of the pod.
run_name: Name of the ZenML run.
pipeline_name: Name of the ZenML pipeline.
image_name: Name of the Docker image.
command: Command to execute the entrypoint in the pod.
args: Arguments provided to the entrypoint command.
settings: `KubernetesOrchestratorSettings` object
service_account_name: Optional name of a service account.
Can be used to assign certain roles to a pod, e.g., to allow it to
run Kubernetes commands from within the cluster.
env: Environment variables to set.
Returns:
Pod manifest.
"""
env = env.copy() if env else {}
env.setdefault(ENV_ZENML_ENABLE_REPO_INIT_WARNINGS, "False")
spec: Dict[str, Any] = {
"restartPolicy": "Never",
"containers": [
{
"name": "main",
"image": image_name,
"command": command,
"args": args,
"env": [
{"name": name, "value": value}
for name, value in env.items()
],
}
],
}
if service_account_name is not None:
spec["serviceAccountName"] = service_account_name
if settings.pod_settings:
spec.update(add_pod_settings(settings.pod_settings))
manifest = {
"apiVersion": "v1",
"kind": "Pod",
"metadata": {
"name": pod_name,
"labels": {
"run": run_name,
"pipeline": pipeline_name,
},
},
"spec": spec,
}
return manifest
build_service_account_manifest(name, namespace='default')
Build the manifest for a service account.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
name |
str |
Name of the service account. |
required |
namespace |
str |
Kubernetes namespace. Defaults to "default". |
'default' |
Returns:
Type | Description |
---|---|
Dict[str, Any] |
Manifest for a service account. |
Source code in zenml/integrations/kubernetes/orchestrators/manifest_utils.py
def build_service_account_manifest(
name: str, namespace: str = "default"
) -> Dict[str, Any]:
"""Build the manifest for a service account.
Args:
name: Name of the service account.
namespace: Kubernetes namespace. Defaults to "default".
Returns:
Manifest for a service account.
"""
return {
"apiVersion": "v1",
"metadata": {
"name": name,
"namespace": namespace,
},
}
pod_settings
Kubernetes pod settings.
KubernetesPodSettings (BaseSettings)
pydantic-model
Kubernetes Pod settings.
Attributes:
Name | Type | Description |
---|---|---|
node_selectors |
Dict[str, str] |
Node selectors to apply to the pod. |
affinity |
Dict[str, Any] |
Affinity to apply to the pod. |
tolerations |
List[Dict[str, Any]] |
Tolerations to apply to the pod. |
Source code in zenml/integrations/kubernetes/pod_settings.py
class KubernetesPodSettings(BaseSettings):
"""Kubernetes Pod settings.
Attributes:
node_selectors: Node selectors to apply to the pod.
affinity: Affinity to apply to the pod.
tolerations: Tolerations to apply to the pod.
"""
node_selectors: Dict[str, str] = {}
affinity: Dict[str, Any] = {}
tolerations: List[Dict[str, Any]] = []
@validator("affinity", pre=True)
def _convert_affinity(
cls, value: Union[Dict[str, Any], "V1Affinity"]
) -> Dict[str, Any]:
"""Converts Kubernetes affinity to a dict.
Args:
value: The affinity value.
Returns:
The converted value.
"""
from kubernetes.client.models import V1Affinity
if isinstance(value, V1Affinity):
return serialization_utils.serialize_kubernetes_model(value)
else:
return value
@validator("tolerations", pre=True)
def _convert_tolerations(
cls, value: List[Union[Dict[str, Any], "V1Toleration"]]
) -> List[Dict[str, Any]]:
"""Converts Kubernetes tolerations to dicts.
Args:
value: The tolerations list.
Returns:
The converted tolerations.
"""
from kubernetes.client.models import V1Toleration
result = []
for element in value:
if isinstance(element, V1Toleration):
result.append(
serialization_utils.serialize_kubernetes_model(element)
)
else:
result.append(element)
return result
serialization_utils
Kubernetes serialization utils.
deserialize_kubernetes_model(data, class_name)
Deserializes a Kubernetes model.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
data |
Dict[str, Any] |
The model data. |
required |
class_name |
str |
Name of the Kubernetes model class. |
required |
Exceptions:
Type | Description |
---|---|
KeyError |
If the data contains values for an invalid attribute. |
Returns:
Type | Description |
---|---|
Any |
The deserialized model. |
Source code in zenml/integrations/kubernetes/serialization_utils.py
def deserialize_kubernetes_model(data: Dict[str, Any], class_name: str) -> Any:
"""Deserializes a Kubernetes model.
Args:
data: The model data.
class_name: Name of the Kubernetes model class.
Raises:
KeyError: If the data contains values for an invalid attribute.
Returns:
The deserialized model.
"""
model_class = get_model_class(class_name=class_name)
assert hasattr(model_class, "openapi_types")
assert hasattr(model_class, "attribute_map")
# Mapping of the attribute name of the model class to the attribute type
type_mapping = cast(Dict[str, str], model_class.openapi_types)
reverse_attribute_mapping = cast(Dict[str, str], model_class.attribute_map)
# Mapping of the serialized key to the attribute name of the model class
attribute_mapping = {
value: key for key, value in reverse_attribute_mapping.items()
}
deserialized_attributes: Dict[str, Any] = {}
for key, value in data.items():
if key not in attribute_mapping:
raise KeyError(
f"Got value for attribute {key} which is not one of the "
f"available attributes {set(attribute_mapping)}."
)
attribute_name = attribute_mapping[key]
attribute_class = type_mapping[attribute_name]
if not value:
deserialized_attributes[attribute_name] = value
elif attribute_class.startswith("list["):
match = re.fullmatch(r"list\[(.*)\]", attribute_class)
assert match
inner_class = match.group(1)
deserialized_attributes[attribute_name] = _deserialize_list(
value, class_name=inner_class
)
elif attribute_class.startswith("dict("):
match = re.fullmatch(r"dict\(([^,]*), (.*)\)", attribute_class)
assert match
inner_class = match.group(1)
deserialized_attributes[attribute_name] = _deserialize_dict(
value, class_name=inner_class
)
elif is_model_class(attribute_class):
deserialized_attributes[
attribute_name
] = deserialize_kubernetes_model(value, attribute_class)
else:
deserialized_attributes[attribute_name] = value
return model_class(**deserialized_attributes)
get_model_class(class_name)
Gets a Kubernetes model class.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
class_name |
str |
Name of the class to get. |
required |
Exceptions:
Type | Description |
---|---|
TypeError |
If no Kubernetes model class exists for this name. |
Returns:
Type | Description |
---|---|
Type[Any] |
The model class. |
Source code in zenml/integrations/kubernetes/serialization_utils.py
def get_model_class(class_name: str) -> Type[Any]:
"""Gets a Kubernetes model class.
Args:
class_name: Name of the class to get.
Raises:
TypeError: If no Kubernetes model class exists for this name.
Returns:
The model class.
"""
import kubernetes.client.models
class_ = getattr(kubernetes.client.models, class_name, None)
if not class_:
raise TypeError(
f"Unable to find kubernetes model class with name {class_name}."
)
assert isinstance(class_, type)
return class_
is_model_class(class_name)
Checks whether the given class name is a Kubernetes model class.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
class_name |
str |
Name of the class to check. |
required |
Returns:
Type | Description |
---|---|
bool |
If the given class name is a Kubernetes model class. |
Source code in zenml/integrations/kubernetes/serialization_utils.py
def is_model_class(class_name: str) -> bool:
"""Checks whether the given class name is a Kubernetes model class.
Args:
class_name: Name of the class to check.
Returns:
If the given class name is a Kubernetes model class.
"""
import kubernetes.client.models
return hasattr(kubernetes.client.models, class_name)
serialize_kubernetes_model(model)
Serializes a Kubernetes model.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
model |
Any |
The model to serialize. |
required |
Exceptions:
Type | Description |
---|---|
TypeError |
If the model is not a Kubernetes model. |
Returns:
Type | Description |
---|---|
Dict[str, Any] |
The serialized model. |
Source code in zenml/integrations/kubernetes/serialization_utils.py
def serialize_kubernetes_model(model: Any) -> Dict[str, Any]:
"""Serializes a Kubernetes model.
Args:
model: The model to serialize.
Raises:
TypeError: If the model is not a Kubernetes model.
Returns:
The serialized model.
"""
if not is_model_class(model.__class__.__name__):
raise TypeError(f"Unable to serialize non-kubernetes model {model}.")
assert hasattr(model, "attribute_map")
attribute_mapping = cast(Dict[str, str], model.attribute_map)
model_attributes = {
serialized_attribute_name: getattr(model, attribute_name)
for attribute_name, serialized_attribute_name in attribute_mapping.items()
}
return _serialize_dict(model_attributes)