Utils
zenml.utils
special
The utils
module contains utility functions handling analytics, reading and
writing YAML data as well as other general purpose functions.
analytics_utils
Analytics code for ZenML
AnalyticsEvent (str, Enum)
An enumeration.
Source code in zenml/utils/analytics_utils.py
class AnalyticsEvent(str, Enum):
# Pipelines
RUN_PIPELINE = "Pipeline run"
GET_PIPELINES = "Pipelines fetched"
GET_PIPELINE = "Pipeline fetched"
# Repo
INITIALIZE_REPO = "ZenML initialized"
# Profile
INITIALIZED_PROFILE = "Profile initialized"
# Components
REGISTERED_STACK_COMPONENT = "Stack component registered"
UPDATED_STACK_COMPONENT = "Stack component updated"
# Stack
REGISTERED_STACK = "Stack registered"
SET_STACK = "Stack set"
UPDATED_STACK = "Stack updated"
# Analytics opt in and out
OPT_IN_ANALYTICS = "Analytics opt-in"
OPT_OUT_ANALYTICS = "Analytics opt-out"
# Examples
RUN_EXAMPLE = "Example run"
PULL_EXAMPLE = "Example pull"
# Integrations
INSTALL_INTEGRATION = "Integration installed"
# Test event
EVENT_TEST = "Test event"
get_environment()
Returns a string representing the execution environment of the pipeline.
Currently, one of `docker`, `paperspace`, `colab`, `notebook`, or `native`.
Source code in zenml/utils/analytics_utils.py
def get_environment() -> str:
"""Returns a string representing the execution environment of the pipeline.
Currently, one of `docker`, `paperspace`, 'colab', or `native`"""
if Environment.in_docker():
return "docker"
elif Environment.in_google_colab():
return "colab"
elif Environment.in_paperspace_gradient():
return "paperspace"
elif Environment.in_notebook():
return "notebook"
else:
return "native"
get_segment_key()
Get key for authorizing to Segment backend.
Returns:

Type | Description
---|---
str | Segment key as a string.
Source code in zenml/utils/analytics_utils.py
def get_segment_key() -> str:
"""Get key for authorizing to Segment backend.
Returns:
Segment key as a string.
"""
if IS_DEBUG_ENV:
return SEGMENT_KEY_DEV
else:
return SEGMENT_KEY_PROD
parametrized(dec)
This is a meta-decorator, that is, a decorator for decorators. It wraps a decorator that expects extra arguments (in addition to the function it decorates) so that it can be applied with the usual `@decorator(arguments)` syntax:
Source code in zenml/utils/analytics_utils.py
def parametrized(
dec: Callable[..., Callable[..., Any]]
) -> Callable[..., Callable[[Callable[..., Any]], Callable[..., Any]]]:
"""This is a meta-decorator, that is, a decorator for decorators.
As a decorator is a function, it actually works as a regular decorator
with arguments:"""
def layer(
*args: Any, **kwargs: Any
) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
"""Internal layer"""
def repl(f: Callable[..., Any]) -> Callable[..., Any]:
"""Internal repl"""
return dec(f, *args, **kwargs)
return repl
return layer
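To illustrate how `parametrized` is meant to be used, here is a minimal, self-contained sketch; the `repeat` decorator and the `greet` function are hypothetical and serve only as an example:

```python
from typing import Any, Callable

from zenml.utils.analytics_utils import parametrized  # module path shown above


@parametrized
def repeat(func: Callable[..., Any], times: int) -> Callable[..., Any]:
    """Hypothetical decorator with an argument: calls `func` `times` times."""

    def wrapper(*args: Any, **kwargs: Any) -> Any:
        result = None
        for _ in range(times):
            result = func(*args, **kwargs)
        return result

    return wrapper


@repeat(times=3)
def greet(name: str) -> str:
    print(f"Hello, {name}!")
    return name


greet("ZenML")  # prints the greeting three times
```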
track(*args, **kwargs)
Internal layer
Source code in zenml/utils/analytics_utils.py
def layer(
*args: Any, **kwargs: Any
) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
"""Internal layer"""
def repl(f: Callable[..., Any]) -> Callable[..., Any]:
"""Internal repl"""
return dec(f, *args, **kwargs)
return repl
track_event(event, metadata=None)
Track segment event if user opted-in.
Parameters:

Name | Type | Description | Default
---|---|---|---
event | Union[str, zenml.utils.analytics_utils.AnalyticsEvent] | Name of event to track in segment. | required
metadata | Optional[Dict[str, Any]] | Dict of metadata to track. | None

Returns:

Type | Description
---|---
bool | True if the event was sent successfully, False if not.
Source code in zenml/utils/analytics_utils.py
def track_event(
event: Union[str, AnalyticsEvent],
metadata: Optional[Dict[str, Any]] = None,
) -> bool:
"""
Track segment event if user opted-in.
Args:
event: Name of event to track in segment.
metadata: Dict of metadata to track.
Returns:
True if event is sent successfully, False is not.
"""
try:
import analytics
from zenml.config.global_config import GlobalConfiguration
if analytics.write_key is None:
analytics.write_key = get_segment_key()
assert (
analytics.write_key is not None
), "Analytics key not set but trying to make telemetry call."
# Set this to 1 to avoid backoff loop
analytics.max_retries = 1
gc = GlobalConfiguration()
if isinstance(event, AnalyticsEvent):
event = event.value
logger.debug(
f"Attempting analytics: User: {gc.user_id}, "
f"Event: {event},"
f"Metadata: {metadata}"
)
if not gc.analytics_opt_in and event not in {
AnalyticsEvent.OPT_OUT_ANALYTICS,
AnalyticsEvent.OPT_IN_ANALYTICS,
}:
return False
if metadata is None:
metadata = {}
# add basics
metadata.update(Environment.get_system_info())
metadata.update(
{
"environment": get_environment(),
"python_version": Environment.python_version(),
"version": __version__,
}
)
analytics.track(str(gc.user_id), event, metadata)
logger.debug(
f"Analytics sent: User: {gc.user_id}, Event: {event}, Metadata: "
f"{metadata}"
)
return True
except Exception as e:
# We should never fail main thread
logger.debug(f"Analytics failed due to: {e}")
return False
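A minimal usage sketch, assuming the module path shown above (`zenml.utils.analytics_utils`); the metadata dict is illustrative:

```python
from zenml.utils.analytics_utils import AnalyticsEvent, track_event

# Send the dedicated test event with some illustrative metadata. The call
# returns True if the event was sent and False otherwise (e.g. when the
# user has opted out of analytics).
sent = track_event(AnalyticsEvent.EVENT_TEST, metadata={"source": "docs-example"})
print(f"Event sent: {sent}")
```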
daemon
Utility functions to start/stop daemon processes.
This is only implemented for UNIX systems and therefore doesn't work on Windows. Based on https://www.jejik.com/articles/2007/02/a_simple_unix_linux_daemon_in_python/
check_if_daemon_is_running(pid_file)
Checks whether a daemon process indicated by the PID file is running.
Parameters:

Name | Type | Description | Default
---|---|---|---
pid_file | str | Path to file containing the PID of the daemon process to check. | required
Source code in zenml/utils/daemon.py
def check_if_daemon_is_running(pid_file: str) -> bool:
"""Checks whether a daemon process indicated by the PID file is running.
Args:
pid_file: Path to file containing the PID of the daemon
process to check.
"""
return get_daemon_pid_if_running(pid_file) is not None
daemonize(pid_file, log_file=None, working_directory='/')
Decorator that executes the decorated function as a daemon process.
Use this decorator to easily transform any function into a daemon process.
Examples:

```python
import time
from zenml.utils.daemonizer import daemonize


@daemonize(log_file='/tmp/daemon.log', pid_file='/tmp/daemon.pid')
def sleeping_daemon(period: int) -> None:
    print(f"I'm a daemon! I will sleep for {period} seconds.")
    time.sleep(period)
    print("Done sleeping, flying away.")

sleeping_daemon(period=30)

print("I'm the daemon's parent!.")
time.sleep(10)  # just to prove that the daemon is running in parallel
```
Parameters:

Name | Type | Description | Default
---|---|---|---
_func | | decorated function | required
pid_file | str | an optional file where the PID of the daemon process will be stored. | required
log_file | Optional[str] | file where stdout and stderr are redirected for the daemon process. If not supplied, the daemon will be silenced (i.e. have its stdout/stderr redirected to /dev/null). | None
working_directory | str | working directory for the daemon process, defaults to the root directory. | '/'

Returns:

Type | Description
---|---
Callable[[~F], ~F] | Decorated function that, when called, will detach from the current process and continue executing in the background, as a daemon process.
Source code in zenml/utils/daemon.py
def daemonize(
pid_file: str,
log_file: Optional[str] = None,
working_directory: str = "/",
) -> Callable[[F], F]:
"""Decorator that executes the decorated function as a daemon process.
Use this decorator to easily transform any function into a daemon
process.
Example:
```python
import time
from zenml.utils.daemonizer import daemonize
@daemonize(log_file='/tmp/daemon.log', pid_file='/tmp/daemon.pid')
def sleeping_daemon(period: int) -> None:
print(f"I'm a daemon! I will sleep for {period} seconds.")
time.sleep(period)
print("Done sleeping, flying away.")
sleeping_daemon(period=30)
print("I'm the daemon's parent!.")
time.sleep(10) # just to prove that the daemon is running in parallel
```
Args:
_func: decorated function
pid_file: an optional file where the PID of the daemon process will
be stored.
log_file: file where stdout and stderr are redirected for the daemon
process. If not supplied, the daemon will be silenced (i.e. have
its stdout/stderr redirected to /dev/null).
working_directory: working directory for the daemon process,
defaults to the root directory.
Returns:
Decorated function that, when called, will detach from the current
process and continue executing in the background, as a daemon
process.
"""
def inner_decorator(_func: F) -> F:
def daemon(*args: Any, **kwargs: Any) -> None:
"""Standard daemonization of a process."""
# flake8: noqa: C901
if sys.platform == "win32":
logger.error(
"Daemon functionality is currently not supported on Windows."
)
else:
run_as_daemon(
_func,
log_file=log_file,
pid_file=pid_file,
working_directory=working_directory,
*args,
**kwargs,
)
return cast(F, daemon)
return inner_decorator
get_daemon_pid_if_running(pid_file)
Read and return the PID value from a PID file if the daemon process tracked by the PID file is running.
Parameters:

Name | Type | Description | Default
---|---|---|---
pid_file | str | Path to file containing the PID of the daemon process to check. | required

Returns:

Type | Description
---|---
Optional[int] | The PID of the daemon process if it is running, otherwise None.
Source code in zenml/utils/daemon.py
def get_daemon_pid_if_running(pid_file: str) -> Optional[int]:
"""Read and return the PID value from a PID file if the daemon process
tracked by the PID file is running.
Args:
pid_file: Path to file containing the PID of the daemon
process to check.
Returns:
The PID of the daemon process if it is running, otherwise None.
"""
try:
with open(pid_file, "r") as f:
pid = int(f.read().strip())
except (IOError, FileNotFoundError):
return None
if not pid or not psutil.pid_exists(pid):
return None
return pid
run_as_daemon(daemon_function, *args, pid_file, log_file=None, working_directory='/', **kwargs)
Runs a function as a daemon process.
Parameters:

Name | Type | Description | Default
---|---|---|---
daemon_function | ~F | The function to run as a daemon. | required
pid_file | str | Path to file in which to store the PID of the daemon process. | required
log_file | Optional[str] | Optional file to which the daemon's stdout/stderr will be redirected. | None
working_directory | str | Working directory for the daemon process, defaults to the root directory. | '/'
args | Any | Positional arguments to pass to the daemon function. | ()
kwargs | Any | Keyword arguments to pass to the daemon function. | {}

Exceptions:

Type | Description
---|---
FileExistsError | If the PID file already exists.
Source code in zenml/utils/daemon.py
def run_as_daemon(
daemon_function: F,
*args: Any,
pid_file: str,
log_file: Optional[str] = None,
working_directory: str = "/",
**kwargs: Any,
) -> None:
"""Runs a function as a daemon process.
Args:
daemon_function: The function to run as a daemon.
pid_file: Path to file in which to store the PID of the daemon
process.
log_file: Optional file to which the daemons stdout/stderr will be
redirected to.
working_directory: Working directory for the daemon process,
defaults to the root directory.
args: Positional arguments to pass to the daemon function.
kwargs: Keyword arguments to pass to the daemon function.
Raises:
FileExistsError: If the PID file already exists.
"""
# convert to absolute path as we will change working directory later
if pid_file:
pid_file = os.path.abspath(pid_file)
if log_file:
log_file = os.path.abspath(log_file)
# check if PID file exists
if pid_file and os.path.exists(pid_file):
raise FileExistsError(
f"The PID file '{pid_file}' already exists, either the daemon "
f"process is already running or something went wrong."
)
# first fork
try:
pid = os.fork()
if pid > 0:
# this is the process that called `run_as_daemon` so we
# simply return so it can keep running
return
except OSError as e:
logger.error("Unable to fork (error code: %d)", e.errno)
sys.exit(1)
# decouple from parent environment
os.chdir(working_directory)
os.setsid()
os.umask(0o22)
# second fork
try:
pid = os.fork()
if pid > 0:
# this is the parent of the future daemon process, kill it
# so the daemon gets adopted by the init process
sys.exit(0)
except OSError as e:
sys.stderr.write(f"Unable to fork (error code: {e.errno})")
sys.exit(1)
# redirect standard file descriptors to devnull (or the given logfile)
devnull = "/dev/null"
if hasattr(os, "devnull"):
devnull = os.devnull
devnull_fd = os.open(devnull, os.O_RDWR)
log_fd = (
os.open(log_file, os.O_CREAT | os.O_RDWR | os.O_APPEND)
if log_file
else None
)
out_fd = log_fd or devnull_fd
os.dup2(devnull_fd, sys.stdin.fileno())
os.dup2(out_fd, sys.stdout.fileno())
os.dup2(out_fd, sys.stderr.fileno())
if pid_file:
# write the PID file
with open(pid_file, "w+") as f:
f.write(f"{os.getpid()}\n")
# register actions in case this process exits/gets killed
def cleanup() -> None:
"""Daemon cleanup."""
terminate_children()
if pid_file and os.path.exists(pid_file):
os.remove(pid_file)
def sighndl(signum: int, frame: Optional[types.FrameType]) -> None:
"""Daemon signal handler."""
cleanup()
signal.signal(signal.SIGTERM, sighndl)
signal.signal(signal.SIGINT, sighndl)
atexit.register(cleanup)
# finally run the actual daemon code
daemon_function(*args, **kwargs)
sys.exit(0)
stop_daemon(pid_file)
Stops a daemon process.
Parameters:

Name | Type | Description | Default
---|---|---|---
pid_file | str | Path to file containing the PID of the daemon process to kill. | required
Source code in zenml/utils/daemon.py
def stop_daemon(pid_file: str) -> None:
"""Stops a daemon process.
Args:
pid_file: Path to file containing the PID of the daemon process to
kill.
"""
try:
with open(pid_file, "r") as f:
pid = int(f.read().strip())
except (IOError, FileNotFoundError):
logger.warning("Daemon PID file '%s' does not exist.", pid_file)
return
if psutil.pid_exists(pid):
process = psutil.Process(pid)
process.terminate()
else:
logger.warning("PID from '%s' does not exist.", pid_file)
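A minimal lifecycle sketch tying the daemon helpers together (UNIX only); the PID/log file paths and the `worker` function are illustrative:

```python
import time

from zenml.utils.daemon import (
    check_if_daemon_is_running,
    get_daemon_pid_if_running,
    run_as_daemon,
    stop_daemon,
)

PID_FILE = "/tmp/example-daemon.pid"  # illustrative paths
LOG_FILE = "/tmp/example-daemon.log"


def worker() -> None:
    """Illustrative function to run in the background."""
    time.sleep(60)


# Detach `worker` into a daemon process (UNIX only); the parent returns here.
run_as_daemon(worker, pid_file=PID_FILE, log_file=LOG_FILE)

# Back in the parent process: inspect and eventually stop the daemon.
time.sleep(1)  # give the daemon a moment to write its PID file
if check_if_daemon_is_running(PID_FILE):
    print(f"Daemon running with PID {get_daemon_pid_if_running(PID_FILE)}")
    stop_daemon(PID_FILE)
```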
terminate_children()
Terminate all processes that are children of the currently running process.
Source code in zenml/utils/daemon.py
def terminate_children() -> None:
"""Terminate all processes that are children of the currently running
process.
"""
pid = os.getpid()
try:
parent = psutil.Process(pid)
except psutil.Error:
# could not find parent process id
return
children = parent.children(recursive=False)
for p in children:
p.terminate()
_, alive = psutil.wait_procs(
children, timeout=CHILD_PROCESS_WAIT_TIMEOUT
)
for p in alive:
p.kill()
_, alive = psutil.wait_procs(
children, timeout=CHILD_PROCESS_WAIT_TIMEOUT
)
docker_utils
build_docker_image(build_context_path, image_name, entrypoint=None, dockerfile_path=None, dockerignore_path=None, requirements=None, environment_vars=None, use_local_requirements=False, base_image=None)
Builds a docker image.
Parameters:

Name | Type | Description | Default
---|---|---|---
build_context_path | str | Path to a directory that will be sent to the docker daemon as build context. | required
image_name | str | The name to use for the created docker image. | required
entrypoint | Optional[str] | Optional entrypoint command that gets executed when running a container of the built image. | None
dockerfile_path | Optional[str] | Optional path to a dockerfile. If no value is given, a temporary dockerfile will be created. | None
dockerignore_path | Optional[str] | Optional path to a dockerignore file. If no value is given, the .dockerignore in the root of the build context will be used if it exists. Otherwise, all files inside `build_context_path` are included in the build context. | None
requirements | Optional[AbstractSet[str]] | Optional list of pip requirements to install. This will only be used if no value is given for `dockerfile_path`. | None
environment_vars | Optional[Dict[str, str]] | Optional dict of key value pairs that need to be embedded as environment variables in the image. | None
use_local_requirements | bool | If `True` and no values are given for `dockerfile_path` and `requirements`, then the packages installed in the environment of the current python process will be installed in the docker image. | False
base_image | Optional[str] | The image to use as base for the docker image. | None
Source code in zenml/utils/docker_utils.py
def build_docker_image(
build_context_path: str,
image_name: str,
entrypoint: Optional[str] = None,
dockerfile_path: Optional[str] = None,
dockerignore_path: Optional[str] = None,
requirements: Optional[AbstractSet[str]] = None,
environment_vars: Optional[Dict[str, str]] = None,
use_local_requirements: bool = False,
base_image: Optional[str] = None,
) -> None:
"""Builds a docker image.
Args:
build_context_path: Path to a directory that will be sent to the
docker daemon as build context.
image_name: The name to use for the created docker image.
entrypoint: Optional entrypoint command that gets executed when running
a container of the built image.
dockerfile_path: Optional path to a dockerfile. If no value is given,
a temporary dockerfile will be created.
dockerignore_path: Optional path to a dockerignore file. If no value is
given, the .dockerignore in the root of the build context will be
used if it exists. Otherwise, all files inside `build_context_path`
are included in the build context.
requirements: Optional list of pip requirements to install. This
will only be used if no value is given for `dockerfile_path`.
environment_vars: Optional dict of key value pairs that need to be
embedded as environment variables in the image.
use_local_requirements: If `True` and no values are given for
`dockerfile_path` and `requirements`, then the packages installed
in the environment of the current python processed will be
installed in the docker image.
base_image: The image to use as base for the docker image.
"""
config_path = os.path.join(build_context_path, CONTAINER_ZENML_CONFIG_DIR)
try:
# Save a copy of the current global configuration with the
# active profile and the active stack configuration into the build
# context, to have the active profile and active stack accessible from
# within the container.
GlobalConfiguration().copy_active_configuration(
config_path,
load_config_path=f"/app/{CONTAINER_ZENML_CONFIG_DIR}",
)
if not requirements and use_local_requirements:
local_requirements = get_current_environment_requirements()
requirements = {
f"{package}=={version}"
for package, version in local_requirements.items()
if package != "zenml" # exclude ZenML
}
logger.info(
"Using requirements from local environment to build "
"docker image: %s",
requirements,
)
if dockerfile_path:
dockerfile_contents = read_file_contents_as_string(dockerfile_path)
else:
dockerfile_contents = generate_dockerfile_contents(
base_image=base_image or DEFAULT_BASE_IMAGE,
entrypoint=entrypoint,
requirements=requirements,
environment_vars=environment_vars,
)
build_context = create_custom_build_context(
build_context_path=build_context_path,
dockerfile_contents=dockerfile_contents,
dockerignore_path=dockerignore_path,
)
# If a custom base image is provided, make sure to always pull the
# latest version of that image (if it isn't a locally built image).
# If no base image is provided, we use the static default ZenML image so
# there is no need to constantly pull
pull_base_image = False
if base_image:
pull_base_image = not is_local_image(base_image)
logger.info(
"Building docker image '%s', this might take a while...",
image_name,
)
docker_client = DockerClient.from_env()
# We use the client api directly here, so we can stream the logs
output_stream = docker_client.images.client.api.build(
fileobj=build_context,
custom_context=True,
tag=image_name,
pull=pull_base_image,
rm=False, # don't remove intermediate containers
)
_process_stream(output_stream)
finally:
# Clean up the temporary build files
fileio.rmtree(config_path)
logger.info("Finished building docker image.")
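A minimal usage sketch, assuming the module path `zenml.utils.docker_utils`; the build context path, image tag, requirements, and environment variables are illustrative:

```python
from zenml.utils.docker_utils import build_docker_image, push_docker_image

# Build an image from the current directory, installing two extra pip
# requirements on top of the default ZenML base image. The build context
# path, image tag, requirements and environment variables are illustrative.
build_docker_image(
    build_context_path=".",
    image_name="my-registry.io/zenml-demo:latest",
    requirements={"scikit-learn", "pandas"},
    environment_vars={"MY_SETTING": "value"},
)

# Push the image to the registry encoded in its name (requires docker login).
push_docker_image("my-registry.io/zenml-demo:latest")
```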
create_custom_build_context(build_context_path, dockerfile_contents, dockerignore_path=None)
Creates a docker build context.
Parameters:

Name | Type | Description | Default
---|---|---|---
build_context_path | str | Path to a directory that will be sent to the docker daemon as build context. | required
dockerfile_contents | str | File contents of the Dockerfile to use for the build. | required
dockerignore_path | Optional[str] | Optional path to a dockerignore file. If no value is given, the .dockerignore in the root of the build context will be used if it exists. Otherwise, all files inside `build_context_path` are included in the build context. | None

Returns:

Type | Description
---|---
Any | Docker build context that can be passed when building a docker image.
Source code in zenml/utils/docker_utils.py
def create_custom_build_context(
build_context_path: str,
dockerfile_contents: str,
dockerignore_path: Optional[str] = None,
) -> Any:
"""Creates a docker build context.
Args:
build_context_path: Path to a directory that will be sent to the
docker daemon as build context.
dockerfile_contents: File contents of the Dockerfile to use for the
build.
dockerignore_path: Optional path to a dockerignore file. If no value is
given, the .dockerignore in the root of the build context will be
used if it exists. Otherwise, all files inside `build_context_path`
are included in the build context.
Returns:
Docker build context that can be passed when building a docker image.
"""
exclude_patterns = []
default_dockerignore_path = os.path.join(
build_context_path, ".dockerignore"
)
if dockerignore_path:
exclude_patterns = _parse_dockerignore(dockerignore_path)
elif fileio.exists(default_dockerignore_path):
logger.info(
"Using dockerignore found at path '%s' to create docker "
"build context.",
default_dockerignore_path,
)
exclude_patterns = _parse_dockerignore(default_dockerignore_path)
else:
logger.info(
"No explicit dockerignore specified and no file called "
".dockerignore exists at the build context root (%s). "
"Creating docker build context with all files inside the build "
"context root directory.",
build_context_path,
)
logger.debug(
"Exclude patterns for creating docker build context: %s",
exclude_patterns,
)
no_ignores_found = not exclude_patterns
files = docker_build_utils.exclude_paths(
build_context_path, patterns=exclude_patterns
)
extra_files = [("Dockerfile", dockerfile_contents)]
context = docker_build_utils.create_archive(
root=build_context_path,
files=sorted(files),
gzip=False,
extra_files=extra_files,
)
build_context_size = os.path.getsize(context.name)
if build_context_size > 50 * 1024 * 1024 and no_ignores_found:
# The build context exceeds 50MiB and we didn't find any excludes
# in dockerignore files -> remind to specify a .dockerignore file
logger.warning(
"Build context size for docker image: %s. If you believe this is "
"unreasonably large, make sure to include a .dockerignore file at "
"the root of your build context (%s) or specify a custom file "
"when defining your pipeline.",
string_utils.get_human_readable_filesize(build_context_size),
default_dockerignore_path,
)
return context
generate_dockerfile_contents(base_image, entrypoint=None, requirements=None, environment_vars=None)
Generates a Dockerfile.
Parameters:

Name | Type | Description | Default
---|---|---|---
base_image | str | The image to use as base for the dockerfile. | required
entrypoint | Optional[str] | The default entrypoint command that gets executed when running a container of an image created by this dockerfile. | None
requirements | Optional[AbstractSet[str]] | Optional list of pip requirements to install. | None
environment_vars | Optional[Dict[str, str]] | Optional dict of environment variables to set. | None

Returns:

Type | Description
---|---
str | Content of a dockerfile.
Source code in zenml/utils/docker_utils.py
def generate_dockerfile_contents(
base_image: str,
entrypoint: Optional[str] = None,
requirements: Optional[AbstractSet[str]] = None,
environment_vars: Optional[Dict[str, str]] = None,
) -> str:
"""Generates a Dockerfile.
Args:
base_image: The image to use as base for the dockerfile.
entrypoint: The default entrypoint command that gets executed when
running a container of an image created by this dockerfile.
requirements: Optional list of pip requirements to install.
environment_vars: Optional dict of environment variables to set.
Returns:
Content of a dockerfile.
"""
lines = [f"FROM {base_image}", "WORKDIR /app"]
# TODO [ENG-781]: Make secrets invisible in the Dockerfile or use a different approach.
if environment_vars:
for key, value in environment_vars.items():
lines.append(f"ENV {key.upper()}={value}")
if requirements:
lines.append(
f"RUN pip install --no-cache {' '.join(sorted(requirements))}"
)
lines.append("COPY . .")
lines.append("RUN chmod -R a+rw .")
lines.append(
f"ENV {ENV_ZENML_CONFIG_PATH}=/app/{CONTAINER_ZENML_CONFIG_DIR}"
)
if entrypoint:
lines.append(f"ENTRYPOINT {entrypoint}")
return "\n".join(lines)
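A small sketch of what a generated Dockerfile looks like; the base image, requirements, and environment variables are illustrative, and the commented output is approximated (the exact `ENV` line for the ZenML config path depends on ZenML's internal constants):

```python
from zenml.utils.docker_utils import generate_dockerfile_contents

contents = generate_dockerfile_contents(
    base_image="python:3.8-slim",  # illustrative base image
    requirements={"pandas", "scikit-learn"},
    environment_vars={"my_setting": "value"},
    entrypoint="python run.py",
)
print(contents)
# Approximate output:
#   FROM python:3.8-slim
#   WORKDIR /app
#   ENV MY_SETTING=value
#   RUN pip install --no-cache pandas scikit-learn
#   COPY . .
#   RUN chmod -R a+rw .
#   ENV <ZenML config path variable>=/app/<ZenML config dir>
#   ENTRYPOINT python run.py
```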
get_current_environment_requirements()
Returns a dict of package requirements for the environment that the current python process is running in.
Source code in zenml/utils/docker_utils.py
def get_current_environment_requirements() -> Dict[str, str]:
"""Returns a dict of package requirements for the environment that
the current python process is running in."""
return {
distribution.key: distribution.version
for distribution in pkg_resources.working_set
}
get_image_digest(image_name)
Gets the digest of a docker image.
Parameters:

Name | Type | Description | Default
---|---|---|---
image_name | str | Name of the image to get the digest for. | required

Returns:

Type | Description
---|---
Optional[str] | The repo digest for the given image if there exists exactly one. If there are zero or multiple repo digests, returns `None`.
Source code in zenml/utils/docker_utils.py
def get_image_digest(image_name: str) -> Optional[str]:
"""Gets the digest of a docker image.
Args:
image_name: Name of the image to get the digest for.
Returns:
Returns the repo digest for the given image if there exists exactly one.
If there are zero or multiple repo digests, returns `None`.
"""
docker_client = DockerClient.from_env()
image = docker_client.images.get(image_name)
repo_digests = image.attrs["RepoDigests"]
if len(repo_digests) == 1:
return cast(str, repo_digests[0])
else:
logger.debug(
"Found zero or more repo digests for docker image '%s': %s",
image_name,
repo_digests,
)
return None
is_local_image(image_name)
Returns whether an image was pulled from a registry or not.
Source code in zenml/utils/docker_utils.py
def is_local_image(image_name: str) -> bool:
"""Returns whether an image was pulled from a registry or not."""
docker_client = DockerClient.from_env()
images = docker_client.images.list(name=image_name)
if images:
# An image with this name is available locally -> now check whether it
# was pulled from a repo or built locally (in which case the repo
# digest is empty)
return get_image_digest(image_name) is None
else:
# no image with this name found locally
return False
push_docker_image(image_name)
Pushes a docker image to a container registry.
Parameters:

Name | Type | Description | Default
---|---|---|---
image_name | str | The full name (including a tag) of the image to push. | required
Source code in zenml/utils/docker_utils.py
def push_docker_image(image_name: str) -> None:
"""Pushes a docker image to a container registry.
Args:
image_name: The full name (including a tag) of the image to push.
"""
logger.info("Pushing docker image '%s'.", image_name)
docker_client = DockerClient.from_env()
output_stream = docker_client.images.push(image_name, stream=True)
_process_stream(output_stream)
logger.info("Finished pushing docker image.")
enum_utils
StrEnum (str, Enum)
Base enum type for string enum values
Source code in zenml/utils/enum_utils.py
class StrEnum(str, Enum):
"""Base enum type for string enum values"""
def __str__(self) -> str:
"""Returns the enum string value."""
return self.value # type: ignore
@classmethod
def names(cls) -> List[str]:
"""Get all enum names as a list of strings"""
return [c.name for c in cls]
@classmethod
def values(cls) -> List[str]:
"""Get all enum values as a list of strings"""
return [c.value for c in cls]
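A minimal usage sketch of `StrEnum`; the `Color` enum is hypothetical:

```python
from zenml.utils.enum_utils import StrEnum


class Color(StrEnum):
    """Hypothetical enum built on the StrEnum base class."""

    RED = "red"
    BLUE = "blue"


print(str(Color.RED))  # "red"
print(Color.names())   # ["RED", "BLUE"]
print(Color.values())  # ["red", "blue"]
```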
networking_utils
find_available_port()
Finds a local random unoccupied TCP port.
Source code in zenml/utils/networking_utils.py
def find_available_port() -> int:
"""Finds a local random unoccupied TCP port."""
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.bind(("127.0.0.1", 0))
_, port = s.getsockname()
return cast(int, port)
port_available(port)
Checks if a local port is available.
Parameters:

Name | Type | Description | Default
---|---|---|---
port | int | TCP port number | required

Returns:

Type | Description
---|---
bool | True if the port is available, otherwise False
Source code in zenml/utils/networking_utils.py
def port_available(port: int) -> bool:
"""Checks if a local port is available.
Args:
port: TCP port number
Returns:
True if the port is available, otherwise False
"""
try:
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.bind(("127.0.0.1", port))
except socket.error as e:
logger.debug("Port %d unavailable: %s", port, e)
return False
return True
port_is_open(hostname, port)
Check if a TCP port is open on a remote host.
Parameters:

Name | Type | Description | Default
---|---|---|---
hostname | str | hostname of the remote machine | required
port | int | TCP port number | required

Returns:

Type | Description
---|---
bool | True if the port is open, False otherwise
Source code in zenml/utils/networking_utils.py
def port_is_open(hostname: str, port: int) -> bool:
"""Check if a TCP port is open on a remote host.
Args:
hostname: hostname of the remote machine
port: TCP port number
Returns:
True if the port is open, False otherwise
"""
try:
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
result = sock.connect_ex((hostname, port))
return result == 0
except socket.error as e:
logger.debug(
f"Error checking TCP port {port} on host {hostname}: {str(e)}"
)
return False
scan_for_available_port(start=8000, stop=65535)
Scan the local network for an available port in the given range.
Parameters:

Name | Type | Description | Default
---|---|---|---
start | int | the beginning of the port range to scan | 8000
stop | int | the (inclusive) end of the port range to scan | 65535

Returns:

Type | Description
---|---
Optional[int] | The first available port in the given range, or None if no available port is found.
Source code in zenml/utils/networking_utils.py
def scan_for_available_port(
start: int = SCAN_PORT_RANGE[0], stop: int = SCAN_PORT_RANGE[1]
) -> Optional[int]:
"""Scan the local network for an available port in the given range.
Args:
start: the beginning of the port range value to scan
stop: the (inclusive) end of the port range value to scan
Returns:
The first available port in the given range, or None if no available
port is found.
"""
for port in range(start, stop + 1):
if port_available(port):
return port
logger.debug(
"No free TCP ports found in the range %d - %d",
start,
stop,
)
return None
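A minimal usage sketch of the networking helpers; the host name and port range are illustrative:

```python
from zenml.utils.networking_utils import (
    find_available_port,
    port_available,
    port_is_open,
    scan_for_available_port,
)

# Grab a random free local port, or scan a specific range for one instead.
port = find_available_port()
print(f"Random free port: {port}, still available: {port_available(port)}")

preferred = scan_for_available_port(start=8000, stop=8010)
if preferred is None:
    print("No free TCP port between 8000 and 8010.")

# Check whether a remote (or local) service is reachable on a given port.
print(port_is_open("localhost", 80))
```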
secrets_manager_utils
decode_secret_dict(secret_dict)
Base64 decode a Secret.
Parameters:

Name | Type | Description | Default
---|---|---|---
secret_dict | Dict[str, str] | dict containing key-value pairs to decode | required

Returns:

Type | Description
---|---
Tuple[Dict[str, str], str] | Decoded secret Dict containing key-value pairs
Source code in zenml/utils/secrets_manager_utils.py
def decode_secret_dict(
secret_dict: Dict[str, str]
) -> Tuple[Dict[str, str], str]:
"""Base64 decode a Secret.
Args:
secret_dict: dict containing key-value pairs to decode
Returns:
Decoded secret Dict containing key-value pairs
"""
zenml_schema_name = secret_dict.pop(ZENML_SCHEMA_NAME)
decoded_secret = {k: decode_string(v) for k, v in secret_dict.items()}
return decoded_secret, zenml_schema_name
decode_string(string)
Base64 decode a string.
Parameters:

Name | Type | Description | Default
---|---|---|---
string | str | String to decode | required

Returns:

Type | Description
---|---
str | Decoded string
Source code in zenml/utils/secrets_manager_utils.py
def decode_string(string: str) -> str:
"""Base64 decode a string.
Args:
string: String to decode
Returns:
Decoded string
"""
decoded_bytes = base64.b64decode(string)
return str(decoded_bytes, "utf-8")
encode_secret(secret)
Base64 encode all values within a secret.
Parameters:

Name | Type | Description | Default
---|---|---|---
secret | BaseSecretSchema | Secret containing key-value pairs | required

Returns:

Type | Description
---|---
Dict[str, str] | Encoded secret Dict containing key-value pairs
Source code in zenml/utils/secrets_manager_utils.py
def encode_secret(secret: BaseSecretSchema) -> Dict[str, str]:
"""Base64 encode all values within a secret.
Args:
secret: Secret containing key-value pairs
Returns:
Encoded secret Dict containing key-value pairs
"""
encoded_secret = {
k: encode_string(str(v))
for k, v in secret.content.items()
if v is not None
}
encoded_secret[ZENML_SCHEMA_NAME] = secret.TYPE
return encoded_secret
encode_string(string)
Base64 encode a string.
Parameters:

Name | Type | Description | Default
---|---|---|---
string | str | String to encode | required

Returns:

Type | Description
---|---
str | Encoded string
Source code in zenml/utils/secrets_manager_utils.py
def encode_string(string: str) -> str:
"""Base64 encode a string.
Args:
string: String to encode
Returns:
Encoded string
"""
encoded_bytes = base64.b64encode(string.encode("utf-8"))
return str(encoded_bytes, "utf-8")
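A minimal round-trip sketch with `encode_string` and `decode_string`; the secret value is illustrative:

```python
from zenml.utils.secrets_manager_utils import decode_string, encode_string

encoded = encode_string("super-secret-value")
print(encoded)                 # base64 text, e.g. "c3VwZXItc2VjcmV0LXZhbHVl"
print(decode_string(encoded))  # "super-secret-value"
```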
singleton
SingletonMetaClass (type)
Singleton metaclass.
Use this metaclass to make any class into a singleton class:
```python
class OneRing(metaclass=SingletonMetaClass):
    def __init__(self, owner):
        self._owner = owner

    @property
    def owner(self):
        return self._owner

the_one_ring = OneRing('Sauron')
the_lost_ring = OneRing('Frodo')
print(the_lost_ring.owner)  # Sauron
OneRing._clear()  # ring destroyed
```
Source code in zenml/utils/singleton.py
class SingletonMetaClass(type):
"""Singleton metaclass.
Use this metaclass to make any class into a singleton class:
```python
class OneRing(metaclass=SingletonMetaClass):
def __init__(self, owner):
self._owner = owner
@property
def owner(self):
return self._owner
the_one_ring = OneRing('Sauron')
the_lost_ring = OneRing('Frodo')
print(the_lost_ring.owner) # Sauron
OneRing._clear() # ring destroyed
```
"""
def __init__(cls, *args: Any, **kwargs: Any) -> None:
"""Initialize a singleton class."""
super().__init__(*args, **kwargs)
cls.__singleton_instance: Optional["SingletonMetaClass"] = None
def __call__(cls, *args: Any, **kwargs: Any) -> "SingletonMetaClass":
"""Create or return the singleton instance."""
if not cls.__singleton_instance:
cls.__singleton_instance = cast(
"SingletonMetaClass", super().__call__(*args, **kwargs)
)
return cls.__singleton_instance
def _clear(cls) -> None:
"""Clear the singleton instance."""
cls.__singleton_instance = None
__call__(cls, *args, **kwargs)
special
Create or return the singleton instance.
Source code in zenml/utils/singleton.py
def __call__(cls, *args: Any, **kwargs: Any) -> "SingletonMetaClass":
"""Create or return the singleton instance."""
if not cls.__singleton_instance:
cls.__singleton_instance = cast(
"SingletonMetaClass", super().__call__(*args, **kwargs)
)
return cls.__singleton_instance
__init__(cls, *args, **kwargs)
special
Initialize a singleton class.
Source code in zenml/utils/singleton.py
def __init__(cls, *args: Any, **kwargs: Any) -> None:
"""Initialize a singleton class."""
super().__init__(*args, **kwargs)
cls.__singleton_instance: Optional["SingletonMetaClass"] = None
source_utils
These utils are predicated on the following definitions:

- class_source: This is a python-import type path to a class, e.g. some.mod.class
- module_source: This is a python-import type path to a module, e.g. some.mod
- file_path, relative_path, absolute_path: These are file system paths.
- source: This is a class_source or module_source. If it is a class_source, it can also be optionally pinned.
- pin: Whatever comes after the `@` symbol from a source, usually the git sha or the version of zenml as a string.
create_zenml_pin()
Creates a ZenML pin for source pinning from release version.
Source code in zenml/utils/source_utils.py
def create_zenml_pin() -> str:
"""Creates a ZenML pin for source pinning from release version."""
return f"{APP_NAME}_{__version__}"
get_absolute_path_from_module_source(module)
Get a directory path from module source.
E.g. `zenml.core.step` will return `full/path/to/zenml/core/step`.
Parameters:

Name | Type | Description | Default
---|---|---|---
module | str | A module e.g. `zenml.core.step`. | required
Source code in zenml/utils/source_utils.py
def get_absolute_path_from_module_source(module: str) -> str:
"""Get a directory path from module source.
E.g. `zenml.core.step` will return `full/path/to/zenml/core/step`.
Args:
module: A module e.g. `zenml.core.step`.
"""
mod = importlib.import_module(module)
return mod.__path__[0]
get_class_source_from_source(source)
Gets the class source from a source string, i.e. `module.path.Class@version` returns `module.path.Class`.
Parameters:

Name | Type | Description | Default
---|---|---|---
source | str | source pointing to potentially pinned sha. | required
Source code in zenml/utils/source_utils.py
def get_class_source_from_source(source: str) -> str:
"""Gets class source from source, i.e. module.path@version, returns version.
Args:
source: source pointing to potentially pinned sha.
"""
# source need not even be pinned
return source.split("@")[0]
get_hashed_source(value)
Returns a hash of the object's source code.
Source code in zenml/utils/source_utils.py
def get_hashed_source(value: Any) -> str:
"""Returns a hash of the objects source code."""
try:
source_code = get_source(value)
except TypeError:
raise TypeError(
f"Unable to compute the hash of source code of object: {value}."
)
return hashlib.sha256(source_code.encode("utf-8")).hexdigest()
get_module_source_from_class(class_)
Takes a class (or an already-resolved string) and returns its module source. If the input is already a string, it is returned unchanged.
Parameters:

Name | Type | Description | Default
---|---|---|---
class_ | Union[Type[Any], str] | object of type class. | required
Source code in zenml/utils/source_utils.py
def get_module_source_from_class(
class_: Union[Type[Any], str]
) -> Optional[str]:
"""Takes class input and returns module_source. If class is already string
then returns the same.
Args:
class_: object of type class.
"""
if isinstance(class_, str):
module_source = class_
else:
# Infer it from the class provided
if not inspect.isclass(class_):
raise AssertionError("step_type is neither string nor class.")
module_source = class_.__module__ + "." + class_.__name__
return module_source
get_module_source_from_module(module)
Gets the source of the supplied module.
E.g.:

- a `/home/myrepo/src/run.py` module running as the main module returns `run` if no repository root is specified.
- a `/home/myrepo/src/run.py` module running as the main module returns `src.run` if the repository root is configured in `/home/myrepo`
- a `/home/myrepo/src/pipeline.py` module not running as the main module returns `src.pipeline` if the repository root is configured in `/home/myrepo`
- a `/home/myrepo/src/pipeline.py` module not running as the main module returns `pipeline` if no repository root is specified and the main module is also in `/home/myrepo/src`.
- a `/home/step.py` module not running as the main module returns `step` if the CWD is `/home` and the repository root or the main module are in a different path (e.g. `/home/myrepo/src`).
Parameters:

Name | Type | Description | Default
---|---|---|---
module | module | the module to get the source of. | required

Returns:

Type | Description
---|---
str | The source of the main module.

Exceptions:

Type | Description
---|---
RuntimeError | if the module is not loaded from a file
Source code in zenml/utils/source_utils.py
def get_module_source_from_module(module: ModuleType) -> str:
"""Gets the source of the supplied module.
E.g.:
* a `/home/myrepo/src/run.py` module running as the main module returns
`run` if no repository root is specified.
* a `/home/myrepo/src/run.py` module running as the main module returns
`src.run` if the repository root is configured in `/home/myrepo`
* a `/home/myrepo/src/pipeline.py` module not running as the main module
returns `src.pipeline` if the repository root is configured in
`/home/myrepo`
* a `/home/myrepo/src/pipeline.py` module not running as the main module
returns `pipeline` if no repository root is specified and the main
module is also in `/home/myrepo/src`.
* a `/home/step.py` module not running as the main module
returns `step` if the CWD is /home and the repository root or the main
module are in a different path (e.g. `/home/myrepo/src`).
Args:
module: the module to get the source of.
Returns:
The source of the main module.
Raises:
RuntimeError: if the module is not loaded from a file
"""
if not hasattr(module, "__file__") or not module.__file__:
if module.__name__ == "__main__":
raise RuntimeError(
f"{module} module was not loaded from a file. Cannot "
"determine the module root path."
)
return module.__name__
module_path = os.path.abspath(module.__file__)
root_path = get_source_root_path()
if not module_path.startswith(root_path):
root_path = os.getcwd()
logger.warning(
"User module %s is not in the source root. Using current "
"directory %s instead to resolve module source.",
module,
root_path,
)
# Remove root_path from module_path to get relative path left over
module_path = module_path.replace(root_path, "")[1:]
# Kick out the .py and replace `/` with `.` to get the module source
module_path = module_path.replace(".py", "")
module_source = module_path.replace("/", ".")
logger.debug(
f"Resolved module source for module {module} to: {module_source}"
)
return module_source
get_module_source_from_source(source)
Gets module source from source. E.g. `some.module.file.class@version` returns `some.module`.
Parameters:

Name | Type | Description | Default
---|---|---|---
source | str | source pointing to potentially pinned sha. | required
Source code in zenml/utils/source_utils.py
def get_module_source_from_source(source: str) -> str:
"""Gets module source from source. E.g. `some.module.file.class@version`,
returns `some.module`.
Args:
source: source pointing to potentially pinned sha.
"""
class_source = get_class_source_from_source(source)
return ".".join(class_source.split(".")[:-2])
get_relative_path_from_module_source(module_source)
Get a directory path from module, relative to root of the package tree.
E.g. zenml.core.step will return zenml/core/step.
Parameters:

Name | Type | Description | Default
---|---|---|---
module_source | str | A module e.g. zenml.core.step | required
Source code in zenml/utils/source_utils.py
def get_relative_path_from_module_source(module_source: str) -> str:
"""Get a directory path from module, relative to root of the package tree.
E.g. zenml.core.step will return zenml/core/step.
Args:
module_source: A module e.g. zenml.core.step
"""
return module_source.replace(".", "/")
get_source(value)
Returns the source code of an object. If executing within an IPython kernel environment, this temporarily monkey-patches the `inspect` module with a workaround to get the source from the notebook cell.
Exceptions:

Type | Description
---|---
TypeError | If source not found.
Source code in zenml/utils/source_utils.py
def get_source(value: Any) -> str:
"""Returns the source code of an object. If executing within a IPython
kernel environment, then this monkey-patches `inspect` module temporarily
with a workaround to get source from the cell.
Raises:
TypeError: If source not found.
"""
if Environment.in_notebook():
# Monkey patch inspect.getfile temporarily to make getsource work.
# Source: https://stackoverflow.com/questions/51566497/
def _new_getfile(
object: Any,
_old_getfile: Callable[
[
Union[
ModuleType,
Type[Any],
MethodType,
FunctionType,
TracebackType,
FrameType,
CodeType,
Callable[..., Any],
]
],
str,
] = inspect.getfile,
) -> Any:
if not inspect.isclass(object):
return _old_getfile(object)
# Lookup by parent module (as in current inspect)
if hasattr(object, "__module__"):
object_ = sys.modules.get(object.__module__)
if hasattr(object_, "__file__"):
return object_.__file__ # type: ignore[union-attr]
# If parent module is __main__, lookup by methods
for name, member in inspect.getmembers(object):
if (
inspect.isfunction(member)
and object.__qualname__ + "." + member.__name__
== member.__qualname__
):
return inspect.getfile(member)
else:
raise TypeError(f"Source for {object!r} not found.")
# Monkey patch, compute source, then revert monkey patch.
_old_getfile = inspect.getfile
inspect.getfile = _new_getfile
try:
src = inspect.getsource(value)
finally:
inspect.getfile = _old_getfile
else:
# Use standard inspect if running outside a notebook
src = inspect.getsource(value)
return src
get_source_root_path()
Get the repository root path or the source root path of the current process.
E.g.:

- if the process was started by running a `run.py` file under `full/path/to/my/run.py`, and the repository root is configured at `full/path`, the source root path is `full/path`.
- same case as above, but when there is no repository root configured, the source root path is `full/path/to/my`.
Returns:

Type | Description
---|---
str | The source root path of the current process.
Source code in zenml/utils/source_utils.py
def get_source_root_path() -> str:
"""Get the repository root path or the source root path of the current
process.
E.g.:
* if the process was started by running a `run.py` file under
`full/path/to/my/run.py`, and the repository root is configured at
`full/path`, the source root path is `full/path`.
* same case as above, but when there is no repository root configured,
the source root path is `full/path/to/my`.
Returns:
The source root path of the current process.
"""
from zenml.repository import Repository
repo_root = Repository.find_repository()
if repo_root:
logger.debug("Using repository root as source root: %s", repo_root)
return str(repo_root.resolve())
main_module = sys.modules.get("__main__")
if main_module is None:
raise RuntimeError(
"Could not determine the main module used to run the current "
"process."
)
if not hasattr(main_module, "__file__") or not main_module.__file__:
raise RuntimeError(
"Main module was not started from a file. Cannot "
"determine the module root path."
)
path = pathlib.Path(main_module.__file__).resolve().parent
logger.debug("Using main module location as source root: %s", path)
return str(path)
import_class_by_path(class_path)
Imports a class based on a given path
Parameters:

Name | Type | Description | Default
---|---|---|---
class_path | str | str, class_source e.g. this.module.Class | required
Returns: the given class
Source code in zenml/utils/source_utils.py
def import_class_by_path(class_path: str) -> Type[Any]:
"""Imports a class based on a given path
Args:
class_path: str, class_source e.g. this.module.Class
Returns: the given class
"""
classname = class_path.split(".")[-1]
modulename = ".".join(class_path.split(".")[0:-1])
mod = importlib.import_module(modulename)
return getattr(mod, classname) # type: ignore[no-any-return]
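A minimal usage sketch of `import_class_by_path`, using a standard-library class path for illustration:

```python
from collections import OrderedDict

from zenml.utils.source_utils import import_class_by_path

# Resolve a fully qualified class path into the class object itself.
cls = import_class_by_path("collections.OrderedDict")
assert cls is OrderedDict
```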
import_python_file(file_path)
Imports a python file.
Parameters:

Name | Type | Description | Default
---|---|---|---
file_path | str | Path to python file that should be imported. | required

Returns:

Type | Description
---|---
module | The imported module.
Source code in zenml/utils/source_utils.py
def import_python_file(file_path: str) -> types.ModuleType:
"""Imports a python file.
Args:
file_path: Path to python file that should be imported.
Returns:
The imported module.
"""
# Add directory of python file to PYTHONPATH so we can import it
file_path = os.path.abspath(file_path)
sys.path.append(os.path.dirname(file_path))
module_name = os.path.splitext(os.path.basename(file_path))[0]
return importlib.import_module(module_name)
is_inside_repository(file_path)
Returns whether a file is inside a zenml repository.
Source code in zenml/utils/source_utils.py
def is_inside_repository(file_path: str) -> bool:
"""Returns whether a file is inside a zenml repository."""
from zenml.repository import Repository
repo_path = Repository.find_repository()
if not repo_path:
return False
repo_path = repo_path.resolve()
absolute_file_path = pathlib.Path(file_path).resolve()
return repo_path in absolute_file_path.parents
is_standard_pin(pin)
Returns `True` if the pin is a valid ZenML pin, else `False`.
Parameters:

Name | Type | Description | Default
---|---|---|---
pin | str | potential ZenML pin like 'zenml_0.1.1' | required
Source code in zenml/utils/source_utils.py
def is_standard_pin(pin: str) -> bool:
"""Returns `True` if pin is valid ZenML pin, else False.
Args:
pin: potential ZenML pin like 'zenml_0.1.1'
"""
if pin.startswith(f"{APP_NAME}_"):
return True
return False
is_standard_source(source)
Returns `True` if source is a standard ZenML source.
Parameters:

Name | Type | Description | Default
---|---|---|---
source | str | class_source e.g. this.module.Class[@pin]. | required
Source code in zenml/utils/source_utils.py
def is_standard_source(source: str) -> bool:
"""Returns `True` if source is a standard ZenML source.
Args:
source: class_source e.g. this.module.Class[@pin].
"""
if source.split(".")[0] == "zenml":
return True
return False
is_third_party_module(file_path)
Returns whether a file belongs to a third party package.
Source code in zenml/utils/source_utils.py
def is_third_party_module(file_path: str) -> bool:
"""Returns whether a file belongs to a third party package."""
absolute_file_path = pathlib.Path(file_path).resolve()
for path in site.getsitepackages() + [site.getusersitepackages()]:
if pathlib.Path(path).resolve() in absolute_file_path.parents:
return True
return False
load_source_path_class(source, import_path=None)
Loads a Python class from the source.
Parameters:

Name | Type | Description | Default
---|---|---|---
source | str | class_source e.g. this.module.Class[@sha] | required
import_path | Optional[str] | optional path to add to python path | None
Source code in zenml/utils/source_utils.py
def load_source_path_class(
source: str, import_path: Optional[str] = None
) -> Type[Any]:
"""Loads a Python class from the source.
Args:
source: class_source e.g. this.module.Class[@sha]
import_path: optional path to add to python path
"""
from zenml.repository import Repository
repo_root = Repository.find_repository()
if not import_path and repo_root:
import_path = str(repo_root)
if "@" in source:
source = source.split("@")[0]
if import_path is not None:
with prepend_python_path(import_path):
logger.debug(
f"Loading class {source} with import path {import_path}"
)
return import_class_by_path(source)
return import_class_by_path(source)
prepend_python_path(path)
Simple context manager to help import module within the repo
Source code in zenml/utils/source_utils.py
@contextmanager
def prepend_python_path(path: str) -> Iterator[None]:
"""Simple context manager to help import module within the repo"""
try:
# Entering the with statement
sys.path.insert(0, path)
yield
finally:
# Exiting the with statement
sys.path.remove(path)
resolve_class(class_)
Resolves a class into a serializable source string.
For classes that are not built-in nor imported from a Python package, the `get_source_root_path` function is used to determine the root path relative to which the class source is resolved.
Parameters:

Name | Type | Description | Default
---|---|---|---
class_ | Type[Any] | A Python Class reference. | required
Returns: source_path e.g. this.module.Class.
Source code in zenml/utils/source_utils.py
def resolve_class(class_: Type[Any]) -> str:
"""Resolves a class into a serializable source string.
For classes that are not built-in nor imported from a Python package, the
`get_source_root_path` function is used to determine the root path
relative to which the class source is resolved.
Args:
class_: A Python Class reference.
Returns: source_path e.g. this.module.Class.
"""
initial_source = class_.__module__ + "." + class_.__name__
if is_standard_source(initial_source):
return resolve_standard_source(initial_source)
try:
file_path = inspect.getfile(class_)
except TypeError:
# builtin file
return initial_source
if initial_source.startswith("__main__") or is_third_party_module(
file_path
):
return initial_source
# Regular user file -> get the full module path relative to the
# source root.
module_source = get_module_source_from_module(
sys.modules[class_.__module__]
)
# ENG-123 Sanitize for Windows OS
# module_source = module_source.replace("\\", ".")
logger.debug(f"Resolved class {class_} to {module_source}")
return module_source + "." + class_.__name__
resolve_standard_source(source)
Pins a standard ZenML source by appending the release version pin (e.g. `this.module.Class` becomes `this.module.Class@<pin>`).
Parameters:
Name | Type | Description | Default |
---|---|---|---|
source |
str |
class_source e.g. this.module.Class. |
required |
Source code in zenml/utils/source_utils.py
def resolve_standard_source(source: str) -> str:
"""Creates a ZenML pin for source pinning from release version.
Args:
source: class_source e.g. this.module.Class.
"""
if "@" in source:
raise AssertionError(f"source {source} is already pinned.")
pin = create_zenml_pin()
return f"{source}@{pin}"
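A minimal sketch of how the pinning helpers fit together; the class source string is illustrative and is only manipulated as a string (nothing is imported):

```python
from zenml.utils.source_utils import (
    get_class_source_from_source,
    is_standard_pin,
    is_standard_source,
    resolve_standard_source,
)

# Illustrative class source inside the `zenml` package (treated purely as a string).
source = "zenml.some.module.SomeClass"

print(is_standard_source(source))             # True: the source lives in the zenml package
pinned = resolve_standard_source(source)      # e.g. "zenml.some.module.SomeClass@<pin>"
print(get_class_source_from_source(pinned))   # the unpinned class source again
print(is_standard_pin(pinned.split("@")[1]))  # True: the pin was created by create_zenml_pin
```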
string_utils
get_human_readable_filesize(bytes_)
Convert a file size in bytes into a human-readable string.
Source code in zenml/utils/string_utils.py
def get_human_readable_filesize(bytes_: int) -> str:
"""Convert a file size in bytes into a human-readable string."""
size = abs(float(bytes_))
for unit in ["B", "KiB", "MiB", "GiB"]:
if size < 1024.0 or unit == "GiB":
break
size /= 1024.0
return f"{size:.2f} {unit}"
get_human_readable_time(seconds)
Convert seconds into a human-readable string.
Source code in zenml/utils/string_utils.py
def get_human_readable_time(seconds: float) -> str:
"""Convert seconds into a human-readable string."""
prefix = "-" if seconds < 0 else ""
seconds = abs(seconds)
int_seconds = int(seconds)
days, int_seconds = divmod(int_seconds, 86400)
hours, int_seconds = divmod(int_seconds, 3600)
minutes, int_seconds = divmod(int_seconds, 60)
if days > 0:
time_string = f"{days}d{hours}h{minutes}m{int_seconds}s"
elif hours > 0:
time_string = f"{hours}h{minutes}m{int_seconds}s"
elif minutes > 0:
time_string = f"{minutes}m{int_seconds}s"
else:
time_string = f"{seconds:.3f}s"
return prefix + time_string
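A minimal usage sketch of the string helpers; the expected outputs in the comments follow from the conversion logic shown above:

```python
from zenml.utils.string_utils import (
    get_human_readable_filesize,
    get_human_readable_time,
)

print(get_human_readable_filesize(3 * 1024 * 1024))  # "3.00 MiB"
print(get_human_readable_time(3661.5))               # "1h1m1s"
print(get_human_readable_time(42.123))               # "42.123s"
```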
typed_model
BaseTypedModel (BaseModel)
pydantic-model
Typed Pydantic model base class.
Use this class as a base class instead of BaseModel to automatically add a `type` literal attribute to the model that stores the name of the class.
This can be useful when serializing models to JSON and then de-serializing them as part of a submodel union field, e.g.:

```python
class BluePill(BaseTypedModel):
    ...

class RedPill(BaseTypedModel):
    ...

class TheMatrix(BaseTypedModel):
    choice: Union[BluePill, RedPill] = Field(..., discriminator='type')

matrix = TheMatrix(choice=RedPill())
d = matrix.dict()
new_matrix = TheMatrix.parse_obj(d)
assert isinstance(new_matrix.choice, RedPill)
```

It can also facilitate de-serializing objects when their type isn't known:

```python
matrix = TheMatrix(choice=RedPill())
d = matrix.dict()
new_matrix = BaseTypedModel.from_dict(d)
assert isinstance(new_matrix.choice, RedPill)
```
Source code in zenml/utils/typed_model.py
class BaseTypedModel(BaseModel, metaclass=BaseTypedModelMeta):
"""Typed Pydantic model base class.
Use this class as a base class instead of BaseModel to automatically
add a `type` literal attribute to the model that stores the name of the
class.
This can be useful when serializing models to JSON and then de-serializing
them as part of a submodel union field, e.g.:
```python
class BluePill(BaseTypedModel):
...
class RedPill(BaseTypedModel):
...
class TheMatrix(BaseTypedModel):
choice: Union[BluePill, RedPill] = Field(..., discriminator='type')
matrix = TheMatrix(choice=RedPill())
d = matrix.dict()
new_matrix = TheMatrix.parse_obj(d)
assert isinstance(new_matrix.choice, RedPill)
```
It can also facilitate de-serializing objects when their type isn't known:
```python
matrix = TheMatrix(choice=RedPill())
d = matrix.dict()
new_matrix = BaseTypedModel.from_dict(d)
assert isinstance(new_matrix.choice, RedPill)
```
"""
@classmethod
def from_dict(
cls,
model_dict: Dict[str, Any],
) -> "BaseTypedModel":
"""Instantiate a Pydantic model from a serialized JSON-able dict
representation.
Args:
model_dict: the model attributes serialized as JSON-able dict.
Returns:
A BaseTypedModel created from the serialized representation.
"""
model_type = model_dict.get("type")
if not model_type:
raise RuntimeError(
"`type` information is missing from the serialized model dict."
)
cls = load_source_path_class(model_type)
if not issubclass(cls, BaseTypedModel):
raise RuntimeError(
f"Class `{cls}` is not a ZenML BaseTypedModel subclass."
)
return cls.parse_obj(model_dict)
@classmethod
def from_json(
cls,
json_str: str,
) -> "BaseTypedModel":
"""Instantiate a Pydantic model from a serialized JSON representation.
Args:
json_str: the model attributes serialized as JSON.
Returns:
A BaseTypedModel created from the serialized representation.
"""
model_dict = json.loads(json_str)
return cls.from_dict(model_dict)
from_dict(model_dict)
classmethod
Instantiate a Pydantic model from a serialized JSON-able dict representation.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
model_dict | Dict[str, Any] | the model attributes serialized as JSON-able dict. | required |
Returns:
Type | Description |
---|---|
BaseTypedModel | A BaseTypedModel created from the serialized representation. |
Source code in zenml/utils/typed_model.py
@classmethod
def from_dict(
cls,
model_dict: Dict[str, Any],
) -> "BaseTypedModel":
"""Instantiate a Pydantic model from a serialized JSON-able dict
representation.
Args:
model_dict: the model attributes serialized as JSON-able dict.
Returns:
A BaseTypedModel created from the serialized representation.
"""
model_type = model_dict.get("type")
if not model_type:
raise RuntimeError(
"`type` information is missing from the serialized model dict."
)
cls = load_source_path_class(model_type)
if not issubclass(cls, BaseTypedModel):
raise RuntimeError(
f"Class `{cls}` is not a ZenML BaseTypedModel subclass."
)
return cls.parse_obj(model_dict)
from_json(json_str)
classmethod
Instantiate a Pydantic model from a serialized JSON representation.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
json_str | str | the model attributes serialized as JSON. | required |
Returns:
Type | Description |
---|---|
BaseTypedModel | A BaseTypedModel created from the serialized representation. |
Source code in zenml/utils/typed_model.py
@classmethod
def from_json(
cls,
json_str: str,
) -> "BaseTypedModel":
"""Instantiate a Pydantic model from a serialized JSON representation.
Args:
json_str: the model attributes serialized as JSON.
Returns:
A BaseTypedModel created from the serialized representation.
"""
model_dict = json.loads(json_str)
return cls.from_dict(model_dict)
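A minimal sketch of `from_json` paired with Pydantic's built-in `json()` serialization, reusing the `TheMatrix` / `RedPill` classes from the example above (as with `from_dict`, the classes must be importable by the module path stored in `type`):
```python
# Sketch: JSON round trip through BaseTypedModel.from_json, using the
# example classes defined earlier in this section.
matrix = TheMatrix(choice=RedPill())
json_str = matrix.json()                       # pydantic serializes `type` with the other fields
restored = BaseTypedModel.from_json(json_str)  # resolves the class from the embedded `type`
assert isinstance(restored, TheMatrix)
assert isinstance(restored.choice, RedPill)
```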
BaseTypedModelMeta (ModelMetaclass)
Metaclass responsible for adding type information to Pydantic models.
Source code in zenml/utils/typed_model.py
class BaseTypedModelMeta(ModelMetaclass):
"""Metaclass responsible for adding type information to Pydantic models."""
def __new__(
mcs, name: str, bases: Tuple[Type[Any], ...], dct: Dict[str, Any]
) -> "BaseTypedModelMeta":
"""Creates a pydantic BaseModel class that includes a hidden attribute that
reflects the full class identifier."""
if "type" in dct:
raise TypeError(
"`type` is a reserved attribute name for BaseTypedModel "
"subclasses"
)
type_name = f"{dct['__module__']}.{dct['__qualname__']}"
type_ann = Literal[type_name] # type: ignore [misc,valid-type]
type = Field(type_name)
dct.setdefault("__annotations__", dict())["type"] = type_ann
dct["type"] = type
cls = cast(
Type["BaseTypedModel"], super().__new__(mcs, name, bases, dct)
)
return cls
__new__(mcs, name, bases, dct)
special
staticmethod
Creates a pydantic BaseModel class that includes a hidden attribute that reflects the full class identifier.
Source code in zenml/utils/typed_model.py
def __new__(
mcs, name: str, bases: Tuple[Type[Any], ...], dct: Dict[str, Any]
) -> "BaseTypedModelMeta":
"""Creates a pydantic BaseModel class that includes a hidden attribute that
reflects the full class identifier."""
if "type" in dct:
raise TypeError(
"`type` is a reserved attribute name for BaseTypedModel "
"subclasses"
)
type_name = f"{dct['__module__']}.{dct['__qualname__']}"
type_ann = Literal[type_name] # type: ignore [misc,valid-type]
type = Field(type_name)
dct.setdefault("__annotations__", dict())["type"] = type_ann
dct["type"] = type
cls = cast(
Type["BaseTypedModel"], super().__new__(mcs, name, bases, dct)
)
return cls
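To make the metaclass's effect concrete, here is a small sketch of the hidden `type` field it injects; the exact value depends on the module where the subclass is defined:
```python
from zenml.utils.typed_model import BaseTypedModel


class MyConfig(BaseTypedModel):  # hypothetical example class
    value: int = 1


cfg = MyConfig()
print(cfg.type)            # "<module>.MyConfig", e.g. "my_pkg.configs.MyConfig"
print(cfg.dict()["type"])  # same value -- it is serialized like any other field
```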
yaml_utils
UUIDEncoder (JSONEncoder)
Source code in zenml/utils/yaml_utils.py
class UUIDEncoder(json.JSONEncoder):
def default(self, obj: Any) -> Any:
if isinstance(obj, UUID):
# if the obj is uuid, we simply return the value of uuid
return obj.hex
return json.JSONEncoder.default(self, obj)
default(self, obj)
Implement this method in a subclass such that it returns a serializable
object for `o`, or calls the base implementation (to raise a `TypeError`).
For example, to support arbitrary iterators, you could implement default like this:
def default(self, o):
    try:
        iterable = iter(o)
    except TypeError:
        pass
    else:
        return list(iterable)
    # Let the base class default method raise the TypeError
    return JSONEncoder.default(self, o)
Source code in zenml/utils/yaml_utils.py
def default(self, obj: Any) -> Any:
if isinstance(obj, UUID):
# if the obj is uuid, we simply return the value of uuid
return obj.hex
return json.JSONEncoder.default(self, obj)
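`UUIDEncoder` is a drop-in `json.JSONEncoder` subclass, so it can be handed to `json.dumps` via the `cls` argument; a short sketch:
```python
import json
from uuid import uuid4

from zenml.utils.yaml_utils import UUIDEncoder

payload = {"run_id": uuid4(), "status": "completed"}

# Plain json.dumps would raise TypeError on the UUID value;
# UUIDEncoder serializes it as its 32-character hex string instead.
print(json.dumps(payload, cls=UUIDEncoder))
# e.g. {"run_id": "3f2c9a...", "status": "completed"}
```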
append_yaml(file_path, contents)
Append contents to a YAML file at file_path.
Source code in zenml/utils/yaml_utils.py
def append_yaml(file_path: str, contents: Dict[Any, Any]) -> None:
"""Append contents to a YAML file at file_path."""
file_contents = read_yaml(file_path) or {}
file_contents.update(contents)
if not utils.is_remote(file_path):
dir_ = str(Path(file_path).parent)
if not fileio.isdir(dir_):
raise FileNotFoundError(f"Directory {dir_} does not exist.")
utils.write_file_contents_as_string(file_path, yaml.dump(file_contents))
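Note that despite the name, `append_yaml` performs a shallow merge: it reads the existing file, applies `dict.update` with the new contents (overwriting duplicate top-level keys), and rewrites the whole file. A sketch using the `write_yaml` and `read_yaml` helpers documented below, with hypothetical local paths:
```python
from zenml.utils import yaml_utils

yaml_utils.write_yaml("config.yaml", {"stack": "local", "retries": 1})
yaml_utils.append_yaml("config.yaml", {"retries": 3, "cache": True})

merged = yaml_utils.read_yaml("config.yaml")
assert merged["retries"] == 3      # existing key overwritten by the appended contents
assert merged["cache"] is True     # new key added
assert merged["stack"] == "local"  # untouched key preserved
```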
is_yaml(file_path)
Returns True if file_path points to a YAML file (by extension), else False.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
file_path | str | Path to YAML file. | required |
Returns:
Type | Description |
---|---|
bool | True if the file is YAML, else False. |
Source code in zenml/utils/yaml_utils.py
def is_yaml(file_path: str) -> bool:
"""Returns True if file_path is YAML, else False
Args:
file_path: Path to YAML file.
Returns:
True if is yaml, else False.
"""
if file_path.endswith("yaml") or file_path.endswith("yml"):
return True
return False
read_json(file_path)
Read the JSON file at file_path and return its contents as a dict.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
file_path | str | Path to JSON file. | required |
Source code in zenml/utils/yaml_utils.py
def read_json(file_path: str) -> Any:
"""Read JSON on file path and returns contents as dict.
Args:
file_path: Path to JSON file.
"""
if fileio.exists(file_path):
contents = utils.read_file_contents_as_string(file_path)
return json.loads(contents)
else:
raise FileNotFoundError(f"{file_path} does not exist.")
read_yaml(file_path)
Read the YAML file at file_path and return its contents as a dict.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
file_path | str | Path to YAML file. | required |
Returns:
Type | Description |
---|---|
Any | Contents of the file as a dict (None if the file is empty). |
Exceptions:
Type | Description |
---|---|
FileNotFoundError | if the file does not exist. |
Source code in zenml/utils/yaml_utils.py
def read_yaml(file_path: str) -> Any:
"""Read YAML on file path and returns contents as dict.
Args:
file_path: Path to YAML file.
Returns:
Contents of the file in a dict.
Raises:
FileNotFoundError: if file does not exist.
"""
if fileio.exists(file_path):
contents = utils.read_file_contents_as_string(file_path)
# TODO: [LOW] consider adding a default empty dict to be returned
# instead of None
return yaml.safe_load(contents)
else:
raise FileNotFoundError(f"{file_path} does not exist.")
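Two edge cases worth knowing, the second being the behaviour flagged in the TODO above: a missing file raises `FileNotFoundError`, while an existing but empty file yields `None` (not an empty dict), because `yaml.safe_load("")` returns `None`.
```python
from pathlib import Path

from zenml.utils import yaml_utils

# Missing file -> FileNotFoundError
try:
    yaml_utils.read_yaml("does_not_exist.yaml")  # hypothetical path
except FileNotFoundError as err:
    print(err)

# Empty file -> None, so guard with `or {}` when a dict is expected
Path("empty.yaml").touch()
contents = yaml_utils.read_yaml("empty.yaml") or {}
print(contents)  # {}
```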
write_json(file_path, contents)
Write contents as JSON to file_path.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
file_path | str | Path to JSON file. | required |
contents | Dict[str, Any] | Contents of JSON file as dict. | required |
Returns:
Type | Description |
---|---|
None | Nothing; the contents are written to the file. |
Exceptions:
Type | Description |
---|---|
FileNotFoundError | if the parent directory of a local file_path does not exist. |
Source code in zenml/utils/yaml_utils.py
def write_json(file_path: str, contents: Dict[str, Any]) -> None:
"""Write contents as JSON format to file_path.
Args:
file_path: Path to JSON file.
contents: Contents of JSON file as dict.
Returns:
Contents of the file in a dict.
Raises:
FileNotFoundError: if directory does not exist.
"""
if not utils.is_remote(file_path):
dir_ = str(Path(file_path).parent)
if not fileio.isdir(dir_):
# Check if it is a local path, if it doesn't exist, raise Exception.
raise FileNotFoundError(f"Directory {dir_} does not exist.")
utils.write_file_contents_as_string(file_path, json.dumps(contents))
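A round-trip sketch with `read_json` (documented above); as with the YAML writer, the parent directory is only checked for local paths:
```python
from zenml.utils import yaml_utils

data = {"pipeline": "training", "steps": 4}
yaml_utils.write_json("run_metadata.json", data)  # hypothetical path; parent dir "." exists

restored = yaml_utils.read_json("run_metadata.json")
assert restored == data
```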
write_yaml(file_path, contents)
Write contents as YAML to file_path.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
file_path | str | Path to YAML file. | required |
contents | Dict[Any, Any] | Contents of YAML file as dict. | required |
Exceptions:
Type | Description |
---|---|
FileNotFoundError | if the parent directory of a local file_path does not exist. |
Source code in zenml/utils/yaml_utils.py
def write_yaml(file_path: str, contents: Dict[Any, Any]) -> None:
"""Write contents as YAML format to file_path.
Args:
file_path: Path to YAML file.
contents: Contents of YAML file as dict.
Raises:
FileNotFoundError: if directory does not exist.
"""
if not utils.is_remote(file_path):
dir_ = str(Path(file_path).parent)
if not fileio.isdir(dir_):
raise FileNotFoundError(f"Directory {dir_} does not exist.")
utils.write_file_contents_as_string(file_path, yaml.dump(contents))