Tekton
zenml.integrations.tekton
special
Initialization of the Tekton integration for ZenML.
The Tekton integration sub-module provides an alternative to the local orchestrator. You can enable it by registering the Tekton orchestrator with the CLI tool, for example by running `zenml orchestrator register <NAME> --flavor=tekton` after installing the integration with `zenml integration install tekton`.
TektonIntegration (Integration)
Definition of Tekton Integration for ZenML.
Source code in zenml/integrations/tekton/__init__.py
class TektonIntegration(Integration):
"""Definition of Tekton Integration for ZenML."""
NAME = TEKTON
REQUIREMENTS = ["kfp-tekton==1.3.1"]
@classmethod
def flavors(cls) -> List[Type[Flavor]]:
"""Declare the stack component flavors for the Tekton integration.
Returns:
List of stack component flavors for this integration.
"""
from zenml.integrations.tekton.flavors import TektonOrchestratorFlavor
return [TektonOrchestratorFlavor]
flavors()
classmethod
Declare the stack component flavors for the Tekton integration.
Returns:
Type | Description |
---|---|
List[Type[zenml.stack.flavor.Flavor]] | List of stack component flavors for this integration. |
Source code in zenml/integrations/tekton/__init__.py
@classmethod
def flavors(cls) -> List[Type[Flavor]]:
"""Declare the stack component flavors for the Tekton integration.
Returns:
List of stack component flavors for this integration.
"""
from zenml.integrations.tekton.flavors import TektonOrchestratorFlavor
return [TektonOrchestratorFlavor]
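For orientation, a minimal sketch of inspecting the flavors this integration declares, assuming the integration's requirements are installed and that `Flavor` subclasses can be instantiated without arguments (as `TektonOrchestratorFlavor` below suggests):

from zenml.integrations.tekton import TektonIntegration

# Print the name of each stack component flavor that the Tekton
# integration contributes (here: the Tekton orchestrator flavor).
for flavor_class in TektonIntegration.flavors():
    print(flavor_class().name)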
flavors
special
Tekton integration flavors.
tekton_orchestrator_flavor
Tekton orchestrator flavor.
TektonOrchestratorConfig (BaseOrchestratorConfig, TektonOrchestratorSettings)
pydantic-model
Configuration for the Tekton orchestrator.
Attributes:
Name | Type | Description |
---|---|---|
kubernetes_context | str | Name of a kubernetes context to run pipelines in. |
kubernetes_namespace | str | Name of the kubernetes namespace in which the pods that run the pipeline steps should be running. |
tekton_ui_port | int | A local port to which the Tekton UI will be forwarded. |
skip_ui_daemon_provisioning | bool | If `True`, provisioning the Tekton UI daemon will be skipped. |
Source code in zenml/integrations/tekton/flavors/tekton_orchestrator_flavor.py
class TektonOrchestratorConfig( # type: ignore[misc] # https://github.com/pydantic/pydantic/issues/4173
BaseOrchestratorConfig, TektonOrchestratorSettings
):
"""Configuration for the Tekton orchestrator.
Attributes:
kubernetes_context: Name of a kubernetes context to run
pipelines in.
kubernetes_namespace: Name of the kubernetes namespace in which the
pods that run the pipeline steps should be running.
tekton_ui_port: A local port to which the Tekton UI will be forwarded.
skip_ui_daemon_provisioning: If `True`, provisioning the Tekton UI
daemon will be skipped.
"""
kubernetes_context: str # TODO: Potential setting
kubernetes_namespace: str = "zenml"
tekton_ui_port: int = DEFAULT_TEKTON_UI_PORT
skip_ui_daemon_provisioning: bool = False
@property
def is_remote(self) -> bool:
"""Checks if this stack component is running remotely.
This designation is used to determine if the stack component can be
used with a local ZenML database or if it requires a remote ZenML
server.
Returns:
True if this config is for a remote component, False otherwise.
"""
return True
is_remote: bool
property
readonly
Checks if this stack component is running remotely.
This designation is used to determine if the stack component can be used with a local ZenML database or if it requires a remote ZenML server.
Returns:
Type | Description |
---|---|
bool | True if this config is for a remote component, False otherwise. |
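A minimal construction sketch, assuming the config class can be instantiated directly as a pydantic model and is exported from the flavors module documented in this section; the context and namespace values are placeholders:

from zenml.integrations.tekton.flavors import TektonOrchestratorConfig

config = TektonOrchestratorConfig(
    kubernetes_context="my-kubernetes-context",  # placeholder context name
    kubernetes_namespace="zenml",
)
# The Tekton orchestrator always targets a remote cluster.
assert config.is_remote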
TektonOrchestratorFlavor (BaseOrchestratorFlavor)
Flavor for the Tekton orchestrator.
Source code in zenml/integrations/tekton/flavors/tekton_orchestrator_flavor.py
class TektonOrchestratorFlavor(BaseOrchestratorFlavor):
"""Flavor for the Tekton orchestrator."""
@property
def name(self) -> str:
"""Name of the orchestrator flavor.
Returns:
Name of the orchestrator flavor.
"""
return TEKTON_ORCHESTRATOR_FLAVOR
@property
def config_class(self) -> Type[TektonOrchestratorConfig]:
"""Returns `TektonOrchestratorConfig` config class.
Returns:
The config class.
"""
return TektonOrchestratorConfig
@property
def implementation_class(self) -> Type["TektonOrchestrator"]:
"""Implementation class for this flavor.
Returns:
Implementation class for this flavor.
"""
from zenml.integrations.tekton.orchestrators import TektonOrchestrator
return TektonOrchestrator
config_class: Type[zenml.integrations.tekton.flavors.tekton_orchestrator_flavor.TektonOrchestratorConfig]
property
readonly
Returns the `TektonOrchestratorConfig` config class.
Returns:
Type | Description |
---|---|
Type[zenml.integrations.tekton.flavors.tekton_orchestrator_flavor.TektonOrchestratorConfig] | The config class. |
implementation_class: Type[TektonOrchestrator]
property
readonly
Implementation class for this flavor.
Returns:
Type | Description |
---|---|
Type[TektonOrchestrator] | Implementation class for this flavor. |
name: str
property
readonly
Name of the orchestrator flavor.
Returns:
Type | Description |
---|---|
str | Name of the orchestrator flavor. |
TektonOrchestratorSettings (BaseSettings)
pydantic-model
Settings for the Tekton orchestrator.
Attributes:
Name | Type | Description |
---|---|---|
pod_settings | Optional[zenml.integrations.kubernetes.pod_settings.KubernetesPodSettings] | Pod settings to apply. |
Source code in zenml/integrations/tekton/flavors/tekton_orchestrator_flavor.py
class TektonOrchestratorSettings(BaseSettings):
"""Settings for the Tekton orchestrator.
Attributes:
pod_settings: Pod settings to apply.
"""
pod_settings: Optional[KubernetesPodSettings] = None
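To show where these settings plug in, a hedged sketch of attaching pod settings to a step; the `"orchestrator.tekton"` settings key follows ZenML's `<component_type>.<flavor>` convention, and the node selector value is a placeholder:

from zenml.integrations.kubernetes.pod_settings import KubernetesPodSettings
from zenml.integrations.tekton.flavors import TektonOrchestratorSettings
from zenml.steps import step

tekton_settings = TektonOrchestratorSettings(
    pod_settings=KubernetesPodSettings(
        # Placeholder: schedule step pods onto a dedicated node pool.
        node_selectors={"cloud.google.com/gke-nodepool": "ml-pool"},
    )
)

@step(settings={"orchestrator.tekton": tekton_settings})
def my_step() -> None:
    ...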
orchestrators
special
Initialization of the Tekton ZenML orchestrator.
tekton_orchestrator
Implementation of the Tekton orchestrator.
TektonOrchestrator (BaseOrchestrator)
Orchestrator responsible for running pipelines using Tekton.
Source code in zenml/integrations/tekton/orchestrators/tekton_orchestrator.py
class TektonOrchestrator(BaseOrchestrator):
"""Orchestrator responsible for running pipelines using Tekton."""
@property
def config(self) -> TektonOrchestratorConfig:
"""Returns the `TektonOrchestratorConfig` config.
Returns:
The configuration.
"""
return cast(TektonOrchestratorConfig, self._config)
@property
def settings_class(self) -> Optional[Type["BaseSettings"]]:
"""Settings class for the Tekton orchestrator.
Returns:
The settings class.
"""
return TektonOrchestratorSettings
def get_kubernetes_contexts(self) -> Tuple[List[str], Optional[str]]:
"""Get the list of configured Kubernetes contexts and the active context.
Returns:
A tuple containing the list of configured Kubernetes contexts and
the active context.
"""
try:
contexts, active_context = k8s_config.list_kube_config_contexts()
except k8s_config.config_exception.ConfigException:
return [], None
context_names = [c["name"] for c in contexts]
active_context_name = active_context["name"]
return context_names, active_context_name
@property
def validator(self) -> Optional[StackValidator]:
"""Ensures a stack with only remote components and a container registry.
Returns:
A `StackValidator` instance.
"""
def _validate(stack: "Stack") -> Tuple[bool, str]:
container_registry = stack.container_registry
# should not happen, because the stack validation takes care of
# this, but just in case
assert container_registry is not None
contexts, _ = self.get_kubernetes_contexts()
if self.config.kubernetes_context not in contexts:
return False, (
f"Could not find a Kubernetes context named "
f"'{self.config.kubernetes_context}' in the local "
f"Kubernetes configuration. Please make sure that the "
f"Kubernetes cluster is running and that the kubeconfig "
f"file is configured correctly. To list all configured "
f"contexts, run:\n\n"
f" `kubectl config get-contexts`\n"
)
# go through all stack components and identify those that
# advertise a local path where they persist information that
# they need to be available when running pipelines.
for stack_component in stack.components.values():
local_path = stack_component.local_path
if local_path is None:
continue
return False, (
f"The Tekton orchestrator is configured to run "
f"pipelines in a remote Kubernetes cluster designated "
f"by the '{self.config.kubernetes_context}' configuration "
f"context, but the '{stack_component.name}' "
f"{stack_component.type.value} is a local stack component "
f"and will not be available in the Tekton pipeline "
f"step.\nPlease ensure that you always use non-local "
f"stack components with a Tekton orchestrator, "
f"otherwise you may run into pipeline execution "
f"problems. You should use a flavor of "
f"{stack_component.type.value} other than "
f"'{stack_component.flavor}'."
)
if container_registry.config.is_local:
return False, (
f"The Tekton orchestrator is configured to run "
f"pipelines in a remote Kubernetes cluster designated "
f"by the '{self.config.kubernetes_context}' configuration "
f"context, but the '{container_registry.name}' "
f"container registry URI '{container_registry.config.uri}' "
f"points to a local container registry. Please ensure "
f"that you always use non-local stack components with "
f"a Tekton orchestrator, otherwise you will "
f"run into problems. You should use a flavor of "
f"container registry other than "
f"'{container_registry.flavor}'."
)
return True, ""
return StackValidator(
required_components={StackComponentType.CONTAINER_REGISTRY},
custom_validation_function=_validate,
)
def prepare_pipeline_deployment(
self,
deployment: "PipelineDeployment",
stack: "Stack",
) -> None:
"""Build a Docker image and push it to the container registry.
Args:
deployment: The pipeline deployment configuration.
stack: The stack on which the pipeline will be deployed.
"""
docker_image_builder = PipelineDockerImageBuilder()
repo_digest = docker_image_builder.build_and_push_docker_image(
deployment=deployment, stack=stack
)
deployment.add_extra(ORCHESTRATOR_DOCKER_IMAGE_KEY, repo_digest)
@staticmethod
def _configure_container_resources(
container_op: dsl.ContainerOp,
resource_settings: "ResourceSettings",
) -> None:
"""Adds resource requirements to the container.
Args:
container_op: The container operation to configure.
resource_settings: The resource settings to use for this
container.
"""
if resource_settings.cpu_count is not None:
container_op = container_op.set_cpu_limit(
str(resource_settings.cpu_count)
)
if resource_settings.gpu_count is not None:
container_op = container_op.set_gpu_limit(
resource_settings.gpu_count
)
if resource_settings.memory is not None:
memory_limit = resource_settings.memory[:-1]
container_op = container_op.set_memory_limit(memory_limit)
def prepare_or_run_pipeline(
self,
deployment: "PipelineDeployment",
stack: "Stack",
) -> Any:
"""Runs the pipeline on Tekton.
This function first compiles the ZenML pipeline into a Tekton yaml
and then applies this configuration to run the pipeline.
Args:
deployment: The pipeline deployment to prepare or run.
stack: The stack the pipeline will run on.
Raises:
RuntimeError: If you try to run the pipelines in a notebook environment.
"""
# First check whether the code is running in a notebook
if Environment.in_notebook():
raise RuntimeError(
"The Tekton orchestrator cannot run pipelines in a notebook "
"environment. The reason is that it is non-trivial to create "
"a Docker image of a notebook. Please consider refactoring "
"your notebook cells into separate scripts in a Python module "
"and run the code outside of a notebook when using this "
"orchestrator."
)
image_name = deployment.pipeline.extra[ORCHESTRATOR_DOCKER_IMAGE_KEY]
orchestrator_run_name = get_orchestrator_run_name(
pipeline_name=deployment.pipeline.name
)
def _construct_kfp_pipeline() -> None:
"""Create a container_op for each step.
Each container_op specifies the Docker image to use and configures the
image's entrypoint to run the step. Additionally, each container_op is
configured to run after its direct upstream steps.
"""
# Dictionary of container_ops indexed by the associated step name
step_name_to_container_op: Dict[str, dsl.ContainerOp] = {}
for step_name, step in deployment.steps.items():
command = StepEntrypointConfiguration.get_entrypoint_command()
arguments = (
StepEntrypointConfiguration.get_entrypoint_arguments(
step_name=step_name,
)
)
container_op = dsl.ContainerOp(
name=step.config.name,
image=image_name,
command=command,
arguments=arguments,
)
settings = cast(
TektonOrchestratorSettings, self.get_settings(step)
)
if settings.pod_settings:
apply_pod_settings(
container_op=container_op,
settings=settings.pod_settings,
)
container_op.container.add_env_variable(
k8s_client.V1EnvVar(
name=ENV_ZENML_TEKTON_RUN_ID,
value="$(context.pipelineRun.name)",
)
)
if self.requires_resources_in_orchestration_environment(step):
self._configure_container_resources(
container_op=container_op,
resource_settings=step.config.resource_settings,
)
# Find the upstream container ops of the current step and
# configure the current container op to run after them
for upstream_step_name in step.spec.upstream_steps:
upstream_container_op = step_name_to_container_op[
upstream_step_name
]
container_op.after(upstream_container_op)
# Update dictionary of container ops with the current one
step_name_to_container_op[step.config.name] = container_op
# Get a file path to save the compiled yaml to
fileio.makedirs(self.pipeline_directory)
pipeline_file_path = os.path.join(
self.pipeline_directory, f"{orchestrator_run_name}.yaml"
)
# Set the run name, which Tekton reads from this attribute of the
# pipeline function
setattr(
_construct_kfp_pipeline,
"_component_human_name",
orchestrator_run_name,
)
pipeline_config = TektonPipelineConf()
pipeline_config.add_pipeline_label(
"pipelines.kubeflow.org/cache_enabled", "false"
)
TektonCompiler().compile(
_construct_kfp_pipeline,
pipeline_file_path,
tekton_pipeline_conf=pipeline_config,
)
logger.info(
"Writing Tekton workflow definition to `%s`.", pipeline_file_path
)
if deployment.schedule:
logger.warning(
"The Tekton Orchestrator currently does not support the "
"use of schedules. The `schedule` will be ignored "
"and the pipeline will be run immediately."
)
logger.info(
"Running Tekton pipeline in kubernetes context '%s' and namespace "
"'%s'.",
self.config.kubernetes_context,
self.config.kubernetes_namespace,
)
try:
subprocess.check_call(
[
"kubectl",
"--context",
self.config.kubernetes_context,
"--namespace",
self.config.kubernetes_namespace,
"apply",
"-f",
pipeline_file_path,
]
)
except subprocess.CalledProcessError as e:
raise RuntimeError(
f"Failed to upload Tekton pipeline: {str(e)}. "
f"Please make sure your kubernetes config is present and the "
f"{self.config.kubernetes_context} kubernetes context is "
f"configured correctly.",
)
def get_orchestrator_run_id(self) -> str:
"""Returns the active orchestrator run id.
Raises:
RuntimeError: If the environment variable specifying the run id
is not set.
Returns:
The orchestrator run id.
"""
try:
return os.environ[ENV_ZENML_TEKTON_RUN_ID]
except KeyError:
raise RuntimeError(
"Unable to read run id from environment variable "
f"{ENV_ZENML_TEKTON_RUN_ID}."
)
@property
def root_directory(self) -> str:
"""Returns path to the root directory for all files concerning this orchestrator.
Returns:
Path to the root directory.
"""
return os.path.join(
io_utils.get_global_config_directory(),
"tekton",
str(self.id),
)
@property
def pipeline_directory(self) -> str:
"""Path to a directory in which the Tekton pipeline files are stored.
Returns:
Path to the pipeline directory.
"""
return os.path.join(self.root_directory, "pipelines")
@property
def _pid_file_path(self) -> str:
"""Returns path to the daemon PID file.
Returns:
Path to the daemon PID file.
"""
return os.path.join(self.root_directory, "tekton_daemon.pid")
@property
def log_file(self) -> str:
"""Path of the daemon log file.
Returns:
Path of the daemon log file.
"""
return os.path.join(self.root_directory, "tekton_daemon.log")
@property
def is_provisioned(self) -> bool:
"""Returns if a local k3d cluster for this orchestrator exists.
Returns:
True if a local k3d cluster exists, False otherwise.
"""
return fileio.exists(self.root_directory)
@property
def is_running(self) -> bool:
"""Checks if the local UI daemon is running.
Returns:
True if the local UI daemon for this orchestrator is running.
"""
if self.config.skip_ui_daemon_provisioning:
return True
if sys.platform != "win32":
from zenml.utils.daemon import check_if_daemon_is_running
return check_if_daemon_is_running(self._pid_file_path)
else:
return True
def provision(self) -> None:
"""Provisions resources for the orchestrator."""
fileio.makedirs(self.root_directory)
def deprovision(self) -> None:
"""Deprovisions the orchestrator resources."""
if self.is_running:
self.suspend()
if fileio.exists(self.log_file):
fileio.remove(self.log_file)
def resume(self) -> None:
"""Starts the UI forwarding daemon if necessary."""
if self.is_running:
logger.info("Tekton UI forwarding is already running.")
return
self.start_ui_daemon()
def suspend(self) -> None:
"""Stops the UI forwarding daemon if it's running."""
if not self.is_running:
logger.info("Tekton UI forwarding not running.")
return
self.stop_ui_daemon()
def start_ui_daemon(self) -> None:
"""Starts the UI forwarding daemon if possible."""
port = self.config.tekton_ui_port
if (
port == DEFAULT_TEKTON_UI_PORT
and not networking_utils.port_available(port)
):
# if the user didn't specify a specific port and the default
# port is occupied, fall back to a random open port
port = networking_utils.find_available_port()
command = [
"kubectl",
"--context",
self.config.kubernetes_context,
"--namespace",
"tekton-pipelines",
"port-forward",
"svc/tekton-dashboard",
f"{port}:9097",
]
if not networking_utils.port_available(port):
modified_command = command.copy()
modified_command[-1] = "<PORT>:9097"
logger.warning(
"Unable to port-forward Tekton UI to local port %d "
"because the port is occupied. In order to access the Tekton "
"UI at http://localhost:<PORT>/, please run '%s' in a "
"separate command line shell (replace <PORT> with a free port "
"of your choice).",
port,
" ".join(modified_command),
)
elif sys.platform == "win32":
logger.warning(
"Daemon functionality not supported on Windows. "
"In order to access the Tekton UI at "
"http://localhost:%d/, please run '%s' in a separate command "
"line shell.",
port,
" ".join(command),
)
else:
from zenml.utils import daemon
def _daemon_function() -> None:
"""Port-forwards the Tekton UI pod."""
subprocess.check_call(command)
daemon.run_as_daemon(
_daemon_function,
pid_file=self._pid_file_path,
log_file=self.log_file,
)
logger.info(
"Started Tekton UI daemon (check the daemon logs at %s "
"in case you're not able to view the UI). The Tekton "
"UI should now be accessible at http://localhost:%d/.",
self.log_file,
port,
)
def stop_ui_daemon(self) -> None:
"""Stops the UI forwarding daemon if it's running."""
if fileio.exists(self._pid_file_path):
if sys.platform == "win32":
# Daemon functionality is not supported on Windows, so the PID
# file won't exist. This if clause exists just for mypy to not
# complain about missing functions
pass
else:
from zenml.utils import daemon
daemon.stop_daemon(self._pid_file_path)
fileio.remove(self._pid_file_path)
logger.info("Stopped Tektion UI daemon.")
config: TektonOrchestratorConfig
property
readonly
Returns the `TektonOrchestratorConfig` config.
Returns:
Type | Description |
---|---|
TektonOrchestratorConfig | The configuration. |
is_provisioned: bool
property
readonly
Checks if resources for this orchestrator are provisioned.
Returns:
Type | Description |
---|---|
bool | True if the orchestrator's root directory exists, False otherwise. |
is_running: bool
property
readonly
Checks if the local UI daemon is running.
Returns:
Type | Description |
---|---|
bool | True if the local UI daemon for this orchestrator is running. |
log_file: str
property
readonly
Path of the daemon log file.
Returns:
Type | Description |
---|---|
str | Path of the daemon log file. |
pipeline_directory: str
property
readonly
Path to a directory in which the Tekton pipeline files are stored.
Returns:
Type | Description |
---|---|
str | Path to the pipeline directory. |
root_directory: str
property
readonly
Returns path to the root directory for all files concerning this orchestrator.
Returns:
Type | Description |
---|---|
str | Path to the root directory. |
settings_class: Optional[Type[BaseSettings]]
property
readonly
Settings class for the Tekton orchestrator.
Returns:
Type | Description |
---|---|
Optional[Type[BaseSettings]] | The settings class. |
validator: Optional[zenml.stack.stack_validator.StackValidator]
property
readonly
Ensures a stack with only remote components and a container registry.
Returns:
Type | Description |
---|---|
Optional[zenml.stack.stack_validator.StackValidator] | A `StackValidator` instance. |
deprovision(self)
Deprovisions the orchestrator resources.
Source code in zenml/integrations/tekton/orchestrators/tekton_orchestrator.py
def deprovision(self) -> None:
"""Deprovisions the orchestrator resources."""
if self.is_running:
self.suspend()
if fileio.exists(self.log_file):
fileio.remove(self.log_file)
get_kubernetes_contexts(self)
Get the list of configured Kubernetes contexts and the active context.
Returns:
Type | Description |
---|---|
Tuple[List[str], Optional[str]] | A tuple containing the list of configured Kubernetes contexts and the active context. |
Source code in zenml/integrations/tekton/orchestrators/tekton_orchestrator.py
def get_kubernetes_contexts(self) -> Tuple[List[str], Optional[str]]:
"""Get the list of configured Kubernetes contexts and the active context.
Returns:
A tuple containing the list of configured Kubernetes contexts and
the active context.
"""
try:
contexts, active_context = k8s_config.list_kube_config_contexts()
except k8s_config.config_exception.ConfigException:
return [], None
context_names = [c["name"] for c in contexts]
active_context_name = active_context["name"]
return context_names, active_context_name
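A hypothetical usage sketch, assuming `orchestrator` is a registered `TektonOrchestrator` instance; this mirrors the check that the stack validator performs:

# List the locally configured contexts and verify the configured one exists.
contexts, active = orchestrator.get_kubernetes_contexts()
if orchestrator.config.kubernetes_context not in contexts:
    print(f"Configured context not found; available contexts: {contexts}")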
get_orchestrator_run_id(self)
Returns the active orchestrator run id.
Exceptions:
Type | Description |
---|---|
RuntimeError | If the environment variable specifying the run id is not set. |
Returns:
Type | Description |
---|---|
str | The orchestrator run id. |
Source code in zenml/integrations/tekton/orchestrators/tekton_orchestrator.py
def get_orchestrator_run_id(self) -> str:
"""Returns the active orchestrator run id.
Raises:
RuntimeError: If the environment variable specifying the run id
is not set.
Returns:
The orchestrator run id.
"""
try:
return os.environ[ENV_ZENML_TEKTON_RUN_ID]
except KeyError:
raise RuntimeError(
"Unable to read run id from environment variable "
f"{ENV_ZENML_TEKTON_RUN_ID}."
)
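The orchestrator injects this variable into every step container via Tekton's `$(context.pipelineRun.name)` substitution (see `prepare_or_run_pipeline`), so inside a step container a raw read might look like the following sketch; the import location of `ENV_ZENML_TEKTON_RUN_ID` is an assumption based on its usage in this module:

import os

from zenml.integrations.tekton.orchestrators.tekton_orchestrator import (
    ENV_ZENML_TEKTON_RUN_ID,
)

# Inside a Tekton-run step container, this resolves to the pipelineRun name.
run_id = os.environ[ENV_ZENML_TEKTON_RUN_ID]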
prepare_or_run_pipeline(self, deployment, stack)
Runs the pipeline on Tekton.
This function first compiles the ZenML pipeline into a Tekton yaml and then applies this configuration to run the pipeline.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
deployment | PipelineDeployment | The pipeline deployment to prepare or run. | required |
stack | Stack | The stack the pipeline will run on. | required |
Exceptions:
Type | Description |
---|---|
RuntimeError | If you try to run the pipelines in a notebook environment. |
Source code in zenml/integrations/tekton/orchestrators/tekton_orchestrator.py
def prepare_or_run_pipeline(
self,
deployment: "PipelineDeployment",
stack: "Stack",
) -> Any:
"""Runs the pipeline on Tekton.
This function first compiles the ZenML pipeline into a Tekton yaml
and then applies this configuration to run the pipeline.
Args:
deployment: The pipeline deployment to prepare or run.
stack: The stack the pipeline will run on.
Raises:
RuntimeError: If you try to run the pipelines in a notebook environment.
"""
# First check whether the code is running in a notebook
if Environment.in_notebook():
raise RuntimeError(
"The Tekton orchestrator cannot run pipelines in a notebook "
"environment. The reason is that it is non-trivial to create "
"a Docker image of a notebook. Please consider refactoring "
"your notebook cells into separate scripts in a Python module "
"and run the code outside of a notebook when using this "
"orchestrator."
)
image_name = deployment.pipeline.extra[ORCHESTRATOR_DOCKER_IMAGE_KEY]
orchestrator_run_name = get_orchestrator_run_name(
pipeline_name=deployment.pipeline.name
)
def _construct_kfp_pipeline() -> None:
"""Create a container_op for each step.
Each container_op specifies the Docker image to use and configures the
image's entrypoint to run the step. Additionally, each container_op is
configured to run after its direct upstream steps.
"""
# Dictionary of container_ops indexed by the associated step name
step_name_to_container_op: Dict[str, dsl.ContainerOp] = {}
for step_name, step in deployment.steps.items():
command = StepEntrypointConfiguration.get_entrypoint_command()
arguments = (
StepEntrypointConfiguration.get_entrypoint_arguments(
step_name=step_name,
)
)
container_op = dsl.ContainerOp(
name=step.config.name,
image=image_name,
command=command,
arguments=arguments,
)
settings = cast(
TektonOrchestratorSettings, self.get_settings(step)
)
if settings.pod_settings:
apply_pod_settings(
container_op=container_op,
settings=settings.pod_settings,
)
container_op.container.add_env_variable(
k8s_client.V1EnvVar(
name=ENV_ZENML_TEKTON_RUN_ID,
value="$(context.pipelineRun.name)",
)
)
if self.requires_resources_in_orchestration_environment(step):
self._configure_container_resources(
container_op=container_op,
resource_settings=step.config.resource_settings,
)
# Find the upstream container ops of the current step and
# configure the current container op to run after them
for upstream_step_name in step.spec.upstream_steps:
upstream_container_op = step_name_to_container_op[
upstream_step_name
]
container_op.after(upstream_container_op)
# Update dictionary of container ops with the current one
step_name_to_container_op[step.config.name] = container_op
# Get a file path to save the compiled yaml to
fileio.makedirs(self.pipeline_directory)
pipeline_file_path = os.path.join(
self.pipeline_directory, f"{orchestrator_run_name}.yaml"
)
# Set the run name, which Tekton reads from this attribute of the
# pipeline function
setattr(
_construct_kfp_pipeline,
"_component_human_name",
orchestrator_run_name,
)
pipeline_config = TektonPipelineConf()
pipeline_config.add_pipeline_label(
"pipelines.kubeflow.org/cache_enabled", "false"
)
TektonCompiler().compile(
_construct_kfp_pipeline,
pipeline_file_path,
tekton_pipeline_conf=pipeline_config,
)
logger.info(
"Writing Tekton workflow definition to `%s`.", pipeline_file_path
)
if deployment.schedule:
logger.warning(
"The Tekton Orchestrator currently does not support the "
"use of schedules. The `schedule` will be ignored "
"and the pipeline will be run immediately."
)
logger.info(
"Running Tekton pipeline in kubernetes context '%s' and namespace "
"'%s'.",
self.config.kubernetes_context,
self.config.kubernetes_namespace,
)
try:
subprocess.check_call(
[
"kubectl",
"--context",
self.config.kubernetes_context,
"--namespace",
self.config.kubernetes_namespace,
"apply",
"-f",
pipeline_file_path,
]
)
except subprocess.CalledProcessError as e:
raise RuntimeError(
f"Failed to upload Tekton pipeline: {str(e)}. "
f"Please make sure your kubernetes config is present and the "
f"{self.config.kubernetes_context} kubernetes context is "
f"configured correctly.",
)
prepare_pipeline_deployment(self, deployment, stack)
Build a Docker image and push it to the container registry.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
deployment | PipelineDeployment | The pipeline deployment configuration. | required |
stack | Stack | The stack on which the pipeline will be deployed. | required |
Source code in zenml/integrations/tekton/orchestrators/tekton_orchestrator.py
def prepare_pipeline_deployment(
self,
deployment: "PipelineDeployment",
stack: "Stack",
) -> None:
"""Build a Docker image and push it to the container registry.
Args:
deployment: The pipeline deployment configuration.
stack: The stack on which the pipeline will be deployed.
"""
docker_image_builder = PipelineDockerImageBuilder()
repo_digest = docker_image_builder.build_and_push_docker_image(
deployment=deployment, stack=stack
)
deployment.add_extra(ORCHESTRATOR_DOCKER_IMAGE_KEY, repo_digest)
provision(self)
Provisions resources for the orchestrator.
Source code in zenml/integrations/tekton/orchestrators/tekton_orchestrator.py
def provision(self) -> None:
"""Provisions resources for the orchestrator."""
fileio.makedirs(self.root_directory)
resume(self)
Starts the UI forwarding daemon if necessary.
Source code in zenml/integrations/tekton/orchestrators/tekton_orchestrator.py
def resume(self) -> None:
"""Starts the UI forwarding daemon if necessary."""
if self.is_running:
logger.info("Tekton UI forwarding is already running.")
return
self.start_ui_daemon()
start_ui_daemon(self)
Starts the UI forwarding daemon if possible.
Source code in zenml/integrations/tekton/orchestrators/tekton_orchestrator.py
def start_ui_daemon(self) -> None:
"""Starts the UI forwarding daemon if possible."""
port = self.config.tekton_ui_port
if (
port == DEFAULT_TEKTON_UI_PORT
and not networking_utils.port_available(port)
):
# if the user didn't specify a specific port and the default
# port is occupied, fall back to a random open port
port = networking_utils.find_available_port()
command = [
"kubectl",
"--context",
self.config.kubernetes_context,
"--namespace",
"tekton-pipelines",
"port-forward",
"svc/tekton-dashboard",
f"{port}:9097",
]
if not networking_utils.port_available(port):
modified_command = command.copy()
modified_command[-1] = "<PORT>:9097"
logger.warning(
"Unable to port-forward Tekton UI to local port %d "
"because the port is occupied. In order to access the Tekton "
"UI at http://localhost:<PORT>/, please run '%s' in a "
"separate command line shell (replace <PORT> with a free port "
"of your choice).",
port,
" ".join(modified_command),
)
elif sys.platform == "win32":
logger.warning(
"Daemon functionality not supported on Windows. "
"In order to access the Tekton UI at "
"http://localhost:%d/, please run '%s' in a separate command "
"line shell.",
port,
" ".join(command),
)
else:
from zenml.utils import daemon
def _daemon_function() -> None:
"""Port-forwards the Tekton UI pod."""
subprocess.check_call(command)
daemon.run_as_daemon(
_daemon_function,
pid_file=self._pid_file_path,
log_file=self.log_file,
)
logger.info(
"Started Tekton UI daemon (check the daemon logs at %s "
"in case you're not able to view the UI). The Tekton "
"UI should now be accessible at http://localhost:%d/.",
self.log_file,
port,
)
stop_ui_daemon(self)
Stops the UI forwarding daemon if it's running.
Source code in zenml/integrations/tekton/orchestrators/tekton_orchestrator.py
def stop_ui_daemon(self) -> None:
"""Stops the UI forwarding daemon if it's running."""
if fileio.exists(self._pid_file_path):
if sys.platform == "win32":
# Daemon functionality is not supported on Windows, so the PID
# file won't exist. This if clause exists just for mypy to not
# complain about missing functions
pass
else:
from zenml.utils import daemon
daemon.stop_daemon(self._pid_file_path)
fileio.remove(self._pid_file_path)
logger.info("Stopped Tektion UI daemon.")
suspend(self)
Stops the UI forwarding daemon if it's running.
Source code in zenml/integrations/tekton/orchestrators/tekton_orchestrator.py
def suspend(self) -> None:
"""Stops the UI forwarding daemon if it's running."""
if not self.is_running:
logger.info("Tekton UI forwarding not running.")
return
self.stop_ui_daemon()
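Taken together, the lifecycle methods documented above can also be driven programmatically; a hypothetical sketch, assuming `orchestrator` is a registered `TektonOrchestrator` instance (in practice this lifecycle is usually managed through ZenML's stack provisioning commands):

orchestrator.provision()    # create the orchestrator's root directory
orchestrator.resume()       # start the Tekton UI port-forwarding daemon
# ... run pipelines and inspect the UI at the forwarded port ...
orchestrator.suspend()      # stop the UI daemon
orchestrator.deprovision()  # also removes the daemon log file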