Github
zenml.integrations.github
special
Initialization of the GitHub ZenML integration.
The GitHub integration provides a way to orchestrate pipelines using GitHub Actions.
GitHubIntegration (Integration)
Definition of GitHub integration for ZenML.
Source code in zenml/integrations/github/__init__.py
class GitHubIntegration(Integration):
    """ZenML integration that bundles the GitHub stack component flavors."""

    NAME = GITHUB
    REQUIREMENTS: List[str] = ["PyNaCl~=1.5.0"]

    @classmethod
    def flavors(cls) -> List[Type[Flavor]]:
        """List the stack component flavors provided by this integration.

        Returns:
            The GitHub Actions orchestrator flavor followed by the GitHub
            secrets manager flavor.
        """
        # The flavor classes are imported inside the method rather than at
        # module level.
        from zenml.integrations.github.flavors import (
            GitHubActionsOrchestratorFlavor,
            GitHubSecretsManagerFlavor,
        )

        provided_flavors: List[Type[Flavor]] = [
            GitHubActionsOrchestratorFlavor,
            GitHubSecretsManagerFlavor,
        ]
        return provided_flavors
flavors()
classmethod
Declare the stack component flavors for the GitHub integration.
Returns:
Type | Description |
---|---|
List[Type[zenml.stack.flavor.Flavor]] |
List of stack component flavors for this integration. |
Source code in zenml/integrations/github/__init__.py
@classmethod
def flavors(cls) -> List[Type[Flavor]]:
    """List the stack component flavors provided by this integration.

    Returns:
        The GitHub Actions orchestrator flavor followed by the GitHub
        secrets manager flavor.
    """
    # The flavor classes are imported inside the method rather than at
    # module level.
    from zenml.integrations.github.flavors import (
        GitHubActionsOrchestratorFlavor,
        GitHubSecretsManagerFlavor,
    )

    provided_flavors: List[Type[Flavor]] = [
        GitHubActionsOrchestratorFlavor,
        GitHubSecretsManagerFlavor,
    ]
    return provided_flavors
flavors
special
GitHub integration flavors.
github_actions_orchestrator_flavor
GitHub Actions orchestrator flavor.
GitHubActionsOrchestratorConfig (BaseOrchestratorConfig)
pydantic-model
Configuration for the GitHub Actions orchestrator.
Attributes:
Name | Type | Description |
---|---|---|
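skip_dirty_repository_check |
bool |
If `True`, this orchestrator will not raise an exception when trying to run a pipeline while there are still untracked/uncommitted files in the git repository. |
skip_github_repository_check |
bool |
If `True`, the orchestrator will not check if your git repository is pointing to a GitHub remote. |
push |
bool |
If `True`, this orchestrator will automatically commit and push the GitHub workflow file when running a pipeline. If `False`, the workflow file will be written to the correct location but needs to be committed and pushed manually. |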
Source code in zenml/integrations/github/flavors/github_actions_orchestrator_flavor.py
class GitHubActionsOrchestratorConfig(BaseOrchestratorConfig):
    """Settings that control the GitHub Actions orchestrator.

    Attributes:
        skip_dirty_repository_check: When `True`, running a pipeline does not
            raise an exception even though the git repository still contains
            untracked/uncommitted files.
        skip_github_repository_check: When `True`, the orchestrator does not
            verify that the git repository points to a GitHub remote.
        push: When `True`, the generated GitHub workflow file is committed
            and pushed automatically when running a pipeline. When `False`,
            the file is only written to the correct location and has to be
            committed and pushed manually.
    """

    # All three switches are disabled unless explicitly configured.
    skip_dirty_repository_check: bool = False
    skip_github_repository_check: bool = False
    push: bool = False

    @property
    def is_remote(self) -> bool:
        """Tells whether this stack component runs remotely.

        The flag decides whether the component may be combined with a local
        ZenML database or whether a remote ZenML server is required.

        Returns:
            Always `True` for this configuration.
        """
        return True
is_remote: bool
property
readonly
Checks if this stack component is running remotely.
This designation is used to determine if the stack component can be used with a local ZenML database or if it requires a remote ZenML server.
Returns:
Type | Description |
---|---|
bool |
True if this config is for a remote component, False otherwise. |
GitHubActionsOrchestratorFlavor (BaseOrchestratorFlavor)
GitHub Actions orchestrator flavor.
Source code in zenml/integrations/github/flavors/github_actions_orchestrator_flavor.py
class GitHubActionsOrchestratorFlavor(BaseOrchestratorFlavor):
    """Flavor describing the GitHub Actions orchestrator."""

    @property
    def name(self) -> str:
        """The identifier of this orchestrator flavor.

        Returns:
            Name of the orchestrator flavor.
        """
        flavor_name: str = GITHUB_ORCHESTRATOR_FLAVOR
        return flavor_name

    @property
    def config_class(self) -> Type[GitHubActionsOrchestratorConfig]:
        """The configuration class that belongs to this flavor.

        Returns:
            The `GitHubActionsOrchestratorConfig` class.
        """
        return GitHubActionsOrchestratorConfig

    @property
    def implementation_class(self) -> Type["GitHubActionsOrchestrator"]:
        """The class implementing this flavor.

        Returns:
            The `GitHubActionsOrchestrator` class.
        """
        # The implementation is imported inside the property rather than at
        # module level.
        from zenml.integrations.github.orchestrators import (
            GitHubActionsOrchestrator,
        )

        return GitHubActionsOrchestrator
config_class: Type[zenml.integrations.github.flavors.github_actions_orchestrator_flavor.GitHubActionsOrchestratorConfig]
property
readonly
Returns the `GitHubActionsOrchestratorConfig` config class.
Returns:
Type | Description |
---|---|
Type[zenml.integrations.github.flavors.github_actions_orchestrator_flavor.GitHubActionsOrchestratorConfig] |
The config class. |
implementation_class: Type[GitHubActionsOrchestrator]
property
readonly
Implementation class for this flavor.
Returns:
Type | Description |
---|---|
Type[GitHubActionsOrchestrator] |
The implementation class. |
name: str
property
readonly
Name of the orchestrator flavor.
Returns:
Type | Description |
---|---|
str |
Name of the orchestrator flavor. |
github_secrets_manager_flavor
GitHub secrets manager flavor.
GitHubSecretsManagerConfig (BaseSecretsManagerConfig)
pydantic-model
The configuration for the GitHub Secrets Manager.
Attributes:
Name | Type | Description |
---|---|---|
owner |
str |
The owner (either individual or organization) of the repository. |
repository |
str |
Name of the GitHub repository. |
Source code in zenml/integrations/github/flavors/github_secrets_manager_flavor.py
class GitHubSecretsManagerConfig(BaseSecretsManagerConfig):
    """Settings that identify the repository used by the GitHub secrets manager.

    Attributes:
        owner: Individual or organization that owns the repository.
        repository: Name of the GitHub repository.
    """

    # Both fields are declared without a default value.
    owner: str
    repository: str
GitHubSecretsManagerFlavor (BaseSecretsManagerFlavor)
Class for the `GitHubSecretsManagerFlavor`.
Source code in zenml/integrations/github/flavors/github_secrets_manager_flavor.py
class GitHubSecretsManagerFlavor(BaseSecretsManagerFlavor):
    """Flavor describing the GitHub secrets manager."""

    @property
    def name(self) -> str:
        """The identifier of this secrets manager flavor.

        Returns:
            The name of the flavor.
        """
        flavor_name: str = GITHUB_SECRET_MANAGER_FLAVOR
        return flavor_name

    @property
    def config_class(self) -> Type[GitHubSecretsManagerConfig]:
        """The configuration class that belongs to this flavor.

        Returns:
            The `GitHubSecretsManagerConfig` class.
        """
        return GitHubSecretsManagerConfig

    @property
    def implementation_class(self) -> Type["GitHubSecretsManager"]:
        """The class implementing this flavor.

        Returns:
            The `GitHubSecretsManager` class.
        """
        # The implementation is imported inside the property rather than at
        # module level.
        from zenml.integrations.github.secrets_managers import (
            GitHubSecretsManager,
        )

        return GitHubSecretsManager
config_class: Type[zenml.integrations.github.flavors.github_secrets_manager_flavor.GitHubSecretsManagerConfig]
property
readonly
Returns the `GitHubSecretsManagerConfig` config class.
Returns:
Type | Description |
---|---|
Type[zenml.integrations.github.flavors.github_secrets_manager_flavor.GitHubSecretsManagerConfig] |
The config class. |
implementation_class: Type[GitHubSecretsManager]
property
readonly
Implementation class for this flavor.
Returns:
Type | Description |
---|---|
Type[GitHubSecretsManager] |
The implementation class. |
name: str
property
readonly
Name of the flavor.
Returns:
Type | Description |
---|---|
str |
The name of the flavor. |
orchestrators
special
Initialization of the GitHub Actions Orchestrator.
github_actions_orchestrator
Implementation of the GitHub Actions Orchestrator.
GitHubActionsOrchestrator (BaseOrchestrator)
Orchestrator responsible for running pipelines using GitHub Actions.
Source code in zenml/integrations/github/orchestrators/github_actions_orchestrator.py
class GitHubActionsOrchestrator(BaseOrchestrator):
"""Orchestrator responsible for running pipelines using GitHub Actions."""
_git_repo: Optional[Repo] = None
@property
def config(self) -> GitHubActionsOrchestratorConfig:
    """The configuration of this orchestrator, typed for static checkers.

    Returns:
        `self._config` cast to `GitHubActionsOrchestratorConfig`.
    """
    typed_config = cast(GitHubActionsOrchestratorConfig, self._config)
    return typed_config
@property
def git_repo(self) -> Repo:
    """Returns the git repository for the current working directory.

    The repository is looked up (searching parent directories as well) and
    cached on the instance. It is only cached after the GitHub remote check
    has passed, so a repository that failed the check is validated again on
    the next access instead of being returned unchecked.

    Returns:
        Git repository for the current working directory.

    Raises:
        RuntimeError: If there is no git repository for the current working
            directory or the repository remote is not pointing to GitHub.
    """
    if self._git_repo:
        return self._git_repo

    try:
        repo = Repo(search_parent_directories=True)
    except InvalidGitRepositoryError as err:
        raise RuntimeError(
            "Unable to find git repository in current working "
            f"directory {os.getcwd()} or its parent directories."
        ) from err

    remote_url = repo.remote().url
    is_github_repo = any(
        remote_url.startswith(prefix)
        for prefix in GITHUB_REMOTE_URL_PREFIXES
    )
    if not (is_github_repo or self.config.skip_github_repository_check):
        raise RuntimeError(
            f"The remote URL '{remote_url}' of your git repo "
            f"({repo.git_dir}) is not pointing to a GitHub "
            "repository. The GitHub Actions orchestrator runs "
            "pipelines using GitHub Actions and therefore only works "
            "with GitHub repositories. If you want to skip this check "
            "and run this orchestrator anyway, run: \n"
            f"`zenml orchestrator update {self.name} "
            "--skip_github_repository_check=true`"
        )

    # Cache only a repository that passed validation.
    self._git_repo = repo
    return repo
@property
def workflow_directory(self) -> str:
    """Location of the `.github/workflows` directory of the repository.

    Returns:
        The GitHub workflows directory inside the repository working
        directory.
    """
    repo_root = self.git_repo.working_dir
    assert repo_root
    return os.path.join(repo_root, ".github", "workflows")
@property
def validator(self) -> Optional[StackValidator]:
    """Builds the validator that checks whether a stack is compatible.

    The validator requires a container registry component and runs a
    custom check that rejects local container registries, container
    registries that require authentication and any stack component that
    has a local path.

    Returns:
        The stack validator.
    """

    def _validate_local_requirements(stack: "Stack") -> Tuple[bool, str]:
        container_registry = stack.container_registry
        assert container_registry is not None

        # Check 1: the registry must not point to a local URI.
        if container_registry.config.is_local:
            local_registry_message = (
                "The GitHub Actions orchestrator requires a remote "
                f"container registry, but the '{container_registry.name}' "
                "container registry of your active stack points to a local "
                f"URI '{container_registry.config.uri}'. Please make sure "
                "stacks with a GitHub Actions orchestrator always contain "
                "remote container registries."
            )
            return False, local_registry_message

        # Check 2: registries that need authentication are rejected.
        if container_registry.requires_authentication:
            authentication_message = (
                "The GitHub Actions orchestrator currently only works with "
                "GitHub container registries or public container "
                f"registries, but your {container_registry.flavor} "
                f"container registry '{container_registry.name}' requires "
                "authentication."
            )
            return False, authentication_message

        # Check 3: the first component with a local path fails validation.
        component = next(
            (
                candidate
                for candidate in stack.components.values()
                if candidate.local_path is not None
            ),
            None,
        )
        if component is not None:
            return False, (
                "The GitHub Actions orchestrator runs pipelines on "
                "remote GitHub Actions runners, but the "
                f"'{component.name}' {component.type.value} of your "
                "active stack is a local component. Please make sure "
                "to only use remote stack components in combination "
                "with the GitHub Actions orchestrator. "
            )

        return True, ""

    return StackValidator(
        required_components={StackComponentType.CONTAINER_REGISTRY},
        custom_validation_function=_validate_local_requirements,
    )
def _docker_login_step(
    self,
    container_registry: BaseContainerRegistry,
) -> Optional[Dict[str, Any]]:
    """Builds the GitHub Actions step that logs in to the container registry.

    Args:
        container_registry: The container registry which (potentially)
            requires a step to authenticate.

    Returns:
        Dictionary specifying the GitHub Actions step for authenticating
        with the container registry if that is required, `None` otherwise.
    """
    # NOTE(review): the isinstance check targets a class named `*Flavor`
    # while the argument is annotated as a container registry; the URI is
    # also read as `.uri` here but as `.config.uri` in the validator.
    # Confirm both against the container registry API.
    uses_automatic_token = (
        isinstance(container_registry, GitHubContainerRegistryFlavor)
        and container_registry.config.automatic_token_authentication
    )
    # TODO: Uncomment these lines once we support different private
    #  container registries in GitHub Actions
    # elif container_registry.requires_authentication:
    #     username = cast(str, container_registry.username)
    #     password = cast(str, container_registry.password)
    if not uses_automatic_token:
        return None

    # GitHub Actions specific placeholders are used as credentials when the
    # container registry specifies automatic token authentication.
    credentials = {
        "registry": container_registry.uri,
        "username": "${{ github.actor }}",
        "password": "${{ secrets.GITHUB_TOKEN }}",
    }
    return {
        "name": "Authenticate with the container registry",
        "uses": DOCKER_LOGIN_ACTION,
        "with": credentials,
    }
def _write_environment_file_step(
    self,
    file_name: str,
    secrets_manager: Optional[BaseSecretsManager] = None,
) -> Dict[str, Any]:
    """Builds the GitHub Actions step that writes the environment file.

    Args:
        file_name: Name of the environment file that should be written.
        secrets_manager: Secrets manager that will be used to read secrets
            during pipeline execution.

    Returns:
        Dictionary specifying the GitHub Actions step for writing the
        environment file.
    """
    run_id_placeholder = (
        "${{ github.run_id }}_${{ github.run_number }}_"
        "${{ github.run_attempt }}"
    )

    shell_commands = [
        # Always include the environment variable that specifies whether
        # we're running in a GitHub Action workflow so the secret manager
        # knows how to query secret values. This first command creates
        # (or truncates) the file, all following ones append to it.
        f'echo {ENV_IN_GITHUB_ACTIONS}="${ENV_IN_GITHUB_ACTIONS}" '
        f"> {file_name}; ",
        f'echo {ENV_ZENML_GH_ACTIONS_RUN_ID}="{run_id_placeholder}" '
        f">> {file_name}; ",
    ]

    if isinstance(secrets_manager, GitHubSecretsManager):
        # Write all ZenML secrets into the environment file. Explicitly
        # writing these `${{ secrets.<SECRET_NAME> }}` placeholders into the
        # workflow yaml is the only way for us to access the GitHub secrets
        # in a GitHub Actions workflow.
        append_secret_placeholder = "echo {secret_name}=${{{{ secrets.{secret_name} }}}} >> {file}; "
        secret_names = secrets_manager.get_all_secret_keys(
            include_prefix=True
        )
        shell_commands.extend(
            append_secret_placeholder.format(
                secret_name=secret_name, file=file_name
            )
            for secret_name in secret_names
        )

    return {
        "name": "Write environment file",
        "run": "".join(shell_commands),
    }
def get_orchestrator_run_id(self) -> str:
    """Returns the active orchestrator run id.

    The id is read from the environment variable named by
    `ENV_ZENML_GH_ACTIONS_RUN_ID`.

    Raises:
        RuntimeError: If the environment variable specifying the run id
            is not set.

    Returns:
        The orchestrator run id.
    """
    try:
        return os.environ[ENV_ZENML_GH_ACTIONS_RUN_ID]
    except KeyError as err:
        # Chain the lookup failure so the traceback shows it as the cause.
        raise RuntimeError(
            "Unable to read run id from environment variable "
            f"{ENV_ZENML_GH_ACTIONS_RUN_ID}."
        ) from err
def prepare_pipeline_deployment(
    self,
    deployment: "PipelineDeployment",
    stack: "Stack",
) -> None:
    """Build a Docker image and push it to the container registry.

    The value returned by the image builder is stored on the deployment
    under `ORCHESTRATOR_DOCKER_IMAGE_KEY`.

    Args:
        deployment: The pipeline deployment configuration.
        stack: The stack on which the pipeline will be deployed.

    Raises:
        RuntimeError: If the orchestrator should only run in a clean git
            repository and the repository is dirty.
    """
    if (
        not self.config.skip_dirty_repository_check
        and self.git_repo.is_dirty(untracked_files=True)
    ):
        # The trailing space after the first sentence keeps the two
        # sentences from running together in the rendered message.
        raise RuntimeError(
            "Trying to run a pipeline from within a dirty (=containing "
            "untracked/uncommitted files) git repository. "
            "If you want this orchestrator to skip the dirty repo check in "
            f"the future, run\n `zenml orchestrator update {self.name} "
            "--skip_dirty_repository_check=true`"
        )

    docker_image_builder = PipelineDockerImageBuilder()
    repo_digest = docker_image_builder.build_and_push_docker_image(
        deployment=deployment, stack=stack
    )
    deployment.add_extra(ORCHESTRATOR_DOCKER_IMAGE_KEY, repo_digest)
def prepare_or_run_pipeline(
    self,
    deployment: "PipelineDeployment",
    stack: "Stack",
) -> Any:
    """Writes a GitHub Action workflow yaml and optionally pushes it.

    The workflow contains one job per pipeline step. Each job writes an
    environment file, optionally logs in to the container registry and then
    runs the pipeline Docker image with the step entrypoint. Scheduled
    pipelines are triggered by their cron expression; all other pipelines
    are triggered by a push that modifies the workflow file itself.

    Args:
        deployment: The pipeline deployment to prepare or run.
        stack: The stack the pipeline will run on.

    Raises:
        ValueError: If a schedule without a cron expression or with an
            invalid cron expression is passed.
    """
    schedule = deployment.schedule

    workflow_name = deployment.pipeline.name
    if schedule:
        # Add a suffix to the workflow filename so we don't overwrite
        # scheduled pipeline by future schedules or single pipeline runs.
        datetime_string = datetime.now().strftime("%y_%m_%d_%H_%M_%S")
        workflow_name += f"-scheduled-{datetime_string}"

    workflow_path = os.path.join(
        self.workflow_directory,
        f"{workflow_name}.yaml",
    )

    workflow_dict: Dict[str, Any] = {
        "name": workflow_name,
    }

    if schedule:
        if not schedule.cron_expression:
            raise ValueError(
                "GitHub Action workflows can only be scheduled using cron "
                "expressions and not using a periodic schedule. If you "
                "want to schedule pipelines using this GitHub Action "
                "orchestrator, please include a cron expression in your "
                "schedule object. For more information on GitHub workflow "
                "schedules check out https://docs.github.com/en/actions/using-workflows/events-that-trigger-workflows#schedule."
            )

        # GitHub workflows requires a schedule interval of at least 5
        # minutes. Invalid cron expressions would be something like
        # `*/3 * * * *` (all stars except the first part of the expression,
        # which will have the format `*/minute_interval`)
        if re.fullmatch(r"\*/[1-4]( \*){4,}", schedule.cron_expression):
            # The example uses minute 0 of every hour; a `*` in the minute
            # field would run the workflow every minute instead.
            raise ValueError(
                "GitHub workflows requires a schedule interval of at "
                "least 5 minutes which is incompatible with your cron "
                f"expression '{schedule.cron_expression}'. An example of a "
                "valid cron expression would be '0 * * * *' to run "
                "every hour. For more information on GitHub workflow "
                "schedules check out https://docs.github.com/en/actions/using-workflows/events-that-trigger-workflows#schedule."
            )

        logger.warning(
            "GitHub only runs scheduled workflows once the "
            "workflow file is merged to the default branch of the "
            "repository (https://docs.github.com/en/pull-requests/collaborating-with-pull-requests/proposing-changes-to-your-work-with-pull-requests/about-branches#about-the-default-branch). "
            "Please make sure to merge your current branch into the "
            "default branch for this scheduled pipeline to run."
        )
        workflow_dict["on"] = {
            "schedule": [{"cron": schedule.cron_expression}]
        }
    else:
        # The pipeline should only run once. The only fool-proof way to
        # only execute a workflow once seems to be running on specific tags.
        # We don't want to create tags for each pipeline run though, so
        # instead we only run this workflow if the workflow file is
        # modified. As long as users don't manually modify these files this
        # should be sufficient.
        workflow_path_in_repo = os.path.relpath(
            workflow_path, self.git_repo.working_dir
        )
        workflow_dict["on"] = {"push": {"paths": [workflow_path_in_repo]}}

    image_name = deployment.pipeline.extra[ORCHESTRATOR_DOCKER_IMAGE_KEY]

    # Prepare the step that writes an environment file which will get
    # passed to the docker image
    env_file_name = ".zenml_docker_env"
    write_env_file_step = self._write_environment_file_step(
        file_name=env_file_name, secrets_manager=stack.secrets_manager
    )
    docker_run_args = ["--env-file", env_file_name]

    # Prepare the docker login step if necessary
    container_registry = stack.container_registry
    assert container_registry
    docker_login_step = self._docker_login_step(container_registry)

    # The base command that each job will execute with specific arguments
    base_command = [
        "docker",
        "run",
        *docker_run_args,
        image_name,
    ] + StepEntrypointConfiguration.get_entrypoint_command()

    jobs = {}
    for step_name, step in deployment.steps.items():
        if self.requires_resources_in_orchestration_environment(step):
            logger.warning(
                "Specifying step resources is not supported for the "
                "GitHub Actions orchestrator, ignoring resource "
                "configuration for step %s.",
                step.config.name,
            )

        job_steps = []

        # Copy the shared dicts here to avoid creating yaml anchors (which
        # are currently not supported in GitHub workflow yaml files)
        job_steps.append(copy.deepcopy(write_env_file_step))

        if docker_login_step:
            job_steps.append(copy.deepcopy(docker_login_step))

        entrypoint_args = (
            StepEntrypointConfiguration.get_entrypoint_arguments(
                step_name=step_name,
            )
        )

        command = base_command + entrypoint_args
        docker_run_step = {
            "name": "Run the docker image",
            "run": " ".join(command),
        }

        job_steps.append(docker_run_step)
        job_dict = {
            "runs-on": "ubuntu-latest",
            # Upstream steps become job dependencies.
            "needs": step.spec.upstream_steps,
            "steps": job_steps,
        }
        jobs[step.config.name] = job_dict

    workflow_dict["jobs"] = jobs

    fileio.makedirs(self.workflow_directory)
    yaml_utils.write_yaml(workflow_path, workflow_dict, sort_keys=False)
    logger.info("Wrote GitHub workflow file to %s", workflow_path)

    if self.config.push:
        # Add, commit and push the pipeline workflow yaml
        self.git_repo.index.add(workflow_path)
        self.git_repo.index.commit(
            "[ZenML GitHub Actions Orchestrator] Add github workflow for "
            f"pipeline {deployment.pipeline.name}."
        )
        self.git_repo.remote().push()
        logger.info("Pushed workflow file '%s'", workflow_path)
    else:
        logger.info(
            "Automatically committing and pushing is disabled for this "
            "orchestrator. To run the pipeline, you'll have to commit and "
            "push the workflow file '%s' manually.\n"
            "If you want to update this orchestrator to automatically "
            "commit and push in the future, run "
            "`zenml orchestrator update %s --push=true`",
            workflow_path,
            self.name,
        )
config: GitHubActionsOrchestratorConfig
property
readonly
Returns the `GitHubActionsOrchestratorConfig` config.
Returns:
Type | Description |
---|---|
GitHubActionsOrchestratorConfig |
The configuration. |
git_repo: Repo
property
readonly
Returns the git repository for the current working directory.
Returns:
Type | Description |
---|---|
Repo |
Git repository for the current working directory. |
Exceptions:
Type | Description |
---|---|
RuntimeError |
If there is no git repository for the current working directory or the repository remote is not pointing to GitHub. |
validator: Optional[zenml.stack.stack_validator.StackValidator]
property
readonly
Validator that ensures that the stack is compatible.
Makes sure that the stack contains a container registry and only remote components.
Returns:
Type | Description |
---|---|
Optional[zenml.stack.stack_validator.StackValidator] |
The stack validator. |
workflow_directory: str
property
readonly
Returns path to the GitHub workflows directory.
Returns:
Type | Description |
---|---|
str |
The GitHub workflows directory. |
get_orchestrator_run_id(self)
Returns the active orchestrator run id.
Exceptions:
Type | Description |
---|---|
RuntimeError |
If the environment variable specifying the run id is not set. |
Returns:
Type | Description |
---|---|
str |
The orchestrator run id. |
Source code in zenml/integrations/github/orchestrators/github_actions_orchestrator.py
def get_orchestrator_run_id(self) -> str:
    """Returns the active orchestrator run id.

    The id is read from the environment variable named by
    `ENV_ZENML_GH_ACTIONS_RUN_ID`.

    Raises:
        RuntimeError: If the environment variable specifying the run id
            is not set.

    Returns:
        The orchestrator run id.
    """
    try:
        return os.environ[ENV_ZENML_GH_ACTIONS_RUN_ID]
    except KeyError as err:
        # Chain the lookup failure so the traceback shows it as the cause.
        raise RuntimeError(
            "Unable to read run id from environment variable "
            f"{ENV_ZENML_GH_ACTIONS_RUN_ID}."
        ) from err
prepare_or_run_pipeline(self, deployment, stack)
Writes a GitHub Action workflow yaml and optionally pushes it.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
deployment |
PipelineDeployment |
The pipeline deployment to prepare or run. |
required |
stack |
Stack |
The stack the pipeline will run on. |
required |
Exceptions:
Type | Description |
---|---|
ValueError |
If a schedule without a cron expression or with an invalid cron expression is passed. |
Source code in zenml/integrations/github/orchestrators/github_actions_orchestrator.py
def prepare_or_run_pipeline(
    self,
    deployment: "PipelineDeployment",
    stack: "Stack",
) -> Any:
    """Writes a GitHub Action workflow yaml and optionally pushes it.

    The workflow contains one job per pipeline step. Each job writes an
    environment file, optionally logs in to the container registry and then
    runs the pipeline Docker image with the step entrypoint. Scheduled
    pipelines are triggered by their cron expression; all other pipelines
    are triggered by a push that modifies the workflow file itself.

    Args:
        deployment: The pipeline deployment to prepare or run.
        stack: The stack the pipeline will run on.

    Raises:
        ValueError: If a schedule without a cron expression or with an
            invalid cron expression is passed.
    """
    schedule = deployment.schedule

    workflow_name = deployment.pipeline.name
    if schedule:
        # Add a suffix to the workflow filename so we don't overwrite
        # scheduled pipeline by future schedules or single pipeline runs.
        datetime_string = datetime.now().strftime("%y_%m_%d_%H_%M_%S")
        workflow_name += f"-scheduled-{datetime_string}"

    workflow_path = os.path.join(
        self.workflow_directory,
        f"{workflow_name}.yaml",
    )

    workflow_dict: Dict[str, Any] = {
        "name": workflow_name,
    }

    if schedule:
        if not schedule.cron_expression:
            raise ValueError(
                "GitHub Action workflows can only be scheduled using cron "
                "expressions and not using a periodic schedule. If you "
                "want to schedule pipelines using this GitHub Action "
                "orchestrator, please include a cron expression in your "
                "schedule object. For more information on GitHub workflow "
                "schedules check out https://docs.github.com/en/actions/using-workflows/events-that-trigger-workflows#schedule."
            )

        # GitHub workflows requires a schedule interval of at least 5
        # minutes. Invalid cron expressions would be something like
        # `*/3 * * * *` (all stars except the first part of the expression,
        # which will have the format `*/minute_interval`)
        if re.fullmatch(r"\*/[1-4]( \*){4,}", schedule.cron_expression):
            # The example uses minute 0 of every hour; a `*` in the minute
            # field would run the workflow every minute instead.
            raise ValueError(
                "GitHub workflows requires a schedule interval of at "
                "least 5 minutes which is incompatible with your cron "
                f"expression '{schedule.cron_expression}'. An example of a "
                "valid cron expression would be '0 * * * *' to run "
                "every hour. For more information on GitHub workflow "
                "schedules check out https://docs.github.com/en/actions/using-workflows/events-that-trigger-workflows#schedule."
            )

        logger.warning(
            "GitHub only runs scheduled workflows once the "
            "workflow file is merged to the default branch of the "
            "repository (https://docs.github.com/en/pull-requests/collaborating-with-pull-requests/proposing-changes-to-your-work-with-pull-requests/about-branches#about-the-default-branch). "
            "Please make sure to merge your current branch into the "
            "default branch for this scheduled pipeline to run."
        )
        workflow_dict["on"] = {
            "schedule": [{"cron": schedule.cron_expression}]
        }
    else:
        # The pipeline should only run once. The only fool-proof way to
        # only execute a workflow once seems to be running on specific tags.
        # We don't want to create tags for each pipeline run though, so
        # instead we only run this workflow if the workflow file is
        # modified. As long as users don't manually modify these files this
        # should be sufficient.
        workflow_path_in_repo = os.path.relpath(
            workflow_path, self.git_repo.working_dir
        )
        workflow_dict["on"] = {"push": {"paths": [workflow_path_in_repo]}}

    image_name = deployment.pipeline.extra[ORCHESTRATOR_DOCKER_IMAGE_KEY]

    # Prepare the step that writes an environment file which will get
    # passed to the docker image
    env_file_name = ".zenml_docker_env"
    write_env_file_step = self._write_environment_file_step(
        file_name=env_file_name, secrets_manager=stack.secrets_manager
    )
    docker_run_args = ["--env-file", env_file_name]

    # Prepare the docker login step if necessary
    container_registry = stack.container_registry
    assert container_registry
    docker_login_step = self._docker_login_step(container_registry)

    # The base command that each job will execute with specific arguments
    base_command = [
        "docker",
        "run",
        *docker_run_args,
        image_name,
    ] + StepEntrypointConfiguration.get_entrypoint_command()

    jobs = {}
    for step_name, step in deployment.steps.items():
        if self.requires_resources_in_orchestration_environment(step):
            logger.warning(
                "Specifying step resources is not supported for the "
                "GitHub Actions orchestrator, ignoring resource "
                "configuration for step %s.",
                step.config.name,
            )

        job_steps = []

        # Copy the shared dicts here to avoid creating yaml anchors (which
        # are currently not supported in GitHub workflow yaml files)
        job_steps.append(copy.deepcopy(write_env_file_step))

        if docker_login_step:
            job_steps.append(copy.deepcopy(docker_login_step))

        entrypoint_args = (
            StepEntrypointConfiguration.get_entrypoint_arguments(
                step_name=step_name,
            )
        )

        command = base_command + entrypoint_args
        docker_run_step = {
            "name": "Run the docker image",
            "run": " ".join(command),
        }

        job_steps.append(docker_run_step)
        job_dict = {
            "runs-on": "ubuntu-latest",
            # Upstream steps become job dependencies.
            "needs": step.spec.upstream_steps,
            "steps": job_steps,
        }
        jobs[step.config.name] = job_dict

    workflow_dict["jobs"] = jobs

    fileio.makedirs(self.workflow_directory)
    yaml_utils.write_yaml(workflow_path, workflow_dict, sort_keys=False)
    logger.info("Wrote GitHub workflow file to %s", workflow_path)

    if self.config.push:
        # Add, commit and push the pipeline workflow yaml
        self.git_repo.index.add(workflow_path)
        self.git_repo.index.commit(
            "[ZenML GitHub Actions Orchestrator] Add github workflow for "
            f"pipeline {deployment.pipeline.name}."
        )
        self.git_repo.remote().push()
        logger.info("Pushed workflow file '%s'", workflow_path)
    else:
        logger.info(
            "Automatically committing and pushing is disabled for this "
            "orchestrator. To run the pipeline, you'll have to commit and "
            "push the workflow file '%s' manually.\n"
            "If you want to update this orchestrator to automatically "
            "commit and push in the future, run "
            "`zenml orchestrator update %s --push=true`",
            workflow_path,
            self.name,
        )
prepare_pipeline_deployment(self, deployment, stack)
Build a Docker image and push it to the container registry.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
deployment |
PipelineDeployment |
The pipeline deployment configuration. |
required |
stack |
Stack |
The stack on which the pipeline will be deployed. |
required |
Exceptions:
Type | Description |
---|---|
RuntimeError |
If the orchestrator should only run in a clean git repository and the repository is dirty. |
Source code in zenml/integrations/github/orchestrators/github_actions_orchestrator.py
def prepare_pipeline_deployment(
    self,
    deployment: "PipelineDeployment",
    stack: "Stack",
) -> None:
    """Build a Docker image and push it to the container registry.

    The value returned by the image builder is stored on the deployment
    under `ORCHESTRATOR_DOCKER_IMAGE_KEY`.

    Args:
        deployment: The pipeline deployment configuration.
        stack: The stack on which the pipeline will be deployed.

    Raises:
        RuntimeError: If the orchestrator should only run in a clean git
            repository and the repository is dirty.
    """
    if (
        not self.config.skip_dirty_repository_check
        and self.git_repo.is_dirty(untracked_files=True)
    ):
        # The trailing space after the first sentence keeps the two
        # sentences from running together in the rendered message.
        raise RuntimeError(
            "Trying to run a pipeline from within a dirty (=containing "
            "untracked/uncommitted files) git repository. "
            "If you want this orchestrator to skip the dirty repo check in "
            f"the future, run\n `zenml orchestrator update {self.name} "
            "--skip_dirty_repository_check=true`"
        )

    docker_image_builder = PipelineDockerImageBuilder()
    repo_digest = docker_image_builder.build_and_push_docker_image(
        deployment=deployment, stack=stack
    )
    deployment.add_extra(ORCHESTRATOR_DOCKER_IMAGE_KEY, repo_digest)
secrets_managers
special
Initialization of the GitHub Secrets Manager.
github_secrets_manager
Implementation of the GitHub Secrets Manager.
GitHubSecretsManager (BaseSecretsManager)
Class to interact with the GitHub secrets manager.
Source code in zenml/integrations/github/secrets_managers/github_secrets_manager.py
class GitHubSecretsManager(BaseSecretsManager):
"""Class to interact with the GitHub secrets manager."""
_session: Optional[requests.Session] = None
@property
def config(self) -> GitHubSecretsManagerConfig:
    """The configuration of this secrets manager, typed for static checkers.

    Returns:
        `self._config` cast to `GitHubSecretsManagerConfig`.
    """
    typed_config = cast(GitHubSecretsManagerConfig, self._config)
    return typed_config
@property
def post_registration_message(self) -> Optional[str]:
    """Hint about the GitHub API authentication environment variables.

    Returns:
        The `AUTHENTICATION_CREDENTIALS_MESSAGE` info message.
    """
    info_message: Optional[str] = AUTHENTICATION_CREDENTIALS_MESSAGE
    return info_message
@property
def session(self) -> requests.Session:
    """Authenticated session used for requests to the GitHub API.

    The session is created on first access and stored on the instance for
    later accesses.

    Returns:
        Session to use for GitHub API calls.

    Raises:
        RuntimeError: If authentication credentials for the GitHub API are
            not set.
    """
    if self._session:
        return self._session

    new_session = requests.Session()

    # Credentials come from the two environment variables below.
    github_username = os.getenv(ENV_GITHUB_USERNAME)
    authentication_token = os.getenv(ENV_GITHUB_AUTHENTICATION_TOKEN)
    credentials_available = bool(github_username and authentication_token)
    if not credentials_available:
        raise RuntimeError(
            "Missing authentication credentials for GitHub secrets "
            "manager. " + AUTHENTICATION_CREDENTIALS_MESSAGE
        )

    new_session.auth = HTTPBasicAuth(github_username, authentication_token)
    new_session.headers["Accept"] = "application/vnd.github.v3+json"

    self._session = new_session
    return self._session
def _send_request(
self, method: str, resource: Optional[str] = None, **kwargs: Any
) -> requests.Response:
"""Sends an HTTP request to the GitHub API.
Args:
method: Method of the HTTP request that should be sent.
resource: Optional resource to which the request should be sent. If
none is given, the default GitHub API secrets endpoint will be
used.
**kwargs: Will be passed to the `requests` library.
Returns:
HTTP response.
# noqa: DAR402
Raises:
HTTPError: If the request failed due to a client or server error.
"""
url = (
f"https://api.github.com/repos/{self.config.owner}"
f"/{self.config.repository}/actions/secrets"
)
if resource:
url += resource
response = self.session.request(method=method, url=url, **kwargs)
# Raise an exception in case of a client or server error
response.raise_for_status()
return response
def _encrypt_secret(self, secret_value: str) -> Tuple[str, str]:
"""Encrypts a secret value.
This method first fetches a public key from the GitHub API and then uses
this key to encrypt the secret value. This is needed in order to
register GitHub secrets using the API.
Args:
secret_value: Secret value to encrypt.
Returns:
The encrypted secret value and the key id of the GitHub public key.
"""
from nacl.encoding import Base64Encoder
from nacl.public import PublicKey, SealedBox
response_json = self._send_request("GET", resource="/public-key").json()
public_key = PublicKey(
response_json["key"].encode("utf-8"), Base64Encoder
)
sealed_box = SealedBox(public_key)
encrypted_bytes = sealed_box.encrypt(secret_value.encode("utf-8"))
encrypted_string = base64.b64encode(encrypted_bytes).decode("utf-8")
return encrypted_string, cast(str, response_json["key_id"])
def _has_secret(self, secret_name: str) -> bool:
"""Checks whether a secret exists for the given name.
Args:
secret_name: Name of the secret which should be checked.
Returns:
`True` if a secret with the given name exists, `False` otherwise.
"""
secret_name = _convert_secret_name(secret_name, remove_prefix=True)
return secret_name in self.get_all_secret_keys(include_prefix=False)
def get_secret(self, secret_name: str) -> BaseSecretSchema:
"""Gets the value of a secret.
This method only works when called from within a GitHub Actions
environment.
Args:
secret_name: The name of the secret to get.
Returns:
The secret.
Raises:
KeyError: If a secret with this name doesn't exist.
RuntimeError: If not inside a GitHub Actions environments.
"""
full_secret_name = _convert_secret_name(secret_name, add_prefix=True)
# Raise a KeyError if the secret doesn't exist. We can do that even
# if we're not inside a GitHub Actions environment
if not self._has_secret(secret_name):
raise KeyError(
f"Unable to find secret '{secret_name}'. Please check the "
"GitHub UI to see if a **Repository** secret called "
f"'{full_secret_name}' exists. (ZenML uses the "
f"'{GITHUB_SECRET_PREFIX}' to differentiate ZenML "
"secrets from other GitHub secrets)"
)
if not inside_github_action_environment():
stack_name = Client().active_stack_model.name
commands = [
f"zenml stack copy {stack_name} <NEW_STACK_NAME>",
"zenml secrets_manager register <NEW_SECRETS_MANAGER_NAME> "
"--flavor=local",
"zenml stack update <NEW_STACK_NAME> "
"--secrets_manager=<NEW_SECRETS_MANAGER_NAME>",
"zenml stack set <NEW_STACK_NAME>",
f"zenml secrets-manager secret register {secret_name} ...",
]
raise RuntimeError(
"Getting GitHub secrets is only possible within a GitHub "
"Actions workflow. If you need this secret to access "
"stack components locally, you need to "
"register this secret in a different secrets manager. "
"You can do this by running the following commands: \n\n"
+ "\n".join(commands)
)
# If we're running inside an GitHub Actions environment using the a
# workflow generated by the GitHub Actions orchestrator, all ZenML
# secrets stored in the GitHub secrets manager will be accessible as
# environment variables
secret_value = cast(str, os.getenv(full_secret_name))
secret_dict = json.loads(string_utils.b64_decode(secret_value))
schema_class = SecretSchemaClassRegistry.get_class(
secret_schema=secret_dict[SECRET_SCHEMA_DICT_KEY]
)
secret_content = secret_dict[SECRET_CONTENT_DICT_KEY]
return schema_class(name=secret_name, **secret_content)
def get_all_secret_keys(self, include_prefix: bool = False) -> List[str]:
"""Get all secret keys.
If we're running inside a GitHub Actions environment, this will return
the names of all environment variables starting with a ZenML internal
prefix. Otherwise, this will return all GitHub **Repository** secrets
created by ZenML.
Args:
include_prefix: Whether or not the internal prefix that is used to
differentiate ZenML secrets from other GitHub secrets should be
included in the returned names.
Returns:
List of all secret keys.
"""
if inside_github_action_environment():
potential_secret_keys = list(os.environ)
else:
logger.info(
"Fetching list of secrets for repository %s/%s",
self.config.owner,
self.config.repository,
)
response = self._send_request("GET", params={"per_page": 100})
potential_secret_keys = [
secret_dict["name"]
for secret_dict in response.json()["secrets"]
]
keys = [
_convert_secret_name(key, remove_prefix=not include_prefix)
for key in potential_secret_keys
if key.startswith(GITHUB_SECRET_PREFIX)
]
return keys
def register_secret(self, secret: BaseSecretSchema) -> None:
"""Registers a new secret.
Args:
secret: The secret to register.
Raises:
SecretExistsError: If a secret with this name already exists.
"""
if self._has_secret(secret.name):
raise SecretExistsError(
f"A secret with name '{secret.name}' already exists for this "
"GitHub repository. If you want to register a new value for "
f"this secret, please run `zenml secrets-manager secret delete {secret.name}` "
f"followed by `zenml secrets-manager secret register {secret.name} ...`."
)
secret_dict = {
SECRET_SCHEMA_DICT_KEY: secret.TYPE,
SECRET_CONTENT_DICT_KEY: secret.content,
}
secret_value = string_utils.b64_encode(json.dumps(secret_dict))
encrypted_secret, public_key_id = self._encrypt_secret(
secret_value=secret_value
)
body = {
"encrypted_value": encrypted_secret,
"key_id": public_key_id,
}
full_secret_name = _convert_secret_name(secret.name, add_prefix=True)
self._send_request("PUT", resource=f"/{full_secret_name}", json=body)
def update_secret(self, secret: BaseSecretSchema) -> NoReturn:
"""Update an existing secret.
Args:
secret: The secret to update.
Raises:
NotImplementedError: Always, as this functionality is not possible
using GitHub secrets which doesn't allow us to retrieve the
secret values outside of a GitHub Actions environment.
"""
raise NotImplementedError(
"Updating secrets is not possible with the GitHub secrets manager "
"as it is not possible to retrieve GitHub secrets values outside "
"of a GitHub Actions environment."
)
def delete_secret(self, secret_name: str) -> None:
"""Delete an existing secret.
Args:
secret_name: The name of the secret to delete.
"""
full_secret_name = _convert_secret_name(secret_name, add_prefix=True)
self._send_request("DELETE", resource=f"/{full_secret_name}")
def delete_all_secrets(self) -> None:
"""Delete all existing secrets."""
for secret_name in self.get_all_secret_keys(include_prefix=False):
self.delete_secret(secret_name=secret_name)
config: GitHubSecretsManagerConfig
property
readonly
Returns the `GitHubSecretsManagerConfig` config.
Returns:
Type | Description |
---|---|
GitHubSecretsManagerConfig |
The configuration. |
post_registration_message: Optional[str]
property
readonly
Info message regarding GitHub API authentication env variables.
Returns:
Type | Description |
---|---|
Optional[str] |
The info message. |
session: Session
property
readonly
Session to send requests to the GitHub API.
Returns:
Type | Description |
---|---|
Session |
Session to use for GitHub API calls. |
Exceptions:
Type | Description |
---|---|
RuntimeError |
If authentication credentials for the GitHub API are not set. |
delete_all_secrets(self)
Delete all existing secrets.
Source code in zenml/integrations/github/secrets_managers/github_secrets_manager.py
def delete_all_secrets(self) -> None:
    """Remove every secret returned by `get_all_secret_keys`."""
    # Names are requested without the prefix; `delete_secret` is called with
    # those unprefixed names.
    names_to_delete = self.get_all_secret_keys(include_prefix=False)
    for name in names_to_delete:
        self.delete_secret(secret_name=name)
delete_secret(self, secret_name)
Delete an existing secret.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
secret_name |
str |
The name of the secret to delete. |
required |
Source code in zenml/integrations/github/secrets_managers/github_secrets_manager.py
def delete_secret(self, secret_name: str) -> None:
    """Remove a single secret from the GitHub repository.

    Args:
        secret_name: The name of the secret to delete.
    """
    # The API resource is addressed by the prefixed secret name.
    prefixed_name = _convert_secret_name(secret_name, add_prefix=True)
    endpoint = f"/{prefixed_name}"
    self._send_request("DELETE", resource=endpoint)
get_all_secret_keys(self, include_prefix=False)
Get all secret keys.
If we're running inside a GitHub Actions environment, this will return the names of all environment variables starting with a ZenML internal prefix. Otherwise, this will return all GitHub Repository secrets created by ZenML.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
include_prefix |
bool |
Whether or not the internal prefix that is used to differentiate ZenML secrets from other GitHub secrets should be included in the returned names. |
False |
Returns:
Type | Description |
---|---|
List[str] |
List of all secret keys. |
Source code in zenml/integrations/github/secrets_managers/github_secrets_manager.py
def get_all_secret_keys(self, include_prefix: bool = False) -> List[str]:
    """Get all secret keys.

    If we're running inside a GitHub Actions environment, this will return
    the names of all environment variables starting with a ZenML internal
    prefix. Otherwise, this will return all GitHub **Repository** secrets
    created by ZenML. The repository secrets are fetched page by page so
    that repositories with more secrets than fit in a single API response
    are listed completely.

    Args:
        include_prefix: Whether or not the internal prefix that is used to
            differentiate ZenML secrets from other GitHub secrets should be
            included in the returned names.

    Returns:
        List of all secret keys.
    """
    if inside_github_action_environment():
        potential_secret_keys = list(os.environ)
    else:
        logger.info(
            "Fetching list of secrets for repository %s/%s",
            self.config.owner,
            self.config.repository,
        )
        page_size = 100
        page = 1
        potential_secret_keys = []
        while True:
            response = self._send_request(
                "GET", params={"per_page": page_size, "page": page}
            )
            secrets_on_page = response.json()["secrets"]
            potential_secret_keys.extend(
                secret_dict["name"] for secret_dict in secrets_on_page
            )
            # A page that is not full is the last one.
            if len(secrets_on_page) < page_size:
                break
            page += 1

    keys = [
        _convert_secret_name(key, remove_prefix=not include_prefix)
        for key in potential_secret_keys
        if key.startswith(GITHUB_SECRET_PREFIX)
    ]

    return keys
get_secret(self, secret_name)
Gets the value of a secret.
This method only works when called from within a GitHub Actions environment.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
secret_name |
str |
The name of the secret to get. |
required |
Returns:
Type | Description |
---|---|
BaseSecretSchema |
The secret. |
Exceptions:
Type | Description |
---|---|
KeyError |
If a secret with this name doesn't exist. |
RuntimeError |
If not inside a GitHub Actions environment. |
Source code in zenml/integrations/github/secrets_managers/github_secrets_manager.py
def get_secret(self, secret_name: str) -> BaseSecretSchema:
    """Read a secret from the GitHub Actions environment.

    Secret values are only readable from within a GitHub Actions
    environment, where they are looked up as environment variables.

    Args:
        secret_name: The name of the secret to get.

    Returns:
        The secret.

    Raises:
        KeyError: If a secret with this name doesn't exist.
        RuntimeError: If not inside a GitHub Actions environment.
    """
    full_secret_name = _convert_secret_name(secret_name, add_prefix=True)

    # The existence check is done first so that an unknown name is reported
    # as a KeyError regardless of where the code is running.
    secret_exists = self._has_secret(secret_name)
    if not secret_exists:
        raise KeyError(
            f"Unable to find secret '{secret_name}'. Please check the "
            "GitHub UI to see if a **Repository** secret called "
            f"'{full_secret_name}' exists. (ZenML uses the "
            f"'{GITHUB_SECRET_PREFIX}' to differentiate ZenML "
            "secrets from other GitHub secrets)"
        )

    running_in_actions = inside_github_action_environment()
    if not running_in_actions:
        # Outside of GitHub Actions the value can't be read, so the error
        # lists the CLI commands for moving to another secrets manager.
        stack_name = Client().active_stack_model.name
        migration_steps = "\n".join(
            [
                f"zenml stack copy {stack_name} <NEW_STACK_NAME>",
                "zenml secrets_manager register <NEW_SECRETS_MANAGER_NAME> "
                "--flavor=local",
                "zenml stack update <NEW_STACK_NAME> "
                "--secrets_manager=<NEW_SECRETS_MANAGER_NAME>",
                "zenml stack set <NEW_STACK_NAME>",
                f"zenml secrets-manager secret register {secret_name} ...",
            ]
        )
        raise RuntimeError(
            "Getting GitHub secrets is only possible within a GitHub "
            "Actions workflow. If you need this secret to access "
            "stack components locally, you need to "
            "register this secret in a different secrets manager. "
            "You can do this by running the following commands: \n\n"
            + migration_steps
        )

    # Inside a GitHub Actions environment running a workflow generated by the
    # GitHub Actions orchestrator, all ZenML secrets stored in the GitHub
    # secrets manager are accessible as environment variables.
    encoded_payload = cast(str, os.getenv(full_secret_name))
    payload = json.loads(string_utils.b64_decode(encoded_payload))

    schema_cls = SecretSchemaClassRegistry.get_class(
        secret_schema=payload[SECRET_SCHEMA_DICT_KEY]
    )
    content = payload[SECRET_CONTENT_DICT_KEY]
    return schema_cls(name=secret_name, **content)
register_secret(self, secret)
Registers a new secret.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
secret |
BaseSecretSchema |
The secret to register. |
required |
Exceptions:
Type | Description |
---|---|
SecretExistsError |
If a secret with this name already exists. |
Source code in zenml/integrations/github/secrets_managers/github_secrets_manager.py
def register_secret(self, secret: BaseSecretSchema) -> None:
    """Store a new secret as an encrypted GitHub repository secret.

    Args:
        secret: The secret to register.

    Raises:
        SecretExistsError: If a secret with this name already exists.
    """
    already_registered = self._has_secret(secret.name)
    if already_registered:
        raise SecretExistsError(
            f"A secret with name '{secret.name}' already exists for this "
            "GitHub repository. If you want to register a new value for "
            f"this secret, please run `zenml secrets-manager secret delete {secret.name}` "
            f"followed by `zenml secrets-manager secret register {secret.name} ...`."
        )

    # Serialize the schema type together with the content, then base64-encode
    # the JSON before encrypting it.
    serialized = json.dumps(
        {
            SECRET_SCHEMA_DICT_KEY: secret.TYPE,
            SECRET_CONTENT_DICT_KEY: secret.content,
        }
    )
    encoded = string_utils.b64_encode(serialized)
    ciphertext, key_id = self._encrypt_secret(secret_value=encoded)

    payload = {"encrypted_value": ciphertext, "key_id": key_id}
    prefixed_name = _convert_secret_name(secret.name, add_prefix=True)
    self._send_request("PUT", resource=f"/{prefixed_name}", json=payload)
update_secret(self, secret)
Update an existing secret.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
secret |
BaseSecretSchema |
The secret to update. |
required |
Exceptions:
Type | Description |
---|---|
NotImplementedError |
Always, as this functionality is not possible using GitHub secrets which doesn't allow us to retrieve the secret values outside of a GitHub Actions environment. |
Source code in zenml/integrations/github/secrets_managers/github_secrets_manager.py
def update_secret(self, secret: BaseSecretSchema) -> NoReturn:
    """Reject any attempt to update a secret.

    Args:
        secret: The secret to update.

    Raises:
        NotImplementedError: Always, as this functionality is not possible
            using GitHub secrets which doesn't allow us to retrieve the
            secret values outside of a GitHub Actions environment.
    """
    reason = (
        "Updating secrets is not possible with the GitHub secrets manager "
        "as it is not possible to retrieve GitHub secrets values outside "
        "of a GitHub Actions environment."
    )
    raise NotImplementedError(reason)
inside_github_action_environment()
Returns whether the current code is executing in a GitHub Actions environment.
Returns:
Type | Description |
---|---|
bool |
`True` if running in a GitHub Actions environment, `False` otherwise. |
Source code in zenml/integrations/github/secrets_managers/github_secrets_manager.py
def inside_github_action_environment() -> bool:
    """Tell whether the current code runs in a GitHub Actions environment.

    Returns:
        `True` if running in a GitHub Actions environment, `False` otherwise.
    """
    # Only the exact string "true" counts; an unset variable or any other
    # value yields `False`.
    flag = os.environ.get(ENV_IN_GITHUB_ACTIONS)
    return flag == "true"