Skip to content

Utils

zenml.utils special

The utils module contains utility functions handling analytics, reading and writing YAML data as well as other general purpose functions.

analytics_utils

Analytics code for ZenML

AnalyticsEvent (str, Enum)

An enumeration.

Source code in zenml/utils/analytics_utils.py
class AnalyticsEvent(str, Enum):
    """Names of the analytics events that ZenML can report.

    Each member's value is the human-readable event name. Because the class
    also derives from `str`, members compare equal to their plain string
    value.
    """

    # Pipelines
    RUN_PIPELINE = "Pipeline run"
    GET_PIPELINES = "Pipelines fetched"
    GET_PIPELINE = "Pipeline fetched"

    # Repo
    INITIALIZE_REPO = "ZenML initialized"

    # Profile
    INITIALIZED_PROFILE = "Profile initialized"

    # Components
    REGISTERED_STACK_COMPONENT = "Stack component registered"
    UPDATED_STACK_COMPONENT = "Stack component updated"

    # Stack
    REGISTERED_STACK = "Stack registered"
    SET_STACK = "Stack set"
    UPDATED_STACK = "Stack updated"

    # Analytics opt in and out
    OPT_IN_ANALYTICS = "Analytics opt-in"
    OPT_OUT_ANALYTICS = "Analytics opt-out"

    # Examples
    RUN_EXAMPLE = "Example run"
    PULL_EXAMPLE = "Example pull"

    # Integrations
    INSTALL_INTEGRATION = "Integration installed"

    # Test event
    EVENT_TEST = "Test event"

get_environment()

Returns a string representing the execution environment of the pipeline. Currently, one of docker, colab, paperspace, notebook, or native.

Source code in zenml/utils/analytics_utils.py
def get_environment() -> str:
    """Returns a string naming the execution environment of the pipeline.

    The checks run in a fixed order and the first one that matches wins.

    Returns:
        One of `docker`, `colab`, `paperspace`, `notebook` or, if none of
        the checks match, `native`.
    """
    if Environment.in_docker():
        return "docker"
    if Environment.in_google_colab():
        return "colab"
    if Environment.in_paperspace_gradient():
        return "paperspace"
    if Environment.in_notebook():
        return "notebook"
    return "native"

get_segment_key()

Get key for authorizing to Segment backend.

Returns:

Type Description
str

Segment key as a string.

Source code in zenml/utils/analytics_utils.py
def get_segment_key() -> str:
    """Select the key used for authorizing to the Segment backend.

    Returns:
        The development key when `IS_DEBUG_ENV` is truthy, otherwise the
        production key.
    """
    return SEGMENT_KEY_DEV if IS_DEBUG_ENV else SEGMENT_KEY_PROD

parametrized(dec)

This is a meta-decorator, that is, a decorator for decorators. As a decorator is a function, it actually works as a regular decorator with arguments:

Source code in zenml/utils/analytics_utils.py
def parametrized(
    dec: Callable[..., Callable[..., Any]]
) -> Callable[..., Callable[[Callable[..., Any]], Callable[..., Any]]]:
    """Meta-decorator that lets a decorator accept arguments.

    Apply it to a decorator `dec(f, *args, **kwargs)`. The result is called
    with the arguments first and then applied to the function, i.e.
    `parametrized(dec)(*args, **kwargs)(f)` evaluates `dec(f, *args, **kwargs)`.

    Args:
        dec: The decorator to parametrize. It receives the decorated function
            as first argument, followed by the decorator arguments.

    Returns:
        A callable that takes the decorator arguments and returns the actual
        decorator. It carries the name, qualified name, module and docstring
        of `dec`.
    """

    def layer(
        *args: Any, **kwargs: Any
    ) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
        """Internal layer"""

        def repl(f: Callable[..., Any]) -> Callable[..., Any]:
            """Internal repl"""
            return dec(f, *args, **kwargs)

        return repl

    # Carry over the identity of `dec` so that the parametrized decorator is
    # not reported as "layer" by introspection and documentation tools.
    # `__wrapped__` is deliberately not set: the call signature of `layer`
    # differs from that of `dec` (it does not take the decorated function).
    for attr in ("__module__", "__name__", "__qualname__", "__doc__"):
        try:
            setattr(layer, attr, getattr(dec, attr))
        except AttributeError:
            # `dec` may be a callable object that lacks this attribute
            pass

    return layer

track(*args, **kwargs)

Internal layer

Source code in zenml/utils/analytics_utils.py
def layer(
    *args: Any, **kwargs: Any
) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
    """Internal layer: captures the decorator arguments.

    Returns a function that takes the function to decorate and calls `dec`
    with it, followed by the captured positional and keyword arguments.
    """

    def repl(f: Callable[..., Any]) -> Callable[..., Any]:
        """Internal repl: applies `dec` to `f` with the captured arguments."""
        # NOTE(review): `dec` is not defined in this snippet; presumably it
        # is bound by the enclosing `parametrized` call -- confirm.
        return dec(f, *args, **kwargs)

    return repl

track_event(event, metadata=None)

Track segment event if user opted-in.

Parameters:

Name Type Description Default
event Union[str, zenml.utils.analytics_utils.AnalyticsEvent]

Name of event to track in segment.

required
metadata Optional[Dict[str, Any]]

Dict of metadata to track.

None

Returns:

Type Description
bool

True if the event was sent successfully, False if not.

Source code in zenml/utils/analytics_utils.py
def track_event(
    event: Union[str, AnalyticsEvent],
    metadata: Optional[Dict[str, Any]] = None,
) -> bool:
    """
    Track segment event if user opted-in.

    The opt-in and opt-out events themselves are sent regardless of the
    opt-in setting. Any exception raised while tracking is logged at debug
    level and swallowed, so this function never raises.

    Args:
        event: Name of event to track in segment.
        metadata: Dict of metadata to track. The passed dict is not modified;
            system and version information is added to a copy of it.

    Returns:
        True if event is sent successfully, False if not.
    """
    try:
        import analytics

        from zenml.config.global_config import GlobalConfiguration

        if analytics.write_key is None:
            analytics.write_key = get_segment_key()

        # Explicit check instead of `assert`, which is stripped when Python
        # runs with -O. The error is handled by the `except` below.
        if analytics.write_key is None:
            raise RuntimeError(
                "Analytics key not set but trying to make telemetry call."
            )

        # Set this to 1 to avoid backoff loop
        analytics.max_retries = 1

        gc = GlobalConfiguration()
        if isinstance(event, AnalyticsEvent):
            event = event.value

        logger.debug(
            f"Attempting analytics: User: {gc.user_id}, "
            f"Event: {event}, "
            f"Metadata: {metadata}"
        )

        if not gc.analytics_opt_in and event not in {
            AnalyticsEvent.OPT_OUT_ANALYTICS,
            AnalyticsEvent.OPT_IN_ANALYTICS,
        }:
            return False

        # Work on a copy so the dict owned by the caller is never mutated.
        metadata = dict(metadata) if metadata else {}

        # add basics
        metadata.update(Environment.get_system_info())
        metadata.update(
            {
                "environment": get_environment(),
                "python_version": Environment.python_version(),
                "version": __version__,
            }
        )

        analytics.track(str(gc.user_id), event, metadata)
        logger.debug(
            f"Analytics sent: User: {gc.user_id}, Event: {event}, Metadata: "
            f"{metadata}"
        )
        return True
    except Exception as e:
        # We should never fail main thread
        logger.debug(f"Analytics failed due to: {e}")
        return False

daemon

Utility functions to start/stop daemon processes.

This is only implemented for UNIX systems and therefore doesn't work on Windows. Based on https://www.jejik.com/articles/2007/02/a_simple_unix_linux_daemon_in_python/

check_if_daemon_is_running(pid_file)

Checks whether a daemon process indicated by the PID file is running.

Parameters:

Name Type Description Default
pid_file str

Path to file containing the PID of the daemon process to check.

required
Source code in zenml/utils/daemon.py
def check_if_daemon_is_running(pid_file: str) -> bool:
    """Tells whether the daemon process tracked by a PID file is running.

    Args:
        pid_file: Path to file containing the PID of the daemon
            process to check.

    Returns:
        True if `get_daemon_pid_if_running` yields a PID for the given file,
        False otherwise.
    """
    running_pid = get_daemon_pid_if_running(pid_file)
    return running_pid is not None

daemonize(pid_file, log_file=None, working_directory='/')

Decorator that executes the decorated function as a daemon process.

Use this decorator to easily transform any function into a daemon process.

Examples:

import time
from zenml.utils.daemon import daemonize


@daemonize(log_file='/tmp/daemon.log', pid_file='/tmp/daemon.pid')
def sleeping_daemon(period: int) -> None:
    print(f"I'm a daemon! I will sleep for {period} seconds.")
    time.sleep(period)
    print("Done sleeping, flying away.")

sleeping_daemon(period=30)

print("I'm the daemon's parent!.")
time.sleep(10) # just to prove that the daemon is running in parallel

Parameters:

Name Type Description Default
_func

decorated function

required
pid_file str

file where the PID of the daemon process will be stored.

required
log_file Optional[str]

file where stdout and stderr are redirected for the daemon process. If not supplied, the daemon will be silenced (i.e. have its stdout/stderr redirected to /dev/null).

None
working_directory str

working directory for the daemon process, defaults to the root directory.

'/'

Returns:

Type Description
Callable[[~F], ~F]

Decorated function that, when called, will detach from the current process and continue executing in the background, as a daemon process.

Source code in zenml/utils/daemon.py
def daemonize(
    pid_file: str,
    log_file: Optional[str] = None,
    working_directory: str = "/",
) -> Callable[[F], F]:
    """Decorator that executes the decorated function as a daemon process.

    Use this decorator to easily transform any function into a daemon
    process. On Windows the decorated function is not executed at all; an
    error is logged instead.

    Example:

    ```python
    import time
    from zenml.utils.daemon import daemonize


    @daemonize(log_file='/tmp/daemon.log', pid_file='/tmp/daemon.pid')
    def sleeping_daemon(period: int) -> None:
        print(f"I'm a daemon! I will sleep for {period} seconds.")
        time.sleep(period)
        print("Done sleeping, flying away.")

    sleeping_daemon(period=30)

    print("I'm the daemon's parent!.")
    time.sleep(10) # just to prove that the daemon is running in parallel
    ```

    Args:
        pid_file: file where the PID of the daemon process will be stored.
        log_file: file where stdout and stderr are redirected for the daemon
            process. If not supplied, the daemon will be silenced (i.e. have
            its stdout/stderr redirected to /dev/null).
        working_directory: working directory for the daemon process,
            defaults to the root directory.
    Returns:
        Decorated function that, when called, will detach from the current
        process and continue executing in the background, as a daemon
        process.
    """
    # Imported locally so this function does not rely on a module-level
    # import being present.
    import functools

    def inner_decorator(_func: F) -> F:
        """Wraps `_func` so that calling it starts the daemon."""

        # Preserve name and docstring of the decorated function.
        @functools.wraps(_func)
        def daemon(*args: Any, **kwargs: Any) -> None:
            """Standard daemonization of a process."""
            # flake8: noqa: C901
            if sys.platform == "win32":
                logger.error(
                    "Daemon functionality is currently not supported on Windows."
                )
            else:
                run_as_daemon(
                    _func,
                    *args,
                    pid_file=pid_file,
                    log_file=log_file,
                    working_directory=working_directory,
                    **kwargs,
                )

        return cast(F, daemon)

    return inner_decorator

get_daemon_pid_if_running(pid_file)

Read and return the PID value from a PID file if the daemon process tracked by the PID file is running.

Parameters:

Name Type Description Default
pid_file str

Path to file containing the PID of the daemon process to check.

required

Returns:

Type Description
Optional[int]

The PID of the daemon process if it is running, otherwise None.

Source code in zenml/utils/daemon.py
def get_daemon_pid_if_running(pid_file: str) -> Optional[int]:
    """Read and return the PID value from a PID file if the daemon process
    tracked by the PID file is running.

    Args:
        pid_file: Path to file containing the PID of the daemon
            process to check.
    Returns:
        The PID of the daemon process if it is running, otherwise None. None
        is also returned if the PID file cannot be read or does not contain
        a valid integer.
    """
    try:
        with open(pid_file, "r") as f:
            pid = int(f.read().strip())
    except (OSError, ValueError):
        # OSError: the file is missing or unreadable.
        # ValueError: the file is empty or its content is not an integer.
        return None

    if not pid or not psutil.pid_exists(pid):
        return None

    return pid

run_as_daemon(daemon_function, *args, *, pid_file, log_file=None, working_directory='/', **kwargs)

Runs a function as a daemon process.

Parameters:

Name Type Description Default
daemon_function ~F

The function to run as a daemon.

required
pid_file str

Path to file in which to store the PID of the daemon process.

required
log_file Optional[str]

Optional file to which the daemons stdout/stderr will be redirected to.

None
working_directory str

Working directory for the daemon process, defaults to the root directory.

'/'
args Any

Positional arguments to pass to the daemon function.

()
kwargs Any

Keyword arguments to pass to the daemon function.

{}

Exceptions:

Type Description
FileExistsError

If the PID file already exists.

Source code in zenml/utils/daemon.py
def run_as_daemon(
    daemon_function: F,
    *args: Any,
    pid_file: str,
    log_file: Optional[str] = None,
    working_directory: str = "/",
    **kwargs: Any,
) -> None:
    """Runs a function as a daemon process.

    The process is forked twice. The calling process returns right after the
    first fork; the intermediate process exits after the second fork. The
    remaining process redirects its standard streams, writes the PID file,
    runs `daemon_function` and then exits, so this function never returns
    inside the daemon process.

    Args:
        daemon_function: The function to run as a daemon.
        pid_file: Path to file in which to store the PID of the daemon
            process.
        log_file: Optional file to which the daemons stdout/stderr will be
            redirected to.
        working_directory: Working directory for the daemon process,
            defaults to the root directory.
        args: Positional arguments to pass to the daemon function.
        kwargs: Keyword arguments to pass to the daemon function.
    Raises:
        FileExistsError: If the PID file already exists.
    """
    # convert to absolute path as we will change working directory later
    if pid_file:
        pid_file = os.path.abspath(pid_file)
    if log_file:
        log_file = os.path.abspath(log_file)

    # check if PID file exists
    if pid_file and os.path.exists(pid_file):
        raise FileExistsError(
            f"The PID file '{pid_file}' already exists, either the daemon "
            f"process is already running or something went wrong."
        )

    # first fork
    try:
        pid = os.fork()
        if pid > 0:
            # this is the process that called `run_as_daemon` so we
            # simply return so it can keep running
            return
    except OSError as e:
        logger.error("Unable to fork (error code: %d)", e.errno)
        sys.exit(1)

    # decouple from parent environment
    os.chdir(working_directory)
    os.setsid()
    os.umask(0o22)

    # second fork
    try:
        pid = os.fork()
        if pid > 0:
            # this is the parent of the future daemon process, kill it
            # so the daemon gets adopted by the init process
            sys.exit(0)
    except OSError as e:
        sys.stderr.write(f"Unable to fork (error code: {e.errno})")
        sys.exit(1)

    # redirect standard file descriptors to devnull (or the given logfile)
    devnull = "/dev/null"
    if hasattr(os, "devnull"):
        devnull = os.devnull

    devnull_fd = os.open(devnull, os.O_RDWR)
    log_fd = (
        os.open(log_file, os.O_CREAT | os.O_RDWR | os.O_APPEND)
        if log_file
        else None
    )
    out_fd = log_fd or devnull_fd

    os.dup2(devnull_fd, sys.stdin.fileno())
    os.dup2(out_fd, sys.stdout.fileno())
    os.dup2(out_fd, sys.stderr.fileno())

    if pid_file:
        # write the PID file
        with open(pid_file, "w+") as f:
            f.write(f"{os.getpid()}\n")

    # register actions in case this process exits/gets killed
    def cleanup() -> None:
        """Daemon cleanup."""
        terminate_children()
        if pid_file and os.path.exists(pid_file):
            os.remove(pid_file)

    def sighndl(signum: int, frame: Optional[types.FrameType]) -> None:
        """Daemon signal handler."""
        # Exit instead of only cleaning up: returning from the handler would
        # resume `daemon_function`, leaving the daemon running without its
        # PID file. `sys.exit` triggers the `atexit` hook registered below,
        # which performs the cleanup.
        sys.exit(0)

    signal.signal(signal.SIGTERM, sighndl)
    signal.signal(signal.SIGINT, sighndl)
    atexit.register(cleanup)

    # finally run the actual daemon code
    daemon_function(*args, **kwargs)
    sys.exit(0)

stop_daemon(pid_file)

Stops a daemon process.

Parameters:

Name Type Description Default
pid_file str

Path to file containing the PID of the daemon process to kill.

required
Source code in zenml/utils/daemon.py
def stop_daemon(pid_file: str) -> None:
    """Stops a daemon process.

    Reads the PID from the given file and terminates the corresponding
    process. If the file cannot be read, does not contain a valid PID, or no
    process with that PID exists, a warning is logged and nothing else
    happens.

    Args:
        pid_file: Path to file containing the PID of the daemon process to
            kill.
    """
    try:
        with open(pid_file, "r") as f:
            pid = int(f.read().strip())
    except OSError:
        logger.warning("Daemon PID file '%s' does not exist.", pid_file)
        return
    except ValueError:
        # empty or malformed PID file
        logger.warning(
            "Daemon PID file '%s' does not contain a valid PID.", pid_file
        )
        return

    if psutil.pid_exists(pid):
        process = psutil.Process(pid)
        process.terminate()
    else:
        logger.warning("PID from '%s' does not exist.", pid_file)

terminate_children()

Terminate all processes that are children of the currently running process.

Source code in zenml/utils/daemon.py
def terminate_children() -> None:
    """Stop every direct child process of the currently running process.

    Each child is first asked to terminate. Children that are still alive
    after `CHILD_PROCESS_WAIT_TIMEOUT` are killed, followed by a second wait
    of the same duration.
    """
    own_pid = os.getpid()
    try:
        current_process = psutil.Process(own_pid)
    except psutil.Error:
        # the current process could not be looked up, nothing to do
        return
    child_processes = current_process.children(recursive=False)

    for child in child_processes:
        child.terminate()
    survivors = psutil.wait_procs(
        child_processes, timeout=CHILD_PROCESS_WAIT_TIMEOUT
    )[1]

    for child in survivors:
        child.kill()
    psutil.wait_procs(child_processes, timeout=CHILD_PROCESS_WAIT_TIMEOUT)

docker_utils

build_docker_image(build_context_path, image_name, entrypoint=None, dockerfile_path=None, dockerignore_path=None, requirements=None, environment_vars=None, use_local_requirements=False, base_image=None)

Builds a docker image.

Parameters:

Name Type Description Default
build_context_path str

Path to a directory that will be sent to the docker daemon as build context.

required
image_name str

The name to use for the created docker image.

required
entrypoint Optional[str]

Optional entrypoint command that gets executed when running a container of the built image.

None
dockerfile_path Optional[str]

Optional path to a dockerfile. If no value is given, a temporary dockerfile will be created.

None
dockerignore_path Optional[str]

Optional path to a dockerignore file. If no value is given, the .dockerignore in the root of the build context will be used if it exists. Otherwise, all files inside build_context_path are included in the build context.

None
requirements Optional[AbstractSet[str]]

Optional list of pip requirements to install. This will only be used if no value is given for dockerfile_path.

None
environment_vars Optional[Dict[str, str]]

Optional dict of key value pairs that need to be embedded as environment variables in the image.

None
use_local_requirements bool

If True and no values are given for dockerfile_path and requirements, then the packages installed in the environment of the current python process will be installed in the docker image.

False
base_image Optional[str]

The image to use as base for the docker image.

None
Source code in zenml/utils/docker_utils.py
def build_docker_image(
    build_context_path: str,
    image_name: str,
    entrypoint: Optional[str] = None,
    dockerfile_path: Optional[str] = None,
    dockerignore_path: Optional[str] = None,
    requirements: Optional[AbstractSet[str]] = None,
    environment_vars: Optional[Dict[str, str]] = None,
    use_local_requirements: bool = False,
    base_image: Optional[str] = None,
) -> None:
    """Builds a docker image.

    A copy of the active ZenML configuration is placed inside the build
    context for the duration of the build and removed again afterwards, even
    if the build fails.

    Args:
        build_context_path: Path to a directory that will be sent to the
            docker daemon as build context.
        image_name: The name to use for the created docker image.
        entrypoint: Optional entrypoint command that gets executed when running
            a container of the built image.
        dockerfile_path: Optional path to a dockerfile. If no value is given,
            a temporary dockerfile will be created.
        dockerignore_path: Optional path to a dockerignore file. If no value is
            given, the .dockerignore in the root of the build context will be
            used if it exists. Otherwise, all files inside `build_context_path`
            are included in the build context.
        requirements: Optional set of pip requirements to install. This
            will only be used if no value is given for `dockerfile_path`.
        environment_vars: Optional dict of key value pairs that need to be
            embedded as environment variables in the image.
        use_local_requirements: If `True` and no values are given for
            `dockerfile_path` and `requirements`, then the packages installed
            in the environment of the current python process will be
            installed in the docker image.
        base_image: The image to use as base for the docker image.
    """
    zenml_config_dir = os.path.join(
        build_context_path, CONTAINER_ZENML_CONFIG_DIR
    )
    try:
        # Put a copy of the current global configuration (active profile and
        # active stack) into the build context so that both are accessible
        # from within the container.
        GlobalConfiguration().copy_active_configuration(
            zenml_config_dir,
            load_config_path=f"/app/{CONTAINER_ZENML_CONFIG_DIR}",
        )

        if not requirements and use_local_requirements:
            installed_packages = get_current_environment_requirements()
            requirements = {
                f"{name}=={installed_version}"
                for name, installed_version in installed_packages.items()
                if name != "zenml"  # ZenML itself is excluded
            }
            logger.info(
                "Using requirements from local environment to build "
                "docker image: %s",
                requirements,
            )

        if not dockerfile_path:
            dockerfile_contents = generate_dockerfile_contents(
                base_image=base_image or DEFAULT_BASE_IMAGE,
                entrypoint=entrypoint,
                requirements=requirements,
                environment_vars=environment_vars,
            )
        else:
            dockerfile_contents = read_file_contents_as_string(dockerfile_path)

        context_archive = create_custom_build_context(
            build_context_path=build_context_path,
            dockerfile_contents=dockerfile_contents,
            dockerignore_path=dockerignore_path,
        )

        # A custom base image is always pulled in its latest version, unless
        # it is a locally built image. Without a custom base image the static
        # default ZenML image is used, which doesn't need constant pulling.
        pull_base_image = bool(base_image) and not is_local_image(base_image)

        logger.info(
            "Building docker image '%s', this might take a while...",
            image_name,
        )

        # The low-level client api is used here so the logs can be streamed
        build_logs = DockerClient.from_env().images.client.api.build(
            fileobj=context_archive,
            custom_context=True,
            tag=image_name,
            pull=pull_base_image,
            rm=False,  # don't remove intermediate containers
        )
        _process_stream(build_logs)
    finally:
        # Remove the temporary configuration copy from the build context
        fileio.rmtree(zenml_config_dir)

    logger.info("Finished building docker image.")

create_custom_build_context(build_context_path, dockerfile_contents, dockerignore_path=None)

Creates a docker build context.

Parameters:

Name Type Description Default
build_context_path str

Path to a directory that will be sent to the docker daemon as build context.

required
dockerfile_contents str

File contents of the Dockerfile to use for the build.

required
dockerignore_path Optional[str]

Optional path to a dockerignore file. If no value is given, the .dockerignore in the root of the build context will be used if it exists. Otherwise, all files inside build_context_path are included in the build context.

None

Returns:

Type Description
Any

Docker build context that can be passed when building a docker image.

Source code in zenml/utils/docker_utils.py
def create_custom_build_context(
    build_context_path: str,
    dockerfile_contents: str,
    dockerignore_path: Optional[str] = None,
) -> Any:
    """Creates a docker build context.

    A warning is logged if the resulting archive is larger than 50MiB and no
    exclude patterns were found.

    Args:
        build_context_path: Path to a directory that will be sent to the
            docker daemon as build context.
        dockerfile_contents: File contents of the Dockerfile to use for the
            build.
        dockerignore_path: Optional path to a dockerignore file. If no value is
            given, the .dockerignore in the root of the build context will be
            used if it exists. Otherwise, all files inside `build_context_path`
            are included in the build context.

    Returns:
        Docker build context that can be passed when building a docker image.
    """
    size_warning_threshold = 50 * 1024 * 1024  # 50MiB

    default_dockerignore_path = os.path.join(
        build_context_path, ".dockerignore"
    )
    exclude_patterns = []
    if dockerignore_path:
        exclude_patterns = _parse_dockerignore(dockerignore_path)
    elif fileio.exists(default_dockerignore_path):
        logger.info(
            "Using dockerignore found at path '%s' to create docker "
            "build context.",
            default_dockerignore_path,
        )
        exclude_patterns = _parse_dockerignore(default_dockerignore_path)
    else:
        logger.info(
            "No explicit dockerignore specified and no file called "
            ".dockerignore exists at the build context root (%s). "
            "Creating docker build context with all files inside the build "
            "context root directory.",
            build_context_path,
        )

    logger.debug(
        "Exclude patterns for creating docker build context: %s",
        exclude_patterns,
    )
    # Evaluated before the patterns are handed to `exclude_paths`.
    no_ignores_found = not exclude_patterns

    included_paths = docker_build_utils.exclude_paths(
        build_context_path, patterns=exclude_patterns
    )
    archive = docker_build_utils.create_archive(
        root=build_context_path,
        files=sorted(included_paths),
        gzip=False,
        extra_files=[("Dockerfile", dockerfile_contents)],
    )

    archive_size = os.path.getsize(archive.name)
    if no_ignores_found and archive_size > size_warning_threshold:
        # Large build context without any excludes from a dockerignore file
        # -> remind the user to specify a .dockerignore file
        logger.warning(
            "Build context size for docker image: %s. If you believe this is "
            "unreasonably large, make sure to include a .dockerignore file at "
            "the root of your build context (%s) or specify a custom file "
            "when defining your pipeline.",
            string_utils.get_human_readable_filesize(archive_size),
            default_dockerignore_path,
        )

    return archive

generate_dockerfile_contents(base_image, entrypoint=None, requirements=None, environment_vars=None)

Generates a Dockerfile.

Parameters:

Name Type Description Default
base_image str

The image to use as base for the dockerfile.

required
entrypoint Optional[str]

The default entrypoint command that gets executed when running a container of an image created by this dockerfile.

None
requirements Optional[AbstractSet[str]]

Optional list of pip requirements to install.

None
environment_vars Optional[Dict[str, str]]

Optional dict of environment variables to set.

None

Returns:

Type Description
str

Content of a dockerfile.

Source code in zenml/utils/docker_utils.py
def generate_dockerfile_contents(
    base_image: str,
    entrypoint: Optional[str] = None,
    requirements: Optional[AbstractSet[str]] = None,
    environment_vars: Optional[Dict[str, str]] = None,
) -> str:
    """Generates a Dockerfile.

    Args:
        base_image: The image to use as base for the dockerfile.
        entrypoint: The default entrypoint command that gets executed when
            running a container of an image created by this dockerfile.
        requirements: Optional set of pip requirements to install. They are
            installed in sorted order.
        environment_vars: Optional dict of environment variables to set. The
            variable names are converted to upper case.

    Returns:
        Content of a dockerfile.
    """
    dockerfile_lines = [f"FROM {base_image}", "WORKDIR /app"]

    # TODO [ENG-781]: Make secrets invisible in the Dockerfile or use a different approach.
    if environment_vars:
        dockerfile_lines.extend(
            f"ENV {name.upper()}={value}"
            for name, value in environment_vars.items()
        )

    if requirements:
        pip_packages = " ".join(sorted(requirements))
        dockerfile_lines.append(f"RUN pip install --no-cache {pip_packages}")

    dockerfile_lines += [
        "COPY . .",
        "RUN chmod -R a+rw .",
        f"ENV {ENV_ZENML_CONFIG_PATH}=/app/{CONTAINER_ZENML_CONFIG_DIR}",
    ]

    if entrypoint:
        dockerfile_lines.append(f"ENTRYPOINT {entrypoint}")

    return "\n".join(dockerfile_lines)

get_current_environment_requirements()

Returns a dict of package requirements for the environment that the current python process is running in.

Source code in zenml/utils/docker_utils.py
def get_current_environment_requirements() -> Dict[str, str]:
    """Collects the packages of the environment that the current python
    process is running in.

    Returns:
        Dict mapping the key of each distribution in
        `pkg_resources.working_set` to its version.
    """
    installed: Dict[str, str] = {}
    for dist in pkg_resources.working_set:
        installed[dist.key] = dist.version
    return installed

get_image_digest(image_name)

Gets the digest of a docker image.

Parameters:

Name Type Description Default
image_name str

Name of the image to get the digest for.

required

Returns:

Type Description
Optional[str]

Returns the repo digest for the given image if there exists exactly one. If there are zero or multiple repo digests, returns None.

Source code in zenml/utils/docker_utils.py
def get_image_digest(image_name: str) -> Optional[str]:
    """Gets the digest of a docker image.

    Args:
        image_name: Name of the image to get the digest for.

    Returns:
        Returns the repo digest for the given image if there exists exactly one.
        If there are zero or multiple repo digests, returns `None`.
    """
    docker_client = DockerClient.from_env()
    image = docker_client.images.get(image_name)
    repo_digests = image.attrs["RepoDigests"]
    if len(repo_digests) == 1:
        return cast(str, repo_digests[0])

    # Without exactly one digest there is no single value to return.
    logger.debug(
        "Found zero or multiple repo digests for docker image '%s': %s",
        image_name,
        repo_digests,
    )
    return None

is_local_image(image_name)

Returns True if an image with this name exists locally and was built locally rather than pulled from a registry, otherwise False.

Source code in zenml/utils/docker_utils.py
def is_local_image(image_name: str) -> bool:
    """Returns whether an image is a locally built image.

    Args:
        image_name: Name of the image to check.

    Returns:
        `False` if no image with this name is available locally. Otherwise
        `True` if `get_image_digest` finds no single repo digest for the
        image (which is treated as "built locally"), `False` if it does
        (treated as "pulled from a repo").
    """
    docker_client = DockerClient.from_env()
    if not docker_client.images.list(name=image_name):
        # no image with this name found locally
        return False

    # An image with this name is available locally -> it counts as pulled
    # from a repo if it has a repo digest and as built locally otherwise.
    return get_image_digest(image_name) is None

push_docker_image(image_name)

Pushes a docker image to a container registry.

Parameters:

Name Type Description Default
image_name str

The full name (including a tag) of the image to push.

required
Source code in zenml/utils/docker_utils.py
def push_docker_image(image_name: str) -> None:
    """Pushes a docker image to a container registry.

    The output of the push is streamed and handed to `_process_stream`.

    Args:
        image_name: The full name (including a tag) of the image to push.
    """
    logger.info("Pushing docker image '%s'.", image_name)
    push_logs = DockerClient.from_env().images.push(image_name, stream=True)
    _process_stream(push_logs)
    logger.info("Finished pushing docker image.")

enum_utils

StrEnum (str, Enum)

Base enum type for string enum values

Source code in zenml/utils/enum_utils.py
class StrEnum(str, Enum):
    """Base class for enums whose members are strings."""

    def __str__(self) -> str:
        """Returns the value of the enum member instead of its name."""
        return self.value  # type: ignore

    @classmethod
    def names(cls) -> List[str]:
        """Returns the names of all enum members in definition order."""
        member_names = []
        for member in cls:
            member_names.append(member.name)
        return member_names

    @classmethod
    def values(cls) -> List[str]:
        """Returns the values of all enum members in definition order."""
        member_values = []
        for member in cls:
            member_values.append(member.value)
        return member_values

networking_utils

find_available_port()

Finds a local random unoccupied TCP port.

Source code in zenml/utils/networking_utils.py
def find_available_port() -> int:
    """Finds a local random unoccupied TCP port.

    Returns:
        The port number reported by a socket bound to `127.0.0.1` with
        port `0`.
    """
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as probe:
        # Port 0 asks the operating system to choose the port for us.
        probe.bind(("127.0.0.1", 0))
        chosen_port = probe.getsockname()[1]

    return cast(int, chosen_port)

port_available(port)

Checks if a local port is available.

Parameters:

Name Type Description Default
port int

TCP port number

required

Returns:

Type Description
bool

True if the port is available, otherwise False

Source code in zenml/utils/networking_utils.py
def port_available(port: int) -> bool:
    """Checks if a local port is available.

    The check tries to bind a TCP socket to the port on `127.0.0.1`.

    Args:
        port: TCP port number

    Returns:
        True if the port is available, otherwise False
    """
    try:
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as probe:
            probe.bind(("127.0.0.1", port))
    except socket.error as err:
        logger.debug("Port %d unavailable: %s", port, err)
        return False
    else:
        return True

port_is_open(hostname, port)

Check if a TCP port is open on a remote host.

Parameters:

Name Type Description Default
hostname str

hostname of the remote machine

required
port int

TCP port number

required

Returns:

Type Description
bool

True if the port is open, False otherwise

Source code in zenml/utils/networking_utils.py
def port_is_open(hostname: str, port: int) -> bool:
    """Check if a TCP port is open on a remote host.

    Args:
        hostname: hostname of the remote machine
        port: TCP port number

    Returns:
        True if the port is open, False otherwise
    """
    try:
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as conn:
            # `connect_ex` reports connection failures as a non-zero code
            # instead of raising; 0 means the connection succeeded.
            error_code = conn.connect_ex((hostname, port))
            return error_code == 0
    except socket.error as err:
        logger.debug(
            f"Error checking TCP port {port} on host {hostname}: {str(err)}"
        )
        return False

scan_for_available_port(start=8000, stop=65535)

Scan the local machine for an available TCP port in the given range.

Parameters:

Name Type Description Default
start int

the beginning of the port range value to scan

8000
stop int

the (inclusive) end of the port range value to scan

65535

Returns:

Type Description
Optional[int]

The first available port in the given range, or None if no available port is found.

Source code in zenml/utils/networking_utils.py
def scan_for_available_port(
    start: int = SCAN_PORT_RANGE[0], stop: int = SCAN_PORT_RANGE[1]
) -> Optional[int]:
    """Scan the local machine for an available port in the given range.

    Ports are probed in ascending order with `port_available`.

    Args:
        start: the beginning of the port range value to scan
        stop: the (inclusive) end of the port range value to scan

    Returns:
        The first available port in the given range, or None if no available
        port is found.
    """
    candidates = range(start, stop + 1)
    free_port = next(
        (candidate for candidate in candidates if port_available(candidate)),
        None,
    )
    if free_port is None:
        logger.debug(
            "No free TCP ports found in the range %d - %d",
            start,
            stop,
        )
    return free_port

secrets_manager_utils

decode_secret_dict(secret_dict)

Base64 decode a Secret.

Parameters:

Name Type Description Default
secret_dict Dict[str, str]

dict containing key-value pairs to decode

required

Returns:

Type Description
Tuple[Dict[str, str], str]

A tuple of the decoded secret Dict (key-value pairs) and the ZenML schema name.

Source code in zenml/utils/secrets_manager_utils.py
def decode_secret_dict(
    secret_dict: Dict[str, str]
) -> Tuple[Dict[str, str], str]:
    """Base64 decode a Secret.

    The schema name stored under the `ZENML_SCHEMA_NAME` key is returned
    as-is (it is not base64 decoded); every other value is decoded with
    `decode_string`. The passed dict is not modified.

    Args:
        secret_dict: dict containing key-value pairs to decode

    Returns:
        A tuple of the decoded secret dict (without the schema name entry)
        and the ZenML schema name.

    Raises:
        KeyError: if `secret_dict` has no `ZENML_SCHEMA_NAME` entry.
    """
    # Read the schema name instead of popping it so that the caller's dict
    # is left untouched.
    zenml_schema_name = secret_dict[ZENML_SCHEMA_NAME]

    decoded_secret = {
        k: decode_string(v)
        for k, v in secret_dict.items()
        if k != ZENML_SCHEMA_NAME
    }
    return decoded_secret, zenml_schema_name

decode_string(string)

Base64 decode a string.

Parameters:

Name Type Description Default
string str

String to decode

required

Returns:

Type Description
str

Decoded string

Source code in zenml/utils/secrets_manager_utils.py
def decode_string(string: str) -> str:
    """Base64 decode a string.

    Args:
        string: String to decode

    Returns:
        The base64 decoded bytes interpreted as UTF-8 text.
    """
    return base64.b64decode(string).decode("utf-8")

encode_secret(secret)

Base64 encode all values within a secret.

Parameters:

Name Type Description Default
secret BaseSecretSchema

Secret containing key-value pairs

required

Returns:

Type Description
Dict[str, str]

Encoded secret Dict containing key-value pairs

Source code in zenml/utils/secrets_manager_utils.py
def encode_secret(secret: BaseSecretSchema) -> Dict[str, str]:
    """Base64 encode all values within a secret.

    Entries whose value is `None` are skipped. The secret's `TYPE` is added
    unencoded under the `ZENML_SCHEMA_NAME` key.

    Args:
        secret: Secret containing key-value pairs

    Returns:
        Encoded secret Dict containing key-value pairs
    """
    encoded_secret: Dict[str, str] = {}
    for key, value in secret.content.items():
        if value is None:
            continue
        encoded_secret[key] = encode_string(str(value))

    encoded_secret[ZENML_SCHEMA_NAME] = secret.TYPE
    return encoded_secret

encode_string(string)

Base64 encode a string.

Parameters:

Name Type Description Default
string str

String to encode

required

Returns:

Type Description
str

Encoded string

Source code in zenml/utils/secrets_manager_utils.py
def encode_string(string: str) -> str:
    """Base64 encode a string.

    Args:
        string: String to encode

    Returns:
        The base64 encoding of the UTF-8 bytes of `string`, as text.
    """
    utf8_bytes = string.encode("utf-8")
    return base64.b64encode(utf8_bytes).decode("utf-8")

singleton

SingletonMetaClass (type)

Singleton metaclass.

Use this metaclass to make any class into a singleton class:

class OneRing(metaclass=SingletonMetaClass):
    def __init__(self, owner):
        self._owner = owner

    @property
    def owner(self):
        return self._owner

the_one_ring = OneRing('Sauron')
the_lost_ring = OneRing('Frodo')
print(the_lost_ring.owner)  # Sauron
OneRing._clear() # ring destroyed
Source code in zenml/utils/singleton.py
class SingletonMetaClass(type):
    """Singleton metaclass.

    Use this metaclass to make any class into a singleton class:

    ```python
    class OneRing(metaclass=SingletonMetaClass):
        def __init__(self, owner):
            self._owner = owner

        @property
        def owner(self):
            return self._owner

    the_one_ring = OneRing('Sauron')
    the_lost_ring = OneRing('Frodo')
    print(the_lost_ring.owner)  # Sauron
    OneRing._clear() # ring destroyed
    ```
    """

    def __init__(cls, *args: Any, **kwargs: Any) -> None:
        """Initialize a singleton class.

        Args:
            *args: positional arguments forwarded to `type.__init__`.
            **kwargs: keyword arguments forwarded to `type.__init__`.
        """
        super().__init__(*args, **kwargs)
        # No instance exists until the class is called for the first time.
        cls.__singleton_instance: Optional["SingletonMetaClass"] = None

    def __call__(cls, *args: Any, **kwargs: Any) -> "SingletonMetaClass":
        """Create or return the singleton instance.

        The arguments are only used the first time the class is called
        (or the first time after `_clear`); later calls ignore them.

        Args:
            *args: positional arguments used to create the instance.
            **kwargs: keyword arguments used to create the instance.

        Returns:
            The singleton instance.
        """
        # Compare against `None` explicitly: a truthiness check would create
        # a new instance on every call for classes whose instances are falsy
        # (e.g. classes defining `__bool__` or `__len__`).
        if cls.__singleton_instance is None:
            cls.__singleton_instance = cast(
                "SingletonMetaClass", super().__call__(*args, **kwargs)
            )

        return cls.__singleton_instance

    def _clear(cls) -> None:
        """Clear the singleton instance."""
        cls.__singleton_instance = None
__call__(cls, *args, **kwargs) special

Create or return the singleton instance.

Source code in zenml/utils/singleton.py
def __call__(cls, *args: Any, **kwargs: Any) -> "SingletonMetaClass":
    """Create or return the singleton instance.

    The arguments are only used when no instance is stored yet; otherwise
    the stored instance is returned and the arguments are ignored.
    """
    # Compare against `None` explicitly: a truthiness check would create a
    # new instance on every call for classes whose instances are falsy
    # (e.g. classes defining `__bool__` or `__len__`).
    if cls.__singleton_instance is None:
        cls.__singleton_instance = cast(
            "SingletonMetaClass", super().__call__(*args, **kwargs)
        )

    return cls.__singleton_instance
__init__(cls, *args, **kwargs) special

Initialize a singleton class.

Source code in zenml/utils/singleton.py
def __init__(cls, *args: Any, **kwargs: Any) -> None:
    """Set up a newly created singleton class without a stored instance."""
    super().__init__(*args, **kwargs)
    # No instance exists until the class is called for the first time.
    cls.__singleton_instance: Optional["SingletonMetaClass"] = None

source_utils

These utils are predicated on the following definitions:

  • class_source: This is a python-import type path to a class, e.g. some.mod.class
  • module_source: This is a python-import type path to a module, e.g. some.mod
  • file_path, relative_path, absolute_path: These are file system paths.
  • source: This is a class_source or module_source. If it is a class_source, it can also be optionally pinned.
  • pin: Whatever comes after the @ symbol from a source, usually the git sha or the version of zenml as a string.

create_zenml_pin()

Creates a ZenML pin for source pinning from release version.

Source code in zenml/utils/source_utils.py
def create_zenml_pin() -> str:
    """Creates a ZenML pin for source pinning from release version.

    Returns:
        The pin, built as `<APP_NAME>_<__version__>`.
    """
    return "{}_{}".format(APP_NAME, __version__)

get_absolute_path_from_module_source(module)

Get a directory path from module source.

E.g. zenml.core.step will return full/path/to/zenml/core/step.

Parameters:

Name Type Description Default
module str

A module e.g. zenml.core.step.

required
Source code in zenml/utils/source_utils.py
def get_absolute_path_from_module_source(module: str) -> str:
    """Get a directory path from module source.

    E.g. `zenml.core.step` will return `full/path/to/zenml/core/step`.

    The module is imported and the first entry of its `__path__` attribute
    is returned.

    Args:
        module: A module e.g. `zenml.core.step`.

    Returns:
        The first entry of the imported module's `__path__`.
    """
    imported_module = importlib.import_module(module)
    search_locations = imported_module.__path__
    return search_locations[0]

get_class_source_from_source(source)

Gets the class source from a source, e.g. for module.path@version it returns module.path.

Parameters:

Name Type Description Default
source str

source pointing to potentially pinned sha.

required
Source code in zenml/utils/source_utils.py
def get_class_source_from_source(source: str) -> str:
    """Gets the class source from a (potentially pinned) source.

    E.g. for `module.path@version` this returns `module.path`.

    Args:
        source: source pointing to potentially pinned sha.

    Returns:
        Everything before the first `@` in `source`, or the whole `source`
        if it is not pinned.
    """
    class_source, _separator, _pin = source.partition("@")
    return class_source

get_hashed_source(value)

Returns a hash of the object's source code.

Source code in zenml/utils/source_utils.py
def get_hashed_source(value: Any) -> str:
    """Returns a hash of the object's source code.

    Args:
        value: object whose source code (as returned by `get_source`) is
            hashed.

    Returns:
        The SHA-256 hex digest of the UTF-8 encoded source code.

    Raises:
        TypeError: if `get_source` raises a `TypeError` for the object.
    """
    try:
        source_code = get_source(value)
    except TypeError:
        raise TypeError(
            f"Unable to compute the hash of source code of object: {value}."
        )

    digest = hashlib.sha256()
    digest.update(source_code.encode("utf-8"))
    return digest.hexdigest()

get_module_source_from_class(class_)

Takes class input and returns module_source. If class is already string then returns the same.

Parameters:

Name Type Description Default
class_ Union[Type[Any], str]

object of type class.

required
Source code in zenml/utils/source_utils.py
def get_module_source_from_class(
    class_: Union[Type[Any], str]
) -> Optional[str]:
    """Takes class input and returns module_source. If class is already a
    string then returns the same.

    Args:
        class_: object of type class.

    Returns:
        `class_` itself if it is a string, otherwise the class' module and
        name joined by a dot.

    Raises:
        AssertionError: if `class_` is neither a string nor a class.
    """
    # Strings are assumed to be a source already and are passed through.
    if isinstance(class_, str):
        return class_

    if not inspect.isclass(class_):
        raise AssertionError("step_type is neither string nor class.")

    # Infer the source from the class provided.
    return ".".join((class_.__module__, class_.__name__))

get_module_source_from_module(module)

Gets the source of the supplied module.

E.g.:

  • a /home/myrepo/src/run.py module running as the main module returns run if no repository root is specified.

  • a /home/myrepo/src/run.py module running as the main module returns src.run if the repository root is configured in /home/myrepo

  • a /home/myrepo/src/pipeline.py module not running as the main module returns src.pipeline if the repository root is configured in /home/myrepo

  • a /home/myrepo/src/pipeline.py module not running as the main module returns pipeline if no repository root is specified and the main module is also in /home/myrepo/src.

  • a /home/step.py module not running as the main module returns step if the CWD is /home and the repository root or the main module are in a different path (e.g. /home/myrepo/src).

Parameters:

Name Type Description Default
module module

the module to get the source of.

required

Returns:

Type Description
str

The source of the supplied module.

Exceptions:

Type Description
RuntimeError

if the module is not loaded from a file

Source code in zenml/utils/source_utils.py
def get_module_source_from_module(module: ModuleType) -> str:
    """Gets the source of the supplied module.

    E.g.:

      * a `/home/myrepo/src/run.py` module running as the main module returns
      `run` if no repository root is specified.

      * a `/home/myrepo/src/run.py` module running as the main module returns
      `src.run` if the repository root is configured in `/home/myrepo`

      * a `/home/myrepo/src/pipeline.py` module not running as the main module
      returns `src.pipeline` if the repository root is configured in
      `/home/myrepo`

      * a `/home/myrepo/src/pipeline.py` module not running as the main module
      returns `pipeline` if no repository root is specified and the main
      module is also in `/home/myrepo/src`.

      * a `/home/step.py` module not running as the main module
      returns `step` if the CWD is /home and the repository root or the main
      module are in a different path (e.g. `/home/myrepo/src`).

    Args:
        module: the module to get the source of.

    Returns:
        The source of the supplied module.

    Raises:
        RuntimeError: if the module is the main module and was not loaded
            from a file
    """
    if not hasattr(module, "__file__") or not module.__file__:
        if module.__name__ == "__main__":
            raise RuntimeError(
                f"{module} module was not loaded from a file. Cannot "
                "determine the module root path."
            )
        return module.__name__
    module_path = os.path.abspath(module.__file__)

    root_path = get_source_root_path()
    # Compare against the root *directory* (with trailing separator) so that
    # a sibling like `/home/myrepo2` is not mistaken for being inside
    # `/home/myrepo`.
    root_prefix = root_path.rstrip(os.sep) + os.sep

    if not module_path.startswith(root_prefix):
        root_path = os.getcwd()
        root_prefix = root_path.rstrip(os.sep) + os.sep
        logger.warning(
            "User module %s is not in the source root. Using current "
            "directory %s instead to resolve module source.",
            module,
            root_path,
        )

    # Remove the root from the front of module_path only (a plain
    # `str.replace` would also drop later occurrences of the same text).
    if module_path.startswith(root_prefix):
        module_path = module_path[len(root_prefix) :]
    else:
        module_path = module_path.lstrip(os.sep)

    # Kick out a trailing `.py` only; `.py` inside a directory or file name
    # must be kept.
    if module_path.endswith(".py"):
        module_path = module_path[: -len(".py")]

    # Replace path separators with `.` to get the module source. `os.sep`
    # is `/` on POSIX; on Windows both `\` and `/` can show up.
    module_source = module_path.replace(os.sep, ".")
    if os.altsep:
        module_source = module_source.replace(os.altsep, ".")

    logger.debug(
        f"Resolved module source for module {module} to: {module_source}"
    )

    return module_source

get_module_source_from_source(source)

Gets module source from source. E.g. some.module.file.class@version, returns some.module.

Parameters:

Name Type Description Default
source str

source pointing to potentially pinned sha.

required
Source code in zenml/utils/source_utils.py
def get_module_source_from_source(source: str) -> str:
    """Gets module source from source. E.g. `some.module.file.class@version`,
    returns `some.module`.

    Args:
        source: source pointing to potentially pinned sha.

    Returns:
        The class source (pin removed) without its last two dot-separated
        components.
    """
    components = get_class_source_from_source(source).split(".")
    return ".".join(components[:-2])

get_relative_path_from_module_source(module_source)

Get a directory path from module, relative to root of the package tree.

E.g. zenml.core.step will return zenml/core/step.

Parameters:

Name Type Description Default
module_source str

A module e.g. zenml.core.step

required
Source code in zenml/utils/source_utils.py
def get_relative_path_from_module_source(module_source: str) -> str:
    """Get a directory path from module, relative to root of the package tree.

    E.g. zenml.core.step will return zenml/core/step.

    Args:
        module_source: A module e.g. zenml.core.step

    Returns:
        `module_source` with every `.` replaced by `/`.
    """
    return "/".join(module_source.split("."))

get_source(value)

Returns the source code of an object. If executing within an IPython kernel environment, then this monkey-patches the inspect module temporarily with a workaround to get the source from the cell.

Exceptions:

Type Description
TypeError

If source not found.

Source code in zenml/utils/source_utils.py
def get_source(value: Any) -> str:
    """Returns the source code of an object. If executing within an IPython
    kernel environment, then this monkey-patches `inspect` module temporarily
    with a workaround to get source from the cell.

    When `Environment.in_notebook()` is true, `inspect.getfile` is replaced
    by a local implementation for the duration of the `inspect.getsource`
    call and restored afterwards, even if that call raises.

    Args:
        value: the object to get the source code of.

    Returns:
        The source code as returned by `inspect.getsource`.

    Raises:
        TypeError: If source not found.
    """
    if Environment.in_notebook():
        # Monkey patch inspect.getfile temporarily to make getsource work.
        # Source: https://stackoverflow.com/questions/51566497/
        #
        # `_old_getfile` is a default argument, so it is bound to the
        # unpatched `inspect.getfile` when this function is defined, i.e.
        # before the patch below is applied.
        def _new_getfile(
            object: Any,
            _old_getfile: Callable[
                [
                    Union[
                        ModuleType,
                        Type[Any],
                        MethodType,
                        FunctionType,
                        TracebackType,
                        FrameType,
                        CodeType,
                        Callable[..., Any],
                    ]
                ],
                str,
            ] = inspect.getfile,
        ) -> Any:
            # Only classes need the workaround; everything else goes to the
            # original implementation.
            if not inspect.isclass(object):
                return _old_getfile(object)

            # Lookup by parent module (as in current inspect)
            if hasattr(object, "__module__"):
                object_ = sys.modules.get(object.__module__)
                if hasattr(object_, "__file__"):
                    return object_.__file__  # type: ignore[union-attr]

            # If parent module is __main__, lookup by methods
            # A function member whose qualified name is
            # `<class qualname>.<function name>` is used to locate the file.
            for name, member in inspect.getmembers(object):
                if (
                    inspect.isfunction(member)
                    and object.__qualname__ + "." + member.__name__
                    == member.__qualname__
                ):
                    return inspect.getfile(member)
            else:
                # `for ... else`: reached only when the loop finished
                # without returning, i.e. no matching function was found.
                raise TypeError(f"Source for {object!r} not found.")

        # Monkey patch, compute source, then revert monkey patch.
        _old_getfile = inspect.getfile
        inspect.getfile = _new_getfile
        try:
            src = inspect.getsource(value)
        finally:
            # Always restore the original, even if getsource raised.
            inspect.getfile = _old_getfile
    else:
        # Use standard inspect if running outside a notebook
        src = inspect.getsource(value)
    return src

get_source_root_path()

Get the repository root path or the source root path of the current process.

E.g.:

  • if the process was started by running a run.py file under full/path/to/my/run.py, and the repository root is configured at full/path, the source root path is full/path.

  • same case as above, but when there is no repository root configured, the source root path is full/path/to/my.

Returns:

Type Description
str

The source root path of the current process.

Source code in zenml/utils/source_utils.py
def get_source_root_path() -> str:
    """Get the repository root path or the source root path of the current
    process.

    E.g.:

      * if the process was started by running a `run.py` file under
      `full/path/to/my/run.py`, and the repository root is configured at
      `full/path`, the source root path is `full/path`.

      * same case as above, but when there is no repository root configured,
      the source root path is `full/path/to/my`.

    Returns:
        The source root path of the current process.

    Raises:
        RuntimeError: if no repository root is found and the main module is
            missing or was not started from a file.
    """
    from zenml.repository import Repository

    # A repository root, when found, always takes precedence.
    repo_root = Repository.find_repository()
    if repo_root:
        logger.debug("Using repository root as source root: %s", repo_root)
        return str(repo_root.resolve())

    # Otherwise fall back to the directory of the main module's file.
    main_module = sys.modules.get("__main__")
    if main_module is None:
        raise RuntimeError(
            "Could not determine the main module used to run the current "
            "process."
        )

    main_file = getattr(main_module, "__file__", None)
    if not main_file:
        raise RuntimeError(
            "Main module was not started from a file. Cannot "
            "determine the module root path."
        )

    source_root = pathlib.Path(main_file).resolve().parent
    logger.debug("Using main module location as source root: %s", source_root)
    return str(source_root)

import_class_by_path(class_path)

Imports a class based on a given path

Parameters:

Name Type Description Default
class_path str

str, class_source e.g. this.module.Class

required

Returns: the given class

Source code in zenml/utils/source_utils.py
def import_class_by_path(class_path: str) -> Type[Any]:
    """Imports a class based on a given path

    Args:
        class_path: str, class_source e.g. this.module.Class

    Returns: the given class
    """
    # Everything before the last dot is the module, the rest the class name.
    module_name, _separator, class_name = class_path.rpartition(".")
    module = importlib.import_module(module_name)
    return getattr(module, class_name)  # type: ignore[no-any-return]

import_python_file(file_path)

Imports a python file.

Parameters:

Name Type Description Default
file_path str

Path to python file that should be imported.

required

Returns:

Type Description
module

The imported module.

Source code in zenml/utils/source_utils.py
def import_python_file(file_path: str) -> types.ModuleType:
    """Imports a python file.

    The directory of the file is appended to `sys.path` (once) and the file
    is then imported by its base name without extension.

    Args:
        file_path: Path to python file that should be imported.

    Returns:
        The imported module.
    """
    file_path = os.path.abspath(file_path)
    module_dir = os.path.dirname(file_path)

    # Add the directory of the python file to `sys.path` so we can import
    # it. Skip directories that are already listed so that repeated calls do
    # not keep growing `sys.path` with duplicates.
    if module_dir not in sys.path:
        sys.path.append(module_dir)

    module_name = os.path.splitext(os.path.basename(file_path))[0]
    return importlib.import_module(module_name)

is_inside_repository(file_path)

Returns whether a file is inside a zenml repository.

Source code in zenml/utils/source_utils.py
def is_inside_repository(file_path: str) -> bool:
    """Returns whether a file is inside a zenml repository.

    Args:
        file_path: path of the file to check.

    Returns:
        `True` if the resolved repository path found by
        `Repository.find_repository` is a parent of the resolved file path,
        `False` otherwise (including when no repository is found).
    """
    from zenml.repository import Repository

    repo_path = Repository.find_repository()
    if not repo_path:
        return False

    resolved_repo = repo_path.resolve()
    return resolved_repo in pathlib.Path(file_path).resolve().parents

is_standard_pin(pin)

Returns True if pin is valid ZenML pin, else False.

Parameters:

Name Type Description Default
pin str

potential ZenML pin like 'zenml_0.1.1'

required
Source code in zenml/utils/source_utils.py
def is_standard_pin(pin: str) -> bool:
    """Returns `True` if pin is valid ZenML pin, else False.

    A pin is considered valid when it starts with `<APP_NAME>_`.

    Args:
        pin: potential ZenML pin like 'zenml_0.1.1'
    """
    return pin.startswith(f"{APP_NAME}_")

is_standard_source(source)

Returns True if source is a standard ZenML source.

Parameters:

Name Type Description Default
source str

class_source e.g. this.module.Class[@pin].

required
Source code in zenml/utils/source_utils.py
def is_standard_source(source: str) -> bool:
    """Returns `True` if source is a standard ZenML source.

    A source is standard when its first dot-separated component is `zenml`.

    Args:
        source: class_source e.g. this.module.Class[@pin].
    """
    top_level_package = source.partition(".")[0]
    return top_level_package == "zenml"

is_third_party_module(file_path)

Returns whether a file belongs to a third party package.

Source code in zenml/utils/source_utils.py
def is_third_party_module(file_path: str) -> bool:
    """Returns whether a file belongs to a third party package.

    Args:
        file_path: path of the file to check.

    Returns:
        `True` if one of the site-packages directories (global or user) is
        a parent of the resolved file path, `False` otherwise.
    """
    file_parents = pathlib.Path(file_path).resolve().parents
    site_dirs = site.getsitepackages() + [site.getusersitepackages()]
    return any(
        pathlib.Path(site_dir).resolve() in file_parents
        for site_dir in site_dirs
    )

load_source_path_class(source, import_path=None)

Loads a Python class from the source.

Parameters:

Name Type Description Default
source str

class_source e.g. this.module.Class[@sha]

required
import_path Optional[str]

optional path to add to python path

None
Source code in zenml/utils/source_utils.py
def load_source_path_class(
    source: str, import_path: Optional[str] = None
) -> Type[Any]:
    """Loads a Python class from the source.

    If no import path is given and a repository root is found, the
    repository root is used as import path. Any pin (`@...`) is removed
    from the source before importing.

    Args:
        source: class_source e.g. this.module.Class[@sha]
        import_path: optional path to add to python path

    Returns:
        The class imported by `import_class_by_path`.
    """
    from zenml.repository import Repository

    repo_root = Repository.find_repository()
    if not import_path and repo_root:
        import_path = str(repo_root)

    # Drop the pin, if there is one.
    source = source.split("@")[0]

    if import_path is None:
        return import_class_by_path(source)

    with prepend_python_path(import_path):
        logger.debug(
            f"Loading class {source} with import path {import_path}"
        )
        return import_class_by_path(source)

prepend_python_path(path)

Simple context manager to help import module within the repo

Source code in zenml/utils/source_utils.py
@contextmanager
def prepend_python_path(path: str) -> Iterator[None]:
    """Simple context manager to help import module within the repo

    Inserts `path` at the front of `sys.path` while the `with` block runs
    and removes it again when the block is left.

    Args:
        path: the path to temporarily put first in `sys.path`.
    """
    try:
        # On entry: give `path` the highest import priority.
        sys.path.insert(0, path)
        yield
    finally:
        # On exit (normal or via exception): drop the first occurrence.
        sys.path.remove(path)

resolve_class(class_)

Resolves a class into a serializable source string.

For classes that are not built-in nor imported from a Python package, the get_source_root_path function is used to determine the root path relative to which the class source is resolved.

Parameters:

Name Type Description Default
class_ Type[Any]

A Python Class reference.

required

Returns: source_path e.g. this.module.Class.

Source code in zenml/utils/source_utils.py
def resolve_class(class_: Type[Any]) -> str:
    """Resolves a class into a serializable source string.

    For classes that are not built-in nor imported from a Python package, the
    `get_source_root_path` function is used to determine the root path
    relative to which the class source is resolved.

    Args:
        class_: A Python Class reference.

    Returns: source_path e.g. this.module.Class.
    """
    class_name = class_.__name__
    initial_source = class_.__module__ + "." + class_name

    # Standard ZenML classes get pinned.
    if is_standard_source(initial_source):
        return resolve_standard_source(initial_source)

    try:
        file_path = inspect.getfile(class_)
    except TypeError:
        # builtin file
        return initial_source

    defined_in_main = initial_source.startswith("__main__")
    if defined_in_main or is_third_party_module(file_path):
        return initial_source

    # Regular user file -> get the full module path relative to the
    # source root.
    user_module = sys.modules[class_.__module__]
    module_source = get_module_source_from_module(user_module)

    # ENG-123: Windows path separators (`\\`) are not sanitized here.

    logger.debug(f"Resolved class {class_} to {module_source}")

    return module_source + "." + class_name

resolve_standard_source(source)

Pins a standard ZenML source by appending a ZenML pin created from the release version.

Parameters:

Name Type Description Default
source str

class_source e.g. this.module.Class.

required
Source code in zenml/utils/source_utils.py
def resolve_standard_source(source: str) -> str:
    """Pins a standard source with the pin created by `create_zenml_pin`.

    Args:
        source: class_source e.g. this.module.Class.

    Returns:
        The pinned source, i.e. `<source>@<pin>`.

    Raises:
        AssertionError: if the source already contains an `@`.
    """
    if "@" in source:
        raise AssertionError(f"source {source} is already pinned.")
    return "@".join((source, create_zenml_pin()))

string_utils

get_human_readable_filesize(bytes_)

Convert a file size in bytes into a human-readable string.

Source code in zenml/utils/string_utils.py
def get_human_readable_filesize(bytes_: int) -> str:
    """Convert a file size in bytes into a human-readable string.

    The absolute value of the size is used. Sizes are expressed in the
    largest of `B`, `KiB`, `MiB` and `GiB` that keeps the number below 1024
    (with `GiB` as the upper limit) and formatted with two decimals.

    Args:
        bytes_: the file size in bytes.

    Returns:
        The formatted size, e.g. `1.50 KiB`.
    """
    units = ("B", "KiB", "MiB", "GiB")
    last_index = len(units) - 1

    size = abs(float(bytes_))
    index = 0
    # Move on to the next unit until the number is small enough or the
    # largest unit has been reached.
    while not size < 1024.0 and index < last_index:
        size /= 1024.0
        index += 1

    return f"{size:.2f} {units[index]}"

get_human_readable_time(seconds)

Convert seconds into a human-readable string.

Source code in zenml/utils/string_utils.py
def get_human_readable_time(seconds: float) -> str:
    """Convert seconds into a human-readable string.

    Durations of a minute or more are shown in whole days/hours/minutes/
    seconds (e.g. `1h1m1s`); shorter durations are shown in seconds with
    three decimals (e.g. `0.500s`). Negative durations get a `-` prefix.

    Args:
        seconds: the duration in seconds.

    Returns:
        The formatted duration.
    """
    sign = "-" if seconds < 0 else ""
    magnitude = abs(seconds)

    # Break the whole seconds down from the smallest unit upwards.
    total_minutes, secs = divmod(int(magnitude), 60)
    total_hours, minutes = divmod(total_minutes, 60)
    days, hours = divmod(total_hours, 24)

    if days > 0:
        formatted = f"{days}d{hours}h{minutes}m{secs}s"
    elif hours > 0:
        formatted = f"{hours}h{minutes}m{secs}s"
    elif minutes > 0:
        formatted = f"{minutes}m{secs}s"
    else:
        formatted = f"{magnitude:.3f}s"

    return sign + formatted

typed_model

BaseTypedModel (BaseModel) pydantic-model

Typed Pydantic model base class.

Use this class as a base class instead of BaseModel to automatically add a type literal attribute to the model that stores the name of the class.

This can be useful when serializing models to JSON and then de-serializing them as part of a submodel union field, e.g.:


class BluePill(BaseTypedModel):
    ...

class RedPill(BaseTypedModel):
    ...

class TheMatrix(BaseTypedModel):
    choice: Union[BluePill, RedPill] = Field(..., discriminator='type')

matrix = TheMatrix(choice=RedPill())
d = matrix.dict()
new_matrix = TheMatrix.parse_obj(d)
assert isinstance(new_matrix.choice, RedPill)

It can also facilitate de-serializing objects when their type isn't known:

matrix = TheMatrix(choice=RedPill())
d = matrix.dict()
new_matrix = BaseTypedModel.from_dict(d)
assert isinstance(new_matrix.choice, RedPill)
Source code in zenml/utils/typed_model.py
class BaseTypedModel(BaseModel, metaclass=BaseTypedModelMeta):
    """Typed Pydantic model base class.

    Use this class as a base class instead of BaseModel to automatically
    add a `type` literal attribute to the model that stores the name of the
    class.

    This can be useful when serializing models to JSON and then de-serializing
    them as part of a submodel union field, e.g.:

    ```python

    class BluePill(BaseTypedModel):
        ...

    class RedPill(BaseTypedModel):
        ...

    class TheMatrix(BaseTypedModel):
        choice: Union[BluePill, RedPill] = Field(..., discriminator='type')

    matrix = TheMatrix(choice=RedPill())
    d = matrix.dict()
    new_matrix = TheMatrix.parse_obj(d)
    assert isinstance(new_matrix.choice, RedPill)
    ```

    It can also facilitate de-serializing objects when their type isn't known:

    ```python
    matrix = TheMatrix(choice=RedPill())
    d = matrix.dict()
    new_matrix = BaseTypedModel.from_dict(d)
    assert isinstance(new_matrix.choice, RedPill)
    ```
    """

    @classmethod
    def from_dict(
        cls,
        model_dict: Dict[str, Any],
    ) -> "BaseTypedModel":
        """Instantiate a Pydantic model from a serialized JSON-able dict
        representation.

        The class to instantiate is loaded from the `type` entry of the
        dict with `load_source_path_class`.

        Args:
            model_dict: the model attributes serialized as JSON-able dict.

        Returns:
            A BaseTypedModel created from the serialized representation.

        Raises:
            RuntimeError: if the `type` entry is missing or empty, or if the
                loaded class is not a BaseTypedModel subclass.
        """
        type_source = model_dict.get("type")
        if not type_source:
            raise RuntimeError(
                "`type` information is missing from the serialized model dict."
            )

        model_class = load_source_path_class(type_source)
        if not issubclass(model_class, BaseTypedModel):
            raise RuntimeError(
                f"Class `{model_class}` is not a ZenML BaseTypedModel subclass."
            )

        return model_class.parse_obj(model_dict)

    @classmethod
    def from_json(
        cls,
        json_str: str,
    ) -> "BaseTypedModel":
        """Instantiate a Pydantic model from a serialized JSON representation.

        Args:
            json_str: the model attributes serialized as JSON.

        Returns:
            A BaseTypedModel created from the serialized representation.
        """
        return cls.from_dict(json.loads(json_str))
from_dict(model_dict) classmethod

Instantiate a Pydantic model from a serialized JSON-able dict representation.

Parameters:

Name Type Description Default
model_dict Dict[str, Any]

the model attributes serialized as JSON-able dict.

required

Returns:

Type Description
BaseTypedModel

A BaseTypedModel created from the serialized representation.

Source code in zenml/utils/typed_model.py
@classmethod
def from_dict(
    cls,
    model_dict: Dict[str, Any],
) -> "BaseTypedModel":
    """Rebuild a typed model from its JSON-able dict form.

    The concrete class is not taken from `cls`: it is resolved with
    `load_source_path_class` from the value stored under the `type` key.

    Args:
        model_dict: the model attributes serialized as JSON-able dict.

    Returns:
        A BaseTypedModel created from the serialized representation.

    Raises:
        RuntimeError: if the `type` entry is missing or empty, or if the
            class it resolves to is not a BaseTypedModel subclass.
    """
    type_path = model_dict.get("type")
    if not type_path:
        raise RuntimeError(
            "`type` information is missing from the serialized model dict."
        )

    model_class = load_source_path_class(type_path)
    if issubclass(model_class, BaseTypedModel):
        return model_class.parse_obj(model_dict)

    raise RuntimeError(
        f"Class `{model_class}` is not a ZenML BaseTypedModel subclass."
    )
from_json(json_str) classmethod

Instantiate a Pydantic model from a serialized JSON representation.

Parameters:

Name Type Description Default
json_str str

the model attributes serialized as JSON.

required

Returns:

Type Description
BaseTypedModel

A BaseTypedModel created from the serialized representation.

Source code in zenml/utils/typed_model.py
@classmethod
def from_json(
    cls,
    json_str: str,
) -> "BaseTypedModel":
    """Rebuild a typed model from its JSON string form.

    Args:
        json_str: the model attributes serialized as JSON.

    Returns:
        A BaseTypedModel created from the serialized representation.
    """
    # Decode the string first, then hand the resulting dict to
    # `from_dict`, which takes care of instantiating the model.
    return cls.from_dict(json.loads(json_str))

BaseTypedModelMeta (ModelMetaclass)

Metaclass responsible for adding type information to Pydantic models.

Source code in zenml/utils/typed_model.py
class BaseTypedModelMeta(ModelMetaclass):
    """Pydantic metaclass that stamps every model class with a `type` field."""

    def __new__(
        mcs, name: str, bases: Tuple[Type[Any], ...], dct: Dict[str, Any]
    ) -> "BaseTypedModelMeta":
        """Build the model class and inject its `type` field.

        The injected field is annotated as a `Literal` of the class's
        `<module>.<qualname>` string and uses that same string as default.

        Raises:
            TypeError: if the class namespace already defines `type`.
        """
        if "type" in dct:
            raise TypeError(
                "`type` is a reserved attribute name for BaseTypedModel "
                "subclasses"
            )

        full_name = f"{dct['__module__']}.{dct['__qualname__']}"
        literal_ann = Literal[full_name]  # type: ignore [misc,valid-type]
        default_field = Field(full_name)

        # Register both the annotation and the default so that pydantic
        # picks `type` up as a regular model field.
        annotations = dct.setdefault("__annotations__", dict())
        annotations["type"] = literal_ann
        dct["type"] = default_field

        return cast(
            Type["BaseTypedModel"], super().__new__(mcs, name, bases, dct)
        )
__new__(mcs, name, bases, dct) special staticmethod

Creates a pydantic BaseModel class that includes a hidden attribute that reflects the full class identifier.

Source code in zenml/utils/typed_model.py
def __new__(
    mcs, name: str, bases: Tuple[Type[Any], ...], dct: Dict[str, Any]
) -> "BaseTypedModelMeta":
    """Build the model class and inject its `type` field.

    The injected field is annotated as a `Literal` of the class's
    `<module>.<qualname>` string and uses that same string as default.

    Raises:
        TypeError: if the class namespace already defines `type`.
    """
    if "type" in dct:
        raise TypeError(
            "`type` is a reserved attribute name for BaseTypedModel "
            "subclasses"
        )

    full_name = f"{dct['__module__']}.{dct['__qualname__']}"
    literal_ann = Literal[full_name]  # type: ignore [misc,valid-type]
    default_field = Field(full_name)

    # Register both the annotation and the default so that pydantic
    # picks `type` up as a regular model field.
    annotations = dct.setdefault("__annotations__", dict())
    annotations["type"] = literal_ann
    dct["type"] = default_field

    return cast(
        Type["BaseTypedModel"], super().__new__(mcs, name, bases, dct)
    )

yaml_utils

UUIDEncoder (JSONEncoder)

Source code in zenml/utils/yaml_utils.py
class UUIDEncoder(json.JSONEncoder):
    """JSON encoder that additionally knows how to serialize UUID objects."""

    def default(self, obj: Any) -> Any:
        """Return a JSON-serializable stand-in for `obj`.

        Args:
            obj: the object the standard encoder could not serialize.

        Returns:
            The `hex` string of `obj` if it is a UUID.

        Raises:
            TypeError: raised by the base encoder for any other type.
        """
        if not isinstance(obj, UUID):
            # Not a UUID: defer to the stock encoder, which raises TypeError.
            return json.JSONEncoder.default(self, obj)
        return obj.hex
default(self, obj)

Implement this method in a subclass such that it returns a serializable object for o, or calls the base implementation (to raise a TypeError).

For example, to support arbitrary iterators, you could implement default like this::

def default(self, o):
    try:
        iterable = iter(o)
    except TypeError:
        pass
    else:
        return list(iterable)
    # Let the base class default method raise the TypeError
    return JSONEncoder.default(self, o)
Source code in zenml/utils/yaml_utils.py
def default(self, obj: Any) -> Any:
    """Return a JSON-serializable stand-in for `obj`.

    Args:
        obj: the object the standard encoder could not serialize.

    Returns:
        The `hex` string of `obj` if it is a UUID.

    Raises:
        TypeError: raised by the base encoder for any other type.
    """
    if not isinstance(obj, UUID):
        # Not a UUID: defer to the stock encoder, which raises TypeError.
        return json.JSONEncoder.default(self, obj)
    return obj.hex

append_yaml(file_path, contents)

Append contents to a YAML file at file_path.

Source code in zenml/utils/yaml_utils.py
def append_yaml(file_path: str, contents: Dict[Any, Any]) -> None:
    """Merge `contents` into the YAML mapping stored at `file_path`.

    The existing file is read with `read_yaml`, updated with `contents`
    (so entries in `contents` replace existing keys of the same name) and
    the merged mapping is dumped back to `file_path`.

    Args:
        file_path: Path to the YAML file.
        contents: Entries to merge into the file.

    Raises:
        FileNotFoundError: if the path is not remote and its parent
            directory does not exist.
    """
    # An empty file loads as None, so fall back to an empty mapping.
    merged = read_yaml(file_path) or {}
    merged.update(contents)

    is_local = not utils.is_remote(file_path)
    if is_local:
        parent_dir = str(Path(file_path).parent)
        if not fileio.isdir(parent_dir):
            raise FileNotFoundError(f"Directory {parent_dir} does not exist.")

    utils.write_file_contents_as_string(file_path, yaml.dump(merged))

is_yaml(file_path)

Returns True if file_path is YAML, else False

Parameters:

Name Type Description Default
file_path str

Path to YAML file.

required

Returns:

Type Description
bool

True if is yaml, else False.

Source code in zenml/utils/yaml_utils.py
def is_yaml(file_path: str) -> bool:
    """Returns True if file_path has a YAML file extension, else False.

    Only the `.yaml` and `.yml` extensions (dot included) count, so a name
    that merely ends in the letters "yaml" or "yml" (e.g. `notyaml`) is not
    reported as YAML.

    Args:
        file_path: Path to YAML file.

    Returns:
        True if is yaml, else False.
    """
    # `str.endswith` accepts a tuple of candidate suffixes.
    return file_path.endswith((".yaml", ".yml"))

read_json(file_path)

Reads the JSON file at file_path and returns its contents as a dict.

Parameters:

Name Type Description Default
file_path str

Path to JSON file.

required
Source code in zenml/utils/yaml_utils.py
def read_json(file_path: str) -> Any:
    """Load and parse the JSON document stored at `file_path`.

    Args:
        file_path: Path to JSON file.

    Returns:
        The decoded JSON content.

    Raises:
        FileNotFoundError: if file does not exist.
    """
    if not fileio.exists(file_path):
        raise FileNotFoundError(f"{file_path} does not exist.")

    raw_text = utils.read_file_contents_as_string(file_path)
    return json.loads(raw_text)

read_yaml(file_path)

Reads the YAML file at file_path and returns its contents as a dict.

Parameters:

Name Type Description Default
file_path str

Path to YAML file.

required

Returns:

Type Description
Any

Contents of the file in a dict.

Exceptions:

Type Description
FileNotFoundError

if file does not exist.

Source code in zenml/utils/yaml_utils.py
def read_yaml(file_path: str) -> Any:
    """Load and parse the YAML document stored at `file_path`.

    Args:
        file_path: Path to YAML file.

    Returns:
        Contents of the file in a dict.

    Raises:
        FileNotFoundError: if file does not exist.
    """
    if not fileio.exists(file_path):
        raise FileNotFoundError(f"{file_path} does not exist.")

    raw_text = utils.read_file_contents_as_string(file_path)
    # TODO: [LOW] consider adding a default empty dict to be returned
    #   instead of None
    return yaml.safe_load(raw_text)

write_json(file_path, contents)

Write contents as JSON format to file_path.

Parameters:

Name Type Description Default
file_path str

Path to JSON file.

required
contents Dict[str, Any]

Contents of JSON file as dict.

required

Returns:

Type Description
None

Nothing is returned; the contents are written to file_path.

Exceptions:

Type Description
FileNotFoundError

if directory does not exist.

Source code in zenml/utils/yaml_utils.py
def write_json(file_path: str, contents: Dict[str, Any]) -> None:
    """Write contents as JSON format to file_path.

    Args:
        file_path: Path to JSON file.
        contents: Contents of JSON file as dict.

    Raises:
        FileNotFoundError: if file_path is a local path and its parent
            directory does not exist.
    """
    if not utils.is_remote(file_path):
        # The directory check is only done for paths that are not remote.
        dir_ = str(Path(file_path).parent)
        if not fileio.isdir(dir_):
            raise FileNotFoundError(f"Directory {dir_} does not exist.")
    utils.write_file_contents_as_string(file_path, json.dumps(contents))

write_yaml(file_path, contents)

Write contents as YAML format to file_path.

Parameters:

Name Type Description Default
file_path str

Path to YAML file.

required
contents Dict[Any, Any]

Contents of YAML file as dict.

required

Exceptions:

Type Description
FileNotFoundError

if directory does not exist.

Source code in zenml/utils/yaml_utils.py
def write_yaml(file_path: str, contents: Dict[Any, Any]) -> None:
    """Dump `contents` as YAML and store the result at `file_path`.

    Args:
        file_path: Path to YAML file.
        contents: Contents of YAML file as dict.

    Raises:
        FileNotFoundError: if the path is not remote and its parent
            directory does not exist.
    """
    is_local = not utils.is_remote(file_path)
    if is_local:
        parent_dir = str(Path(file_path).parent)
        if not fileio.isdir(parent_dir):
            raise FileNotFoundError(f"Directory {parent_dir} does not exist.")

    utils.write_file_contents_as_string(file_path, yaml.dump(contents))