Utils

zenml.utils special

Initialization of the utils module.

The utils module contains utility functions handling analytics, reading and writing YAML data as well as other general purpose functions.

analytics_utils

Analytics code for ZenML.

AnalyticsEvent (str, Enum)

Enum of events to track in segment.

Source code in zenml/utils/analytics_utils.py
class AnalyticsEvent(str, Enum):
    """Enum of events to track in segment."""

    # Pipelines
    RUN_PIPELINE = "Pipeline run"
    GET_PIPELINES = "Pipelines fetched"
    GET_PIPELINE = "Pipeline fetched"

    # Repo
    INITIALIZE_REPO = "ZenML initialized"

    # Profile
    INITIALIZED_PROFILE = "Profile initialized"

    # Components
    REGISTERED_STACK_COMPONENT = "Stack component registered"
    UPDATED_STACK_COMPONENT = "Stack component updated"
    COPIED_STACK_COMPONENT = "Stack component copied"

    # Stack
    REGISTERED_STACK = "Stack registered"
    REGISTERED_DEFAULT_STACK = "Default stack registered"
    SET_STACK = "Stack set"
    UPDATED_STACK = "Stack updated"
    COPIED_STACK = "Stack copied"
    IMPORT_STACK = "Stack imported"
    EXPORT_STACK = "Stack exported"

    # Analytics opt in and out
    OPT_IN_ANALYTICS = "Analytics opt-in"
    OPT_OUT_ANALYTICS = "Analytics opt-out"

    # Examples
    RUN_ZENML_GO = "ZenML go"
    RUN_EXAMPLE = "Example run"
    PULL_EXAMPLE = "Example pull"

    # Integrations
    INSTALL_INTEGRATION = "Integration installed"

    # Users
    CREATED_USER = "User created"
    CREATED_DEFAULT_USER = "Default user created"
    DELETED_USER = "User deleted"

    # Teams
    CREATED_TEAM = "Team created"
    DELETED_TEAM = "Team deleted"

    # Projects
    CREATED_PROJECT = "Project created"
    DELETED_PROJECT = "Project deleted"

    # Role
    CREATED_ROLE = "Role created"
    DELETED_ROLE = "Role deleted"

    # Flavor
    CREATED_FLAVOR = "Flavor created"

    # Test event
    EVENT_TEST = "Test event"

get_segment_key()

Get key for authorizing to Segment backend.

Returns:

| Type | Description |
|------|-------------|
| `str` | Segment key as a string. |

Source code in zenml/utils/analytics_utils.py
def get_segment_key() -> str:
    """Get key for authorizing to Segment backend.

    Returns:
        Segment key as a string.
    """
    if IS_DEBUG_ENV:
        return SEGMENT_KEY_DEV
    else:
        return SEGMENT_KEY_PROD

identify_user(user_metadata=None)

Attach metadata to user directly.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `user_metadata` | `Optional[Dict[str, Any]]` | Dict of metadata to attach to the user. | `None` |

Returns:

| Type | Description |
|------|-------------|
| `bool` | True if the event was sent successfully, False if not. |

Source code in zenml/utils/analytics_utils.py
def identify_user(user_metadata: Optional[Dict[str, Any]] = None) -> bool:
    """Attach metadata to user directly.

    Args:
        user_metadata: Dict of metadata to attach to the user.

    Returns:
        True if the event was sent successfully, False if not.
    """
    # TODO [ENG-857]: The identify_user function shares a lot of setup with
    #  track_event() - this duplicated code could be given its own function
    try:
        from zenml.config.global_config import GlobalConfiguration

        gc = GlobalConfiguration()

        # That means user opted out of analytics
        if not gc.analytics_opt_in:
            return False

        import analytics

        if analytics.write_key is None:
            analytics.write_key = get_segment_key()

        assert (
            analytics.write_key is not None
        ), "Analytics key not set but trying to make telemetry call."

        # Set this to 1 to avoid backoff loop
        analytics.max_retries = 1

        logger.debug(
            f"Attempting to attach metadata to: User: {gc.user_id}, "
            f"Metadata: {user_metadata}"
        )

        if user_metadata is None:
            return False

        analytics.identify(str(gc.user_id), traits=user_metadata)
        logger.debug(f"User data sent: User: {gc.user_id},{user_metadata}")
        return True
    except Exception as e:
        # We should never fail main thread
        logger.error(f"User data update failed due to: {e}")
        return False
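
For orientation, a minimal usage sketch (assuming ZenML is installed and analytics are opted in); the metadata key used here is purely illustrative:

```python
from zenml.utils.analytics_utils import identify_user

# Attach a custom trait to the anonymous analytics user; returns False if
# analytics are disabled or no metadata is supplied.
sent = identify_user(user_metadata={"newsletter_opt_in": True})
print(f"Metadata sent: {sent}")
```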

parametrized(dec)

This is a meta-decorator: a decorator for decorators.

Since a decorator is itself a function, the result can be used as a regular decorator that takes arguments.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `dec` | `Callable[..., Callable[..., Any]]` | Decorator to be applied to the function. | *required* |

Returns:

| Type | Description |
|------|-------------|
| `Callable[..., Callable[[Callable[..., Any]], Callable[..., Any]]]` | Decorator that applies the given decorator to the function. |

Source code in zenml/utils/analytics_utils.py
def parametrized(
    dec: Callable[..., Callable[..., Any]]
) -> Callable[..., Callable[[Callable[..., Any]], Callable[..., Any]]]:
    """This is a meta-decorator, that is, a decorator for decorators.

    As a decorator is a function, it actually works as a regular decorator
    with arguments.

    Args:
        dec: Decorator to be applied to the function.

    Returns:
        Decorator that applies the given decorator to the function.
    """

    def layer(
        *args: Any, **kwargs: Any
    ) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
        """Internal layer.

        Args:
            *args: Arguments to be passed to the decorator.
            **kwargs: Keyword arguments to be passed to the decorator.

        Returns:
            Decorator that applies the given decorator to the function.
        """

        def repl(f: Callable[..., Any]) -> Callable[..., Any]:
            """Internal REPL.

            Args:
                f: Function to be decorated.

            Returns:
                Decorated function.
            """
            return dec(f, *args, **kwargs)

        return repl

    return layer
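
A short illustrative sketch of how `parametrized` is used to build a decorator that accepts arguments; the `repeat` and `greet` functions are hypothetical and exist only to show the mechanics:

```python
from zenml.utils.analytics_utils import parametrized


@parametrized
def repeat(func, times):
    """Hypothetical decorator that calls the wrapped function `times` times."""

    def wrapper(*args, **kwargs):
        result = None
        for _ in range(times):
            result = func(*args, **kwargs)
        return result

    return wrapper


@repeat(times=3)  # `repeat(times=3)` is the `layer` call, which returns `repl`
def greet(name: str) -> None:
    print(f"Hello, {name}!")


greet("ZenML")  # prints the greeting three times
```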

track(*args, **kwargs)

Internal layer of the `parametrized` meta-decorator. Because `track` is created by applying `parametrized`, the signature and source rendered here are those of the inner `layer` function.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `*args` | `Any` | Arguments to be passed to the decorator. | `()` |
| `**kwargs` | `Any` | Keyword arguments to be passed to the decorator. | `{}` |

Returns:

| Type | Description |
|------|-------------|
| `Callable[[Callable[..., Any]], Callable[..., Any]]` | Decorator that applies the given decorator to the function. |

Source code in zenml/utils/analytics_utils.py
def layer(
    *args: Any, **kwargs: Any
) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
    """Internal layer.

    Args:
        *args: Arguments to be passed to the decorator.
        **kwargs: Keyword arguments to be passed to the decorator.

    Returns:
        Decorator that applies the given decorator to the function.
    """

    def repl(f: Callable[..., Any]) -> Callable[..., Any]:
        """Internal REPL.

        Args:
            f: Function to be decorated.

        Returns:
            Decorated function.
        """
        return dec(f, *args, **kwargs)

    return repl

track_event(event, metadata=None)

Track segment event if user opted-in.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `event` | `Union[str, zenml.utils.analytics_utils.AnalyticsEvent]` | Name of the event to track in Segment. | *required* |
| `metadata` | `Optional[Dict[str, Any]]` | Dict of metadata to track. | `None` |

Returns:

| Type | Description |
|------|-------------|
| `bool` | True if the event was sent successfully, False if not. |

Source code in zenml/utils/analytics_utils.py
def track_event(
    event: Union[str, AnalyticsEvent],
    metadata: Optional[Dict[str, Any]] = None,
) -> bool:
    """Track segment event if user opted-in.

    Args:
        event: Name of event to track in segment.
        metadata: Dict of metadata to track.

    Returns:
        True if the event was sent successfully, False if not.
    """
    try:
        import analytics

        from zenml.config.global_config import GlobalConfiguration

        if analytics.write_key is None:
            analytics.write_key = get_segment_key()

        assert (
            analytics.write_key is not None
        ), "Analytics key not set but trying to make telemetry call."

        # Set this to 1 to avoid backoff loop
        analytics.max_retries = 1

        gc = GlobalConfiguration()
        if isinstance(event, AnalyticsEvent):
            event = event.value

        logger.debug(
            f"Attempting analytics: User: {gc.user_id}, "
            f"Event: {event},"
            f"Metadata: {metadata}"
        )

        if not gc.analytics_opt_in and event not in {
            AnalyticsEvent.OPT_OUT_ANALYTICS,
            AnalyticsEvent.OPT_IN_ANALYTICS,
        }:
            return False

        if metadata is None:
            metadata = {}

        # add basics
        metadata.update(Environment.get_system_info())
        metadata.update(
            {
                "environment": get_environment(),
                "python_version": Environment.python_version(),
                "version": __version__,
            }
        )

        analytics.track(str(gc.user_id), event, metadata)
        logger.debug(
            f"Analytics sent: User: {gc.user_id}, Event: {event}, Metadata: "
            f"{metadata}"
        )
        return True
    except Exception as e:
        # We should never fail main thread
        logger.debug(f"Analytics failed due to: {e}")
        return False
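
A minimal sketch of sending a custom event through `track_event` (assuming analytics are opted in); the metadata dict is illustrative:

```python
from zenml.utils.analytics_utils import AnalyticsEvent, track_event

# Send the built-in test event with some custom metadata; system info,
# environment and ZenML version are added automatically.
success = track_event(AnalyticsEvent.EVENT_TEST, metadata={"source": "docs-example"})
print(f"Event sent: {success}")
```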

daemon

Utility functions to start/stop daemon processes.

This is only implemented for UNIX systems and therefore doesn't work on Windows. Based on https://www.jejik.com/articles/2007/02/a_simple_unix_linux_daemon_in_python/

check_if_daemon_is_running(pid_file)

Checks whether a daemon process indicated by the PID file is running.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `pid_file` | `str` | Path to file containing the PID of the daemon process to check. | *required* |

Returns:

| Type | Description |
|------|-------------|
| `bool` | True if the daemon process is running, otherwise False. |

Source code in zenml/utils/daemon.py
def check_if_daemon_is_running(pid_file: str) -> bool:
    """Checks whether a daemon process indicated by the PID file is running.

    Args:
        pid_file: Path to file containing the PID of the daemon
            process to check.

    Returns:
        True if the daemon process is running, otherwise False.
    """
    return get_daemon_pid_if_running(pid_file) is not None

daemonize(pid_file, log_file=None, working_directory='/')

Decorator that executes the decorated function as a daemon process.

Use this decorator to easily transform any function into a daemon process.

Examples:

import time
from zenml.utils.daemonizer import daemonize


@daemonize(log_file='/tmp/daemon.log', pid_file='/tmp/daemon.pid')
def sleeping_daemon(period: int) -> None:
    print(f"I'm a daemon! I will sleep for {period} seconds.")
    time.sleep(period)
    print("Done sleeping, flying away.")

sleeping_daemon(period=30)

print("I'm the daemon's parent!.")
time.sleep(10) # just to prove that the daemon is running in parallel

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `pid_file` | `str` | an optional file where the PID of the daemon process will be stored. | *required* |
| `log_file` | `Optional[str]` | file where stdout and stderr are redirected for the daemon process. If not supplied, the daemon will be silenced (i.e. have its stdout/stderr redirected to /dev/null). | `None` |
| `working_directory` | `str` | working directory for the daemon process, defaults to the root directory. | `'/'` |

Returns:

| Type | Description |
|------|-------------|
| `Callable[[~F], ~F]` | Decorated function that, when called, will detach from the current process and continue executing in the background, as a daemon process. |

Source code in zenml/utils/daemon.py
def daemonize(
    pid_file: str,
    log_file: Optional[str] = None,
    working_directory: str = "/",
) -> Callable[[F], F]:
    """Decorator that executes the decorated function as a daemon process.

    Use this decorator to easily transform any function into a daemon
    process.

    Example:

    ```python
    import time
    from zenml.utils.daemonizer import daemonize


    @daemonize(log_file='/tmp/daemon.log', pid_file='/tmp/daemon.pid')
    def sleeping_daemon(period: int) -> None:
        print(f"I'm a daemon! I will sleep for {period} seconds.")
        time.sleep(period)
        print("Done sleeping, flying away.")

    sleeping_daemon(period=30)

    print("I'm the daemon's parent!.")
    time.sleep(10) # just to prove that the daemon is running in parallel
    ```

    Args:
        pid_file: an optional file where the PID of the daemon process will
            be stored.
        log_file: file where stdout and stderr are redirected for the daemon
            process. If not supplied, the daemon will be silenced (i.e. have
            its stdout/stderr redirected to /dev/null).
        working_directory: working directory for the daemon process,
            defaults to the root directory.

    Returns:
        Decorated function that, when called, will detach from the current
        process and continue executing in the background, as a daemon
        process.
    """

    def inner_decorator(_func: F) -> F:
        def daemon(*args: Any, **kwargs: Any) -> None:
            """Standard daemonization of a process.

            Args:
                *args: Arguments to be passed to the decorated function.
                **kwargs: Keyword arguments to be passed to the decorated
                    function.
            """
            # flake8: noqa: C901
            if sys.platform == "win32":
                logger.error(
                    "Daemon functionality is currently not supported on Windows."
                )
            else:
                run_as_daemon(
                    _func,
                    log_file=log_file,
                    pid_file=pid_file,
                    working_directory=working_directory,
                    *args,
                    **kwargs,
                )

        return cast(F, daemon)

    return inner_decorator

get_daemon_pid_if_running(pid_file)

Read and return the PID value from a PID file.

It does this if the daemon process tracked by the PID file is running.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `pid_file` | `str` | Path to file containing the PID of the daemon process to check. | *required* |

Returns:

| Type | Description |
|------|-------------|
| `Optional[int]` | The PID of the daemon process if it is running, otherwise None. |

Source code in zenml/utils/daemon.py
def get_daemon_pid_if_running(pid_file: str) -> Optional[int]:
    """Read and return the PID value from a PID file.

    It does this if the daemon process tracked by the PID file is running.

    Args:
        pid_file: Path to file containing the PID of the daemon
            process to check.

    Returns:
        The PID of the daemon process if it is running, otherwise None.
    """
    try:
        with open(pid_file, "r") as f:
            pid = int(f.read().strip())
    except (IOError, FileNotFoundError):
        return None

    if not pid or not psutil.pid_exists(pid):
        return None

    return pid

run_as_daemon(daemon_function, *args, pid_file, log_file=None, working_directory='/', **kwargs)

Runs a function as a daemon process.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `daemon_function` | `~F` | The function to run as a daemon. | *required* |
| `pid_file` | `str` | Path to file in which to store the PID of the daemon process. | *required* |
| `log_file` | `Optional[str]` | Optional file to which the daemon's stdout/stderr will be redirected. | `None` |
| `working_directory` | `str` | Working directory for the daemon process, defaults to the root directory. | `'/'` |
| `args` | `Any` | Positional arguments to pass to the daemon function. | `()` |
| `kwargs` | `Any` | Keyword arguments to pass to the daemon function. | `{}` |

Exceptions:

| Type | Description |
|------|-------------|
| `FileExistsError` | If the PID file already exists. |

Source code in zenml/utils/daemon.py
def run_as_daemon(
    daemon_function: F,
    *args: Any,
    pid_file: str,
    log_file: Optional[str] = None,
    working_directory: str = "/",
    **kwargs: Any,
) -> None:
    """Runs a function as a daemon process.

    Args:
        daemon_function: The function to run as a daemon.
        pid_file: Path to file in which to store the PID of the daemon
            process.
        log_file: Optional file to which the daemon's stdout/stderr will be
            redirected.
        working_directory: Working directory for the daemon process,
            defaults to the root directory.
        args: Positional arguments to pass to the daemon function.
        kwargs: Keyword arguments to pass to the daemon function.

    Raises:
        FileExistsError: If the PID file already exists.
    """
    # convert to absolute path as we will change working directory later
    if pid_file:
        pid_file = os.path.abspath(pid_file)
    if log_file:
        log_file = os.path.abspath(log_file)

    # create parent directory if necessary
    dir_name = os.path.dirname(pid_file)
    if not os.path.exists(dir_name):
        os.makedirs(dir_name)

    # check if PID file exists
    if pid_file and os.path.exists(pid_file):
        pid = get_daemon_pid_if_running(pid_file)
        if pid:
            raise FileExistsError(
                f"The PID file '{pid_file}' already exists and a daemon "
                f"process with the same PID '{pid}' is already running. "
                f"Please remove the PID file or kill the daemon process "
                f"before starting a new daemon."
            )
        logger.warning(
            f"Removing left over PID file '{pid_file}' from a previous "
            f"daemon process that didn't shut down correctly."
        )
        os.remove(pid_file)

    # first fork
    try:
        pid = os.fork()
        if pid > 0:
            # this is the process that called `run_as_daemon` so we
            # simply return so it can keep running
            return
    except OSError as e:
        logger.error("Unable to fork (error code: %d)", e.errno)
        sys.exit(1)

    # decouple from parent environment
    os.chdir(working_directory)
    os.setsid()
    os.umask(0o22)

    # second fork
    try:
        pid = os.fork()
        if pid > 0:
            # this is the parent of the future daemon process, kill it
            # so the daemon gets adopted by the init process
            sys.exit(0)
    except OSError as e:
        sys.stderr.write(f"Unable to fork (error code: {e.errno})")
        sys.exit(1)

    # redirect standard file descriptors to devnull (or the given logfile)
    devnull = "/dev/null"
    if hasattr(os, "devnull"):
        devnull = os.devnull

    devnull_fd = os.open(devnull, os.O_RDWR)
    log_fd = (
        os.open(log_file, os.O_CREAT | os.O_RDWR | os.O_APPEND)
        if log_file
        else None
    )
    out_fd = log_fd or devnull_fd

    os.dup2(devnull_fd, sys.stdin.fileno())
    os.dup2(out_fd, sys.stdout.fileno())
    os.dup2(out_fd, sys.stderr.fileno())

    if pid_file:
        # write the PID file
        with open(pid_file, "w+") as f:
            f.write(f"{os.getpid()}\n")

    # register actions in case this process exits/gets killed
    def cleanup() -> None:
        """Daemon cleanup."""
        terminate_children()
        if pid_file and os.path.exists(pid_file):
            os.remove(pid_file)

    def sighndl(signum: int, frame: Optional[types.FrameType]) -> None:
        """Daemon signal handler.

        Args:
            signum: Signal number.
            frame: Frame object.
        """
        cleanup()

    signal.signal(signal.SIGTERM, sighndl)
    signal.signal(signal.SIGINT, sighndl)
    atexit.register(cleanup)

    # finally run the actual daemon code
    daemon_function(*args, **kwargs)
    sys.exit(0)

stop_daemon(pid_file)

Stops a daemon process.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `pid_file` | `str` | Path to file containing the PID of the daemon process to kill. | *required* |

Source code in zenml/utils/daemon.py
def stop_daemon(pid_file: str) -> None:
    """Stops a daemon process.

    Args:
        pid_file: Path to file containing the PID of the daemon process to
            kill.
    """
    try:
        with open(pid_file, "r") as f:
            pid = int(f.read().strip())
    except (IOError, FileNotFoundError):
        logger.warning("Daemon PID file '%s' does not exist.", pid_file)
        return

    if psutil.pid_exists(pid):
        process = psutil.Process(pid)
        process.terminate()
    else:
        logger.warning("PID from '%s' does not exist.", pid_file)
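
A brief sketch combining these helpers to check for and stop a daemon tracked by a PID file (UNIX only); the PID file path is illustrative:

```python
from zenml.utils.daemon import check_if_daemon_is_running, stop_daemon

PID_FILE = "/tmp/sleeping_daemon.pid"

if check_if_daemon_is_running(PID_FILE):
    # Terminates the process whose PID is stored in the file.
    stop_daemon(PID_FILE)
    print("Daemon stopped.")
else:
    print("No daemon running.")
```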

terminate_children()

Terminate all processes that are children of the currently running process.

Source code in zenml/utils/daemon.py
def terminate_children() -> None:
    """Terminate all processes that are children of the currently running process."""
    pid = os.getpid()
    try:
        parent = psutil.Process(pid)
    except psutil.Error:
        # could not find parent process id
        return
    children = parent.children(recursive=False)

    for p in children:
        p.terminate()
    _, alive = psutil.wait_procs(
        children, timeout=CHILD_PROCESS_WAIT_TIMEOUT
    )
    for p in alive:
        p.kill()
    _, alive = psutil.wait_procs(
        children, timeout=CHILD_PROCESS_WAIT_TIMEOUT
    )

docker_utils

Utility functions relating to Docker.

build_docker_image(build_context_path, image_name, entrypoint=None, dockerfile_path=None, dockerignore_path=None, requirements=None, environment_vars=None, use_local_requirements=False, base_image=None)

Builds a docker image.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `build_context_path` | `str` | Path to a directory that will be sent to the docker daemon as build context. | *required* |
| `image_name` | `str` | The name to use for the created docker image. | *required* |
| `entrypoint` | `Optional[str]` | Optional entrypoint command that gets executed when running a container of the built image. | `None` |
| `dockerfile_path` | `Optional[str]` | Optional path to a dockerfile. If no value is given, a temporary dockerfile will be created. | `None` |
| `dockerignore_path` | `Optional[str]` | Optional path to a dockerignore file. If no value is given, the .dockerignore in the root of the build context will be used if it exists. Otherwise, all files inside `build_context_path` are included in the build context. | `None` |
| `requirements` | `Optional[AbstractSet[str]]` | Optional list of pip requirements to install. This will only be used if no value is given for `dockerfile_path`. | `None` |
| `environment_vars` | `Optional[Dict[str, str]]` | Optional dict of key value pairs that need to be embedded as environment variables in the image. | `None` |
| `use_local_requirements` | `bool` | If `True` and no values are given for `dockerfile_path` and `requirements`, then the packages installed in the environment of the current python process will be installed in the docker image. | `False` |
| `base_image` | `Optional[str]` | The image to use as base for the docker image. | `None` |

Source code in zenml/utils/docker_utils.py
def build_docker_image(
    build_context_path: str,
    image_name: str,
    entrypoint: Optional[str] = None,
    dockerfile_path: Optional[str] = None,
    dockerignore_path: Optional[str] = None,
    requirements: Optional[AbstractSet[str]] = None,
    environment_vars: Optional[Dict[str, str]] = None,
    use_local_requirements: bool = False,
    base_image: Optional[str] = None,
) -> None:
    """Builds a docker image.

    Args:
        build_context_path: Path to a directory that will be sent to the
            docker daemon as build context.
        image_name: The name to use for the created docker image.
        entrypoint: Optional entrypoint command that gets executed when running
            a container of the built image.
        dockerfile_path: Optional path to a dockerfile. If no value is given,
            a temporary dockerfile will be created.
        dockerignore_path: Optional path to a dockerignore file. If no value is
            given, the .dockerignore in the root of the build context will be
            used if it exists. Otherwise, all files inside `build_context_path`
            are included in the build context.
        requirements: Optional list of pip requirements to install. This
            will only be used if no value is given for `dockerfile_path`.
        environment_vars: Optional dict of key value pairs that need to be
            embedded as environment variables in the image.
        use_local_requirements: If `True` and no values are given for
            `dockerfile_path` and `requirements`, then the packages installed
            in the environment of the current python process will be
            installed in the docker image.
        base_image: The image to use as base for the docker image.
    """
    config_path = os.path.join(build_context_path, CONTAINER_ZENML_CONFIG_DIR)
    try:

        # Save a copy of the current global configuration with the
        # active profile and the active stack configuration into the build
        # context, to have the active profile and active stack accessible from
        # within the container.
        GlobalConfiguration().copy_active_configuration(
            config_path,
            load_config_path=f"/app/{CONTAINER_ZENML_CONFIG_DIR}",
        )

        if not requirements and use_local_requirements:
            local_requirements = get_current_environment_requirements()
            requirements = {
                f"{package}=={version}"
                for package, version in local_requirements.items()
                if package != "zenml"  # exclude ZenML
            }
            logger.info(
                "Using requirements from local environment to build "
                "docker image: %s",
                requirements,
            )

        if dockerfile_path:
            dockerfile_contents = read_file_contents_as_string(dockerfile_path)
        else:
            dockerfile_contents = generate_dockerfile_contents(
                base_image=base_image or DEFAULT_BASE_IMAGE,
                entrypoint=entrypoint,
                requirements=requirements,
                environment_vars=environment_vars,
            )

        build_context = create_custom_build_context(
            build_context_path=build_context_path,
            dockerfile_contents=dockerfile_contents,
            dockerignore_path=dockerignore_path,
        )
        # If a custom base image is provided, make sure to always pull the
        # latest version of that image (if it isn't a locally built image).
        # If no base image is provided, we use the static default ZenML image so
        # there is no need to constantly pull
        pull_base_image = False
        if base_image:
            pull_base_image = not is_local_image(base_image)

        logger.info(
            "Building docker image '%s', this might take a while...",
            image_name,
        )

        docker_client = DockerClient.from_env()
        # We use the client api directly here, so we can stream the logs
        output_stream = docker_client.images.client.api.build(
            fileobj=build_context,
            custom_context=True,
            tag=image_name,
            pull=pull_base_image,
            rm=False,  # don't remove intermediate containers
        )
        _process_stream(output_stream)
    finally:
        # Clean up the temporary build files
        fileio.rmtree(config_path)

    logger.info("Finished building docker image.")
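
A hedged usage sketch of building and pushing an image with these helpers (assuming a running Docker daemon and an authenticated registry); the paths, image name and requirements are illustrative:

```python
from zenml.utils.docker_utils import build_docker_image, push_docker_image

IMAGE_NAME = "registry.example.com/zenml-pipeline:latest"

# Build an image from the current directory with an extra pip requirement
# and a custom environment variable baked in, then push it.
build_docker_image(
    build_context_path=".",
    image_name=IMAGE_NAME,
    requirements={"scikit-learn==1.0.2"},
    environment_vars={"MY_SETTING": "value"},
)
push_docker_image(IMAGE_NAME)
```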

create_custom_build_context(build_context_path, dockerfile_contents, dockerignore_path=None)

Creates a docker build context.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `build_context_path` | `str` | Path to a directory that will be sent to the docker daemon as build context. | *required* |
| `dockerfile_contents` | `str` | File contents of the Dockerfile to use for the build. | *required* |
| `dockerignore_path` | `Optional[str]` | Optional path to a dockerignore file. If no value is given, the .dockerignore in the root of the build context will be used if it exists. Otherwise, all files inside `build_context_path` are included in the build context. | `None` |

Returns:

| Type | Description |
|------|-------------|
| `Any` | Docker build context that can be passed when building a docker image. |

Source code in zenml/utils/docker_utils.py
def create_custom_build_context(
    build_context_path: str,
    dockerfile_contents: str,
    dockerignore_path: Optional[str] = None,
) -> Any:
    """Creates a docker build context.

    Args:
        build_context_path: Path to a directory that will be sent to the
            docker daemon as build context.
        dockerfile_contents: File contents of the Dockerfile to use for the
            build.
        dockerignore_path: Optional path to a dockerignore file. If no value is
            given, the .dockerignore in the root of the build context will be
            used if it exists. Otherwise, all files inside `build_context_path`
            are included in the build context.

    Returns:
        Docker build context that can be passed when building a docker image.
    """
    exclude_patterns = []
    default_dockerignore_path = os.path.join(
        build_context_path, ".dockerignore"
    )
    if dockerignore_path:
        exclude_patterns = _parse_dockerignore(dockerignore_path)
    elif fileio.exists(default_dockerignore_path):
        logger.info(
            "Using dockerignore found at path '%s' to create docker "
            "build context.",
            default_dockerignore_path,
        )
        exclude_patterns = _parse_dockerignore(default_dockerignore_path)
    else:
        logger.info(
            "No explicit dockerignore specified and no file called "
            ".dockerignore exists at the build context root (%s). "
            "Creating docker build context with all files inside the build "
            "context root directory.",
            build_context_path,
        )

    logger.debug(
        "Exclude patterns for creating docker build context: %s",
        exclude_patterns,
    )
    no_ignores_found = not exclude_patterns

    files = docker_build_utils.exclude_paths(
        build_context_path, patterns=exclude_patterns
    )
    extra_files = [("Dockerfile", dockerfile_contents)]
    context = docker_build_utils.create_archive(
        root=build_context_path,
        files=sorted(files),
        gzip=False,
        extra_files=extra_files,
    )

    build_context_size = os.path.getsize(context.name)
    if build_context_size > 50 * 1024 * 1024 and no_ignores_found:
        # The build context exceeds 50MiB and we didn't find any excludes
        # in dockerignore files -> remind to specify a .dockerignore file
        logger.warning(
            "Build context size for docker image: %s. If you believe this is "
            "unreasonably large, make sure to include a .dockerignore file at "
            "the root of your build context (%s) or specify a custom file "
            "when defining your pipeline.",
            string_utils.get_human_readable_filesize(build_context_size),
            default_dockerignore_path,
        )

    return context

generate_dockerfile_contents(base_image, entrypoint=None, requirements=None, environment_vars=None)

Generates a Dockerfile.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `base_image` | `str` | The image to use as base for the dockerfile. | *required* |
| `entrypoint` | `Optional[str]` | The default entrypoint command that gets executed when running a container of an image created by this dockerfile. | `None` |
| `requirements` | `Optional[AbstractSet[str]]` | Optional list of pip requirements to install. | `None` |
| `environment_vars` | `Optional[Dict[str, str]]` | Optional dict of environment variables to set. | `None` |

Returns:

| Type | Description |
|------|-------------|
| `str` | Content of a dockerfile. |

Source code in zenml/utils/docker_utils.py
def generate_dockerfile_contents(
    base_image: str,
    entrypoint: Optional[str] = None,
    requirements: Optional[AbstractSet[str]] = None,
    environment_vars: Optional[Dict[str, str]] = None,
) -> str:
    """Generates a Dockerfile.

    Args:
        base_image: The image to use as base for the dockerfile.
        entrypoint: The default entrypoint command that gets executed when
            running a container of an image created by this dockerfile.
        requirements: Optional list of pip requirements to install.
        environment_vars: Optional dict of environment variables to set.

    Returns:
        Content of a dockerfile.
    """
    lines = [f"FROM {base_image}", "WORKDIR /app"]

    # TODO [ENG-781]: Make secrets invisible in the Dockerfile or use a different approach.
    if environment_vars:
        for key, value in environment_vars.items():
            lines.append(f"ENV {key.upper()}={value}")

    if requirements:
        # Quote the requirements to avoid problems with special characters
        requirements = {f"'{r}'" for r in requirements}
        lines.append(
            f"RUN pip install --no-cache {' '.join(sorted(requirements))}"
        )

    lines.append("COPY . .")
    lines.append("RUN chmod -R a+rw .")
    lines.append(
        f"ENV {ENV_ZENML_CONFIG_PATH}=/app/{CONTAINER_ZENML_CONFIG_DIR}"
    )

    if entrypoint:
        lines.append(f"ENTRYPOINT {entrypoint}")

    return "\n".join(lines)
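
A small sketch of the kind of Dockerfile this helper produces; the base image, entrypoint and requirement are illustrative:

```python
from zenml.utils.docker_utils import generate_dockerfile_contents

contents = generate_dockerfile_contents(
    base_image="python:3.9-slim",
    entrypoint="python main.py",
    requirements={"pandas==1.4.0"},
    environment_vars={"log_level": "debug"},
)
print(contents)
# Expected to start with something like:
#   FROM python:3.9-slim
#   WORKDIR /app
#   ENV LOG_LEVEL=debug
#   RUN pip install --no-cache 'pandas==1.4.0'
# followed by the COPY/chmod lines, the ZenML config path ENV and the
# ENTRYPOINT line.
```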

get_current_environment_requirements()

Returns a dict of package requirements.

This applies to the environment in which the current Python process is running.

Returns:

| Type | Description |
|------|-------------|
| `Dict[str, str]` | A dict of package requirements. |

Source code in zenml/utils/docker_utils.py
def get_current_environment_requirements() -> Dict[str, str]:
    """Returns a dict of package requirements.

    This applies to the environment in which the current python process is
    running.

    Returns:
        A dict of package requirements.
    """
    return {
        distribution.key: distribution.version
        for distribution in pkg_resources.working_set
    }

get_image_digest(image_name)

Gets the digest of a docker image.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `image_name` | `str` | Name of the image to get the digest for. | *required* |

Returns:

| Type | Description |
|------|-------------|
| `Optional[str]` | Returns the repo digest for the given image if there exists exactly one. If there are zero or multiple repo digests, returns None. |

Source code in zenml/utils/docker_utils.py
def get_image_digest(image_name: str) -> Optional[str]:
    """Gets the digest of a docker image.

    Args:
        image_name: Name of the image to get the digest for.

    Returns:
        Returns the repo digest for the given image if there exists exactly one.
        If there are zero or multiple repo digests, returns `None`.
    """
    docker_client = DockerClient.from_env()
    image = docker_client.images.get(image_name)
    repo_digests = image.attrs["RepoDigests"]
    if len(repo_digests) == 1:
        return cast(str, repo_digests[0])
    else:
        logger.debug(
            "Found zero or multiple repo digests for docker image '%s': %s",
            image_name,
            repo_digests,
        )
        return None

is_local_image(image_name)

Returns whether an image was built locally rather than pulled from a registry.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `image_name` | `str` | Name of the image to check. | *required* |

Returns:

| Type | Description |
|------|-------------|
| `bool` | True if the image exists locally and was built locally (i.e. it has no repo digest), False otherwise. |

Source code in zenml/utils/docker_utils.py
def is_local_image(image_name: str) -> bool:
    """Returns whether an image was built locally rather than pulled from a registry.

    Args:
        image_name: Name of the image to check.

    Returns:
        `True` if the image exists locally and was built locally (i.e. it has
        no repo digest), `False` otherwise.
    """
    docker_client = DockerClient.from_env()
    images = docker_client.images.list(name=image_name)
    if images:
        # An image with this name is available locally -> now check whether it
        # was pulled from a repo or built locally (in which case the repo
        # digest is empty)
        return get_image_digest(image_name) is None
    else:
        # no image with this name found locally
        return False

push_docker_image(image_name)

Pushes a docker image to a container registry.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `image_name` | `str` | The full name (including a tag) of the image to push. | *required* |

Source code in zenml/utils/docker_utils.py
def push_docker_image(image_name: str) -> None:
    """Pushes a docker image to a container registry.

    Args:
        image_name: The full name (including a tag) of the image to push.
    """
    logger.info("Pushing docker image '%s'.", image_name)
    docker_client = DockerClient.from_env()
    output_stream = docker_client.images.push(image_name, stream=True)
    _process_stream(output_stream)
    logger.info("Finished pushing docker image.")

enum_utils

Util functions for enums.

StrEnum (str, Enum)

Base enum type for string enum values.

Source code in zenml/utils/enum_utils.py
class StrEnum(str, Enum):
    """Base enum type for string enum values."""

    def __str__(self) -> str:
        """Returns the enum string value.

        Returns:
            The enum string value.
        """
        return self.value  # type: ignore

    @classmethod
    def names(cls) -> List[str]:
        """Get all enum names as a list of strings.

        Returns:
            A list of all enum names.
        """
        return [c.name for c in cls]

    @classmethod
    def values(cls) -> List[str]:
        """Get all enum values as a list of strings.

        Returns:
            A list of all enum values.
        """
        return [c.value for c in cls]
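
A minimal sketch of subclassing `StrEnum`; the `Color` enum is illustrative:

```python
from zenml.utils.enum_utils import StrEnum


class Color(StrEnum):
    RED = "red"
    GREEN = "green"


print(str(Color.RED))  # "red"
print(Color.names())   # ["RED", "GREEN"]
print(Color.values())  # ["red", "green"]
```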

filesync_model

Filesync utils for ZenML.

FileSyncModel (BaseModel) pydantic-model

Pydantic model synchronized with a configuration file.

Use this class as a base Pydantic model that is automatically synchronized with a configuration file on disk.

This class overrides the `__setattr__` and `__getattribute__` magic methods to ensure that the FileSyncModel instance acts as an in-memory cache of the information stored in the associated configuration file.

Source code in zenml/utils/filesync_model.py
class FileSyncModel(BaseModel):
    """Pydantic model synchronized with a configuration file.

    Use this class as a base Pydantic model that is automatically synchronized
    with a configuration file on disk.

    This class overrides the __setattr__ and __getattribute__ magic methods to
    ensure that the FileSyncModel instance acts as an in-memory cache of the
    information stored in the associated configuration file.
    """

    _config_file: str
    _config_file_timestamp: Optional[float]

    def __init__(self, config_file: str, **kwargs: Any) -> None:
        """Create a FileSyncModel instance synchronized with a configuration file on disk.

        Args:
            config_file: configuration file path. If the file exists, the model
                will be initialized with the values from the file.
            **kwargs: additional keyword arguments to pass to the Pydantic model
                constructor. If supplied, these values will override those
                loaded from the configuration file.
        """
        config_dict = {}
        if fileio.exists(config_file):
            config_dict = yaml_utils.read_yaml(config_file)

        self._config_file = config_file
        self._config_file_timestamp = None

        config_dict.update(kwargs)
        super(FileSyncModel, self).__init__(**config_dict)

        # write the configuration file to disk, to reflect new attributes
        # and schema changes
        self.write_config()

    def __setattr__(self, key: str, value: Any) -> None:
        """Sets an attribute on the model and persists it in the configuration file.

        Args:
            key: attribute name.
            value: attribute value.
        """
        super(FileSyncModel, self).__setattr__(key, value)
        if key.startswith("_"):
            return
        self.write_config()

    def __getattribute__(self, key: str) -> Any:
        """Gets an attribute value for a specific key.

        Args:
            key: attribute name.

        Returns:
            attribute value.
        """
        if not key.startswith("_") and key in self.__dict__:
            self.load_config()
        return super(FileSyncModel, self).__getattribute__(key)

    def write_config(self) -> None:
        """Writes the model to the configuration file."""
        config_dict = json.loads(self.json())
        yaml_utils.write_yaml(self._config_file, config_dict)
        self._config_file_timestamp = os.path.getmtime(self._config_file)

    def load_config(self) -> None:
        """Loads the model from the configuration file on disk."""
        if not fileio.exists(self._config_file):
            return

        # don't reload the configuration if the file hasn't
        # been updated since the last load
        file_timestamp = os.path.getmtime(self._config_file)
        if file_timestamp == self._config_file_timestamp:
            return

        if self._config_file_timestamp is not None:
            logger.info(f"Reloading configuration file {self._config_file}")

        # refresh the model from the configuration file values
        config_dict = yaml_utils.read_yaml(self._config_file)
        for key, value in config_dict.items():
            super(FileSyncModel, self).__setattr__(key, value)

        self._config_file_timestamp = file_timestamp

    class Config:
        """Pydantic configuration class."""

        # all attributes with leading underscore are private and therefore
        # are mutable and not included in serialization
        underscore_attrs_are_private = True
Config

Pydantic configuration class.

Source code in zenml/utils/filesync_model.py
class Config:
    """Pydantic configuration class."""

    # all attributes with leading underscore are private and therefore
    # are mutable and not included in serialization
    underscore_attrs_are_private = True
__getattribute__(self, key) special

Gets an attribute value for a specific key.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `key` | `str` | attribute name. | *required* |

Returns:

| Type | Description |
|------|-------------|
| `Any` | attribute value. |

Source code in zenml/utils/filesync_model.py
def __getattribute__(self, key: str) -> Any:
    """Gets an attribute value for a specific key.

    Args:
        key: attribute name.

    Returns:
        attribute value.
    """
    if not key.startswith("_") and key in self.__dict__:
        self.load_config()
    return super(FileSyncModel, self).__getattribute__(key)
__init__(self, config_file, **kwargs) special

Create a FileSyncModel instance synchronized with a configuration file on disk.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `config_file` | `str` | configuration file path. If the file exists, the model will be initialized with the values from the file. | *required* |
| `**kwargs` | `Any` | additional keyword arguments to pass to the Pydantic model constructor. If supplied, these values will override those loaded from the configuration file. | `{}` |

Source code in zenml/utils/filesync_model.py
def __init__(self, config_file: str, **kwargs: Any) -> None:
    """Create a FileSyncModel instance synchronized with a configuration file on disk.

    Args:
        config_file: configuration file path. If the file exists, the model
            will be initialized with the values from the file.
        **kwargs: additional keyword arguments to pass to the Pydantic model
            constructor. If supplied, these values will override those
            loaded from the configuration file.
    """
    config_dict = {}
    if fileio.exists(config_file):
        config_dict = yaml_utils.read_yaml(config_file)

    self._config_file = config_file
    self._config_file_timestamp = None

    config_dict.update(kwargs)
    super(FileSyncModel, self).__init__(**config_dict)

    # write the configuration file to disk, to reflect new attributes
    # and schema changes
    self.write_config()
__setattr__(self, key, value) special

Sets an attribute on the model and persists it in the configuration file.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `key` | `str` | attribute name. | *required* |
| `value` | `Any` | attribute value. | *required* |

Source code in zenml/utils/filesync_model.py
def __setattr__(self, key: str, value: Any) -> None:
    """Sets an attribute on the model and persists it in the configuration file.

    Args:
        key: attribute name.
        value: attribute value.
    """
    super(FileSyncModel, self).__setattr__(key, value)
    if key.startswith("_"):
        return
    self.write_config()
load_config(self)

Loads the model from the configuration file on disk.

Source code in zenml/utils/filesync_model.py
def load_config(self) -> None:
    """Loads the model from the configuration file on disk."""
    if not fileio.exists(self._config_file):
        return

    # don't reload the configuration if the file hasn't
    # been updated since the last load
    file_timestamp = os.path.getmtime(self._config_file)
    if file_timestamp == self._config_file_timestamp:
        return

    if self._config_file_timestamp is not None:
        logger.info(f"Reloading configuration file {self._config_file}")

    # refresh the model from the configuration file values
    config_dict = yaml_utils.read_yaml(self._config_file)
    for key, value in config_dict.items():
        super(FileSyncModel, self).__setattr__(key, value)

    self._config_file_timestamp = file_timestamp
write_config(self)

Writes the model to the configuration file.

Source code in zenml/utils/filesync_model.py
def write_config(self) -> None:
    """Writes the model to the configuration file."""
    config_dict = json.loads(self.json())
    yaml_utils.write_yaml(self._config_file, config_dict)
    self._config_file_timestamp = os.path.getmtime(self._config_file)
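
A brief sketch of a `FileSyncModel` subclass kept in sync with a YAML file on disk; the field names and file path are illustrative:

```python
from zenml.utils.filesync_model import FileSyncModel


class TrackerConfig(FileSyncModel):
    """Hypothetical configuration model persisted to a YAML file."""

    run_count: int = 0
    last_run_name: str = ""


config = TrackerConfig(config_file="/tmp/tracker.yaml")
config.run_count += 1           # attribute write triggers write_config()
config.last_run_name = "run_1"  # also persisted to /tmp/tracker.yaml
```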

io_utils

Various utility functions for the io module.

convert_to_str(path)

Converts a PathType to a str using UTF-8.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `path` | `Union[bytes, str]` | Path to convert. | *required* |

Returns:

| Type | Description |
|------|-------------|
| `str` | Converted path. |

Source code in zenml/utils/io_utils.py
def convert_to_str(path: PathType) -> str:
    """Converts a PathType to a str using UTF-8.

    Args:
        path: Path to convert.

    Returns:
        Converted path.
    """
    if isinstance(path, str):
        return path
    else:
        return path.decode("utf-8")

copy_dir(source_dir, destination_dir, overwrite=False)

Copies dir from source to destination.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `source_dir` | `str` | Path to copy from. | *required* |
| `destination_dir` | `str` | Path to copy to. | *required* |
| `overwrite` | `bool` | If False, an error is raised instead of overwriting existing files. | `False` |

Source code in zenml/utils/io_utils.py
def copy_dir(
    source_dir: str, destination_dir: str, overwrite: bool = False
) -> None:
    """Copies dir from source to destination.

    Args:
        source_dir: Path to copy from.
        destination_dir: Path to copy to.
        overwrite: Boolean. If false, function throws an error before overwrite.
    """
    for source_file in listdir(source_dir):
        source_path = os.path.join(source_dir, convert_to_str(source_file))
        destination_path = os.path.join(
            destination_dir, convert_to_str(source_file)
        )
        if isdir(source_path):
            if source_path == destination_dir:
                # if the destination is a subdirectory of the source, we skip
                # copying it to avoid an infinite loop.
                return
            copy_dir(source_path, destination_path, overwrite)
        else:
            create_dir_recursive_if_not_exists(
                str(Path(destination_path).parent)
            )
            copy(str(source_path), str(destination_path), overwrite)

create_dir_if_not_exists(dir_path)

Creates directory if it does not exist.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `dir_path` | `str` | Local path in filesystem. | *required* |

Source code in zenml/utils/io_utils.py
def create_dir_if_not_exists(dir_path: str) -> None:
    """Creates directory if it does not exist.

    Args:
        dir_path: Local path in filesystem.
    """
    if not isdir(dir_path):
        mkdir(dir_path)

create_dir_recursive_if_not_exists(dir_path)

Creates directory recursively if it does not exist.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `dir_path` | `str` | Local path in filesystem. | *required* |

Source code in zenml/utils/io_utils.py
def create_dir_recursive_if_not_exists(dir_path: str) -> None:
    """Creates directory recursively if it does not exist.

    Args:
        dir_path: Local path in filesystem.
    """
    if not isdir(dir_path):
        makedirs(dir_path)

create_file_if_not_exists(file_path, file_contents='{}')

Creates file if it does not exist.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `file_path` | `str` | Local path in filesystem. | *required* |
| `file_contents` | `str` | Contents of file. | `'{}'` |

Source code in zenml/utils/io_utils.py
def create_file_if_not_exists(
    file_path: str, file_contents: str = "{}"
) -> None:
    """Creates file if it does not exist.

    Args:
        file_path: Local path in filesystem.
        file_contents: Contents of file.
    """
    full_path = Path(file_path)
    if not exists(file_path):
        create_dir_recursive_if_not_exists(str(full_path.parent))
        with open(str(full_path), "w") as f:
            f.write(file_contents)

find_files(dir_path, pattern)

Find files in a directory that match pattern.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `dir_path` | `Union[bytes, str]` | Path to directory. | *required* |
| `pattern` | `str` | Pattern like `*.png`. | *required* |

Yields:

| Type | Description |
|------|-------------|
| `Iterable[str]` | All matching filenames if found. |

Source code in zenml/utils/io_utils.py
def find_files(dir_path: PathType, pattern: str) -> Iterable[str]:
    """Find files in a directory that match pattern.

    Args:
        dir_path: Path to directory.
        pattern: pattern like *.png.

    Yields:
        All matching filenames if found.
    """
    for root, dirs, files in walk(dir_path):
        for basename in files:
            if fnmatch.fnmatch(convert_to_str(basename), pattern):
                filename = os.path.join(
                    convert_to_str(root), convert_to_str(basename)
                )
                yield filename
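
A quick sketch of locating files with `find_files`; the directory and pattern are illustrative:

```python
from zenml.utils.io_utils import find_files

# Yield the path of every YAML file below the current directory.
for path in find_files(".", "*.yaml"):
    print(path)
```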

get_global_config_directory()

Gets the global config directory for ZenML.

Returns:

| Type | Description |
|------|-------------|
| `str` | The global config directory for ZenML. |

Source code in zenml/utils/io_utils.py
def get_global_config_directory() -> str:
    """Gets the global config directory for ZenML.

    Returns:
        The global config directory for ZenML.
    """
    env_var_path = os.getenv(ENV_ZENML_CONFIG_PATH)
    if env_var_path:
        return str(Path(env_var_path).resolve())
    return click.get_app_dir(APP_NAME)

get_grandparent(dir_path)

Get grandparent of dir.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `dir_path` | `str` | Path to directory. | *required* |

Returns:

| Type | Description |
|------|-------------|
| `str` | The input path's parent's parent. |

Source code in zenml/utils/io_utils.py
def get_grandparent(dir_path: str) -> str:
    """Get grandparent of dir.

    Args:
        dir_path: Path to directory.

    Returns:
        The input path's parent's parent.
    """
    return Path(dir_path).parent.parent.stem

get_parent(dir_path)

Get parent of dir.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `dir_path` | `str` | Path to directory. | *required* |

Returns:

| Type | Description |
|------|-------------|
| `str` | Parent (stem) of the dir as a string. |

Source code in zenml/utils/io_utils.py
def get_parent(dir_path: str) -> str:
    """Get parent of dir.

    Args:
        dir_path: Path to directory.

    Returns:
        Parent (stem) of the dir as a string.
    """
    return Path(dir_path).parent.stem

is_remote(path)

Returns True if path exists remotely.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `path` | `str` | Any path as a string. | *required* |

Returns:

| Type | Description |
|------|-------------|
| `bool` | True if remote path, else False. |

Source code in zenml/utils/io_utils.py
def is_remote(path: str) -> bool:
    """Returns True if path exists remotely.

    Args:
        path: Any path as a string.

    Returns:
        True if remote path, else False.
    """
    return any(path.startswith(prefix) for prefix in REMOTE_FS_PREFIX)

is_root(path)

Returns true if path has no parent in local filesystem.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `path` | `str` | Local path in filesystem. | *required* |

Returns:

| Type | Description |
|------|-------------|
| `bool` | True if root, else False. |

Source code in zenml/utils/io_utils.py
def is_root(path: str) -> bool:
    """Returns true if path has no parent in local filesystem.

    Args:
        path: Local path in filesystem.

    Returns:
        True if root, else False.
    """
    return Path(path).parent == Path(path)

read_file_contents_as_string(file_path)

Reads contents of file.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `file_path` | `str` | Path to file. | *required* |

Returns:

| Type | Description |
|------|-------------|
| `str` | Contents of file. |

Exceptions:

| Type | Description |
|------|-------------|
| `FileNotFoundError` | If file does not exist. |

Source code in zenml/utils/io_utils.py
def read_file_contents_as_string(file_path: str) -> str:
    """Reads contents of file.

    Args:
        file_path: Path to file.

    Returns:
        Contents of file.

    Raises:
        FileNotFoundError: If file does not exist.
    """
    if not exists(file_path):
        raise FileNotFoundError(f"{file_path} does not exist!")
    return open(file_path).read()  # type: ignore[no-any-return]

resolve_relative_path(path)

Takes relative path and resolves it absolutely.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `path` | `str` | Local path in filesystem. | *required* |

Returns:

| Type | Description |
|------|-------------|
| `str` | Resolved path. |

Source code in zenml/utils/io_utils.py
def resolve_relative_path(path: str) -> str:
    """Takes relative path and resolves it absolutely.

    Args:
        path: Local path in filesystem.

    Returns:
        Resolved path.
    """
    if is_remote(path):
        return path
    return str(Path(path).resolve())

write_file_contents_as_string(file_path, content)

Writes contents of file.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `file_path` | `str` | Path to file. | *required* |
| `content` | `str` | Contents of file. | *required* |

Source code in zenml/utils/io_utils.py
def write_file_contents_as_string(file_path: str, content: str) -> None:
    """Writes contents of file.

    Args:
        file_path: Path to file.
        content: Contents of file.
    """
    with open(file_path, "w") as f:
        f.write(content)

networking_utils

Utility functions for networking.

find_available_port()

Finds a local random unoccupied TCP port.

Returns:

| Type | Description |
|------|-------------|
| `int` | A random unoccupied TCP port. |

Source code in zenml/utils/networking_utils.py
def find_available_port() -> int:
    """Finds a local random unoccupied TCP port.

    Returns:
        A random unoccupied TCP port.
    """
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.bind(("127.0.0.1", 0))
        _, port = s.getsockname()

    return cast(int, port)

port_available(port)

Checks if a local port is available.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `port` | `int` | TCP port number. | *required* |

Returns:

| Type | Description |
|------|-------------|
| `bool` | True if the port is available, otherwise False. |

Source code in zenml/utils/networking_utils.py
def port_available(port: int) -> bool:
    """Checks if a local port is available.

    Args:
        port: TCP port number

    Returns:
        True if the port is available, otherwise False
    """
    try:
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            s.bind(("127.0.0.1", port))
    except socket.error as e:
        logger.debug("Port %d unavailable: %s", port, e)
        return False

    return True

port_is_open(hostname, port)

Check if a TCP port is open on a remote host.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `hostname` | `str` | hostname of the remote machine. | *required* |
| `port` | `int` | TCP port number. | *required* |

Returns:

| Type | Description |
|------|-------------|
| `bool` | True if the port is open, False otherwise. |

Source code in zenml/utils/networking_utils.py
def port_is_open(hostname: str, port: int) -> bool:
    """Check if a TCP port is open on a remote host.

    Args:
        hostname: hostname of the remote machine
        port: TCP port number

    Returns:
        True if the port is open, False otherwise
    """
    try:
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
            result = sock.connect_ex((hostname, port))
            return result == 0
    except socket.error as e:
        logger.debug(
            f"Error checking TCP port {port} on host {hostname}: {str(e)}"
        )
        return False

scan_for_available_port(start=8000, stop=65535)

Scan the local network for an available port in the given range.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `start` | `int` | the beginning of the port range value to scan. | `8000` |
| `stop` | `int` | the (inclusive) end of the port range value to scan. | `65535` |

Returns:

| Type | Description |
|------|-------------|
| `Optional[int]` | The first available port in the given range, or None if no available port is found. |

Source code in zenml/utils/networking_utils.py
def scan_for_available_port(
    start: int = SCAN_PORT_RANGE[0], stop: int = SCAN_PORT_RANGE[1]
) -> Optional[int]:
    """Scan the local network for an available port in the given range.

    Args:
        start: the beginning of the port range value to scan
        stop: the (inclusive) end of the port range value to scan

    Returns:
        The first available port in the given range, or None if no available
        port is found.
    """
    for port in range(start, stop + 1):
        if port_available(port):
            return port
    logger.debug(
        "No free TCP ports found in the range %d - %d",
        start,
        stop,
    )
    return None
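
The networking helpers above can be combined; a rough sketch (the hostname and port range below are only illustrative):

from zenml.utils.networking_utils import (
    find_available_port,
    port_available,
    port_is_open,
    scan_for_available_port,
)

# Grab a random free local port and confirm it is still unused.
port = find_available_port()
assert port_available(port)

# Find the first free port in a custom range, e.g. for a local dashboard.
dashboard_port = scan_for_available_port(start=8237, stop=8250)

# Check whether a remote service is reachable on its TCP port.
if port_is_open("example.com", 443):
    print("remote port 443 is open")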

singleton

Utility class to turn classes into singleton classes.

SingletonMetaClass (type)

Singleton metaclass.

Use this metaclass to make any class into a singleton class:

class OneRing(metaclass=SingletonMetaClass):
    def __init__(self, owner):
        self._owner = owner

    @property
    def owner(self):
        return self._owner

the_one_ring = OneRing('Sauron')
the_lost_ring = OneRing('Frodo')
print(the_lost_ring.owner)  # Sauron
OneRing._clear() # ring destroyed
Source code in zenml/utils/singleton.py
class SingletonMetaClass(type):
    """Singleton metaclass.

    Use this metaclass to make any class into a singleton class:

    ```python
    class OneRing(metaclass=SingletonMetaClass):
        def __init__(self, owner):
            self._owner = owner

        @property
        def owner(self):
            return self._owner

    the_one_ring = OneRing('Sauron')
    the_lost_ring = OneRing('Frodo')
    print(the_lost_ring.owner)  # Sauron
    OneRing._clear() # ring destroyed
    ```
    """

    def __init__(cls, *args: Any, **kwargs: Any) -> None:
        """Initialize a singleton class.

        Args:
            *args: Additional arguments.
            **kwargs: Additional keyword arguments.
        """
        super().__init__(*args, **kwargs)
        cls.__singleton_instance: Optional["SingletonMetaClass"] = None

    def __call__(cls, *args: Any, **kwargs: Any) -> "SingletonMetaClass":
        """Create or return the singleton instance.

        Args:
            *args: Additional arguments.
            **kwargs: Additional keyword arguments.

        Returns:
            The singleton instance.
        """
        if not cls.__singleton_instance:
            cls.__singleton_instance = cast(
                "SingletonMetaClass", super().__call__(*args, **kwargs)
            )

        return cls.__singleton_instance

    def _clear(cls) -> None:
        """Clear the singleton instance."""
        cls.__singleton_instance = None
__call__(cls, *args, **kwargs) special

Create or return the singleton instance.

Parameters:

Name Type Description Default
*args Any

Additional arguments.

()
**kwargs Any

Additional keyword arguments.

{}

Returns:

Type Description
SingletonMetaClass

The singleton instance.

Source code in zenml/utils/singleton.py
def __call__(cls, *args: Any, **kwargs: Any) -> "SingletonMetaClass":
    """Create or return the singleton instance.

    Args:
        *args: Additional arguments.
        **kwargs: Additional keyword arguments.

    Returns:
        The singleton instance.
    """
    if not cls.__singleton_instance:
        cls.__singleton_instance = cast(
            "SingletonMetaClass", super().__call__(*args, **kwargs)
        )

    return cls.__singleton_instance
__init__(cls, *args, **kwargs) special

Initialize a singleton class.

Parameters:

Name Type Description Default
*args Any

Additional arguments.

()
**kwargs Any

Additional keyword arguments.

{}
Source code in zenml/utils/singleton.py
def __init__(cls, *args: Any, **kwargs: Any) -> None:
    """Initialize a singleton class.

    Args:
        *args: Additional arguments.
        **kwargs: Additional keyword arguments.
    """
    super().__init__(*args, **kwargs)
    cls.__singleton_instance: Optional["SingletonMetaClass"] = None

source_utils

Utility functions for source code.

These utils are predicated on the following definitions:

  • class_source: This is a python-import type path to a class, e.g. some.mod.class
  • module_source: This is a python-import type path to a module, e.g. some.mod
  • file_path, relative_path, absolute_path: These are file system paths.
  • source: This is a class_source or module_source. If it is a class_source, it can also be optionally pinned.
  • pin: Whatever comes after the @ symbol from a source, usually the git sha or the version of zenml as a string.
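
Concretely, these terms map onto plain strings; the values below are purely hypothetical:

# class_source: python-import path to a class
class_source = "my_project.steps.trainer.TrainerStep"

# module_source: python-import path to a module
module_source = "my_project.steps.trainer"

# pinned class_source: everything after the `@` is the pin
pinned_source = "zenml.some_module.SomeClass@zenml_0.8.0"
pin = pinned_source.split("@")[1]  # "zenml_0.8.0"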

create_zenml_pin()

Creates a ZenML pin for source pinning from release version.

Returns:

Type Description
str

ZenML pin.

Source code in zenml/utils/source_utils.py
def create_zenml_pin() -> str:
    """Creates a ZenML pin for source pinning from release version.

    Returns:
        ZenML pin.
    """
    return f"{APP_NAME}_{__version__}"

get_absolute_path_from_module_source(module)

Get a directory path from module source.

E.g. zenml.core.step will return full/path/to/zenml/core/step.

Parameters:

Name Type Description Default
module str

A module e.g. zenml.core.step.

required

Returns:

Type Description
str

An absolute path e.g. full/path/to/zenml/core/step.

Source code in zenml/utils/source_utils.py
def get_absolute_path_from_module_source(module: str) -> str:
    """Get a directory path from module source.

    E.g. `zenml.core.step` will return `full/path/to/zenml/core/step`.

    Args:
        module: A module e.g. `zenml.core.step`.

    Returns:
        An absolute path e.g. `full/path/to/zenml/core/step`.
    """
    mod = importlib.import_module(module)
    return mod.__path__[0]

get_class_source_from_source(source)

Gets the class source from a source string, i.e. given module.path.Class@version, returns module.path.Class.

Parameters:

Name Type Description Default
source str

source pointing to potentially pinned sha.

required

Returns:

Type Description
str

class_source e.g. this.module.Class.

Source code in zenml/utils/source_utils.py
def get_class_source_from_source(source: str) -> str:
    """Gets class source from source, i.e. module.path@version, returns version.

    Args:
        source: source pointing to potentially pinned sha.

    Returns:
        class_source e.g. this.module.Class.
    """
    # source need not even be pinned
    return source.split("@")[0]

get_hashed_source(value)

Returns a hash of the object's source code.

Parameters:

Name Type Description Default
value Any

object to get source from.

required

Returns:

Type Description
str

Hash of source code.

Exceptions:

Type Description
TypeError

If unable to compute the hash.

Source code in zenml/utils/source_utils.py
def get_hashed_source(value: Any) -> str:
    """Returns a hash of the objects source code.

    Args:
        value: object to get source from.

    Returns:
        Hash of source code.

    Raises:
        TypeError: If unable to compute the hash.
    """
    try:
        source_code = get_source(value)
    except TypeError:
        raise TypeError(
            f"Unable to compute the hash of source code of object: {value}."
        )
    return hashlib.sha256(source_code.encode("utf-8")).hexdigest()
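
For example (a small sketch; the exact digest depends on the source text, and get_source requires the object to be defined in a file or notebook cell):

from zenml.utils.source_utils import get_hashed_source

def add(a: int, b: int) -> int:
    return a + b

# SHA-256 digest of the function's source; any change to the body or
# signature yields a different value.
print(get_hashed_source(add))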

get_module_source_from_class(class_)

Takes class input and returns module_source.

If the class is already given as a string, it is returned unchanged.

Parameters:

Name Type Description Default
class_ Union[Type[Any], str]

object of type class.

required

Returns:

Type Description
Optional[str]

module_source of class.

Exceptions:

Type Description
AssertionError

if step_type is neither a class nor a string.

Source code in zenml/utils/source_utils.py
def get_module_source_from_class(
    class_: Union[Type[Any], str]
) -> Optional[str]:
    """Takes class input and returns module_source.

    If class is already string then returns the same.

    Args:
        class_: object of type class.

    Returns:
        module_source of class.

    Raises:
        AssertionError: if step_type is neither a class nor a string.
    """
    if isinstance(class_, str):
        module_source = class_
    else:
        # Infer it from the class provided
        if not inspect.isclass(class_):
            raise AssertionError("step_type is neither string nor class.")
        module_source = class_.__module__ + "." + class_.__name__
    return module_source

get_module_source_from_module(module)

Gets the source of the supplied module.

E.g.:

  • a /home/myrepo/src/run.py module running as the main module returns run if no repository root is specified.

  • a /home/myrepo/src/run.py module running as the main module returns src.run if the repository root is configured in /home/myrepo

  • a /home/myrepo/src/pipeline.py module not running as the main module returns src.pipeline if the repository root is configured in /home/myrepo

  • a /home/myrepo/src/pipeline.py module not running as the main module returns pipeline if no repository root is specified and the main module is also in /home/myrepo/src.

  • a /home/step.py module not running as the main module returns step if the CWD is /home and the repository root or the main module are in a different path (e.g. /home/myrepo/src).

Parameters:

Name Type Description Default
module module

the module to get the source of.

required

Returns:

Type Description
str

The source of the supplied module.

Exceptions:

Type Description
RuntimeError

if the module is not loaded from a file

Source code in zenml/utils/source_utils.py
def get_module_source_from_module(module: ModuleType) -> str:
    """Gets the source of the supplied module.

    E.g.:

      * a `/home/myrepo/src/run.py` module running as the main module returns
      `run` if no repository root is specified.

      * a `/home/myrepo/src/run.py` module running as the main module returns
      `src.run` if the repository root is configured in `/home/myrepo`

      * a `/home/myrepo/src/pipeline.py` module not running as the main module
      returns `src.pipeline` if the repository root is configured in
      `/home/myrepo`

      * a `/home/myrepo/src/pipeline.py` module not running as the main module
      returns `pipeline` if no repository root is specified and the main
      module is also in `/home/myrepo/src`.

      * a `/home/step.py` module not running as the main module
      returns `step` if the CWD is /home and the repository root or the main
      module are in a different path (e.g. `/home/myrepo/src`).

    Args:
        module: the module to get the source of.

    Returns:
        The source of the main module.

    Raises:
        RuntimeError: if the module is not loaded from a file
    """
    if not hasattr(module, "__file__") or not module.__file__:
        if module.__name__ == "__main__":
            raise RuntimeError(
                f"{module} module was not loaded from a file. Cannot "
                "determine the module root path."
            )
        return module.__name__
    module_path = os.path.abspath(module.__file__)

    root_path = get_source_root_path()

    if not module_path.startswith(root_path):
        root_path = os.getcwd()
        logger.warning(
            "User module %s is not in the source root. Using current "
            "directory %s instead to resolve module source.",
            module,
            root_path,
        )

    # Remove root_path from module_path to get relative path left over
    module_path = module_path.replace(root_path, "")[1:]

    # Kick out the .py and replace `/` with `.` to get the module source
    module_path = module_path.replace(".py", "")
    module_source = module_path.replace(os.path.sep, ".")

    logger.debug(
        f"Resolved module source for module {module} to: {module_source}"
    )

    return module_source

get_module_source_from_source(source)

Gets module source from source.

For example some.module.file.class@version would return some.module.

Parameters:

Name Type Description Default
source str

source pointing to potentially pinned sha.

required

Returns:

Type Description
str

module_source e.g. some.module.

Source code in zenml/utils/source_utils.py
def get_module_source_from_source(source: str) -> str:
    """Gets module source from source.

    For example `some.module.file.class@version` would
    return `some.module`.

    Args:
        source: source pointing to potentially pinned sha.

    Returns:
        module_source e.g. some.module.
    """
    class_source = get_class_source_from_source(source)
    return ".".join(class_source.split(".")[:-2])

get_relative_path_from_module_source(module_source)

Get a directory path from module, relative to root of the package tree.

E.g. zenml.core.step will return zenml/core/step.

Parameters:

Name Type Description Default
module_source str

A module e.g. zenml.core.step

required

Returns:

Type Description
str

A relative path e.g. zenml/core/step.

Source code in zenml/utils/source_utils.py
def get_relative_path_from_module_source(module_source: str) -> str:
    """Get a directory path from module, relative to root of the package tree.

    E.g. zenml.core.step will return zenml/core/step.

    Args:
        module_source: A module e.g. zenml.core.step

    Returns:
        A relative path e.g. zenml/core/step.
    """
    return module_source.replace(".", os.path.sep)

get_source(value)

Returns the source code of an object.

If executing within an IPython kernel environment, this temporarily monkey-patches the inspect module with a workaround to get the source from the notebook cell.

Parameters:

Name Type Description Default
value Any

object to get source from.

required

Returns:

Type Description
str

Source code of object.

Source code in zenml/utils/source_utils.py
def get_source(value: Any) -> str:
    """Returns the source code of an object.

    If executing within a IPython kernel environment, then this monkey-patches
    `inspect` module temporarily with a workaround to get source from the cell.

    Args:
        value: object to get source from.

    Returns:
        Source code of object.
    """
    if Environment.in_notebook():
        # Monkey patch inspect.getfile temporarily to make getsource work.
        # Source: https://stackoverflow.com/questions/51566497/
        def _new_getfile(
            object: Any,
            _old_getfile: Callable[
                [
                    Union[
                        ModuleType,
                        Type[Any],
                        MethodType,
                        FunctionType,
                        TracebackType,
                        FrameType,
                        CodeType,
                        Callable[..., Any],
                    ]
                ],
                str,
            ] = inspect.getfile,
        ) -> Any:
            if not inspect.isclass(object):
                return _old_getfile(object)

            # Lookup by parent module (as in current inspect)
            if hasattr(object, "__module__"):
                object_ = sys.modules.get(object.__module__)
                if hasattr(object_, "__file__"):
                    return object_.__file__  # type: ignore[union-attr]

            # If parent module is __main__, lookup by methods
            for name, member in inspect.getmembers(object):
                if (
                    inspect.isfunction(member)
                    and object.__qualname__ + "." + member.__name__
                    == member.__qualname__
                ):
                    return inspect.getfile(member)
            else:
                raise TypeError(f"Source for {object!r} not found.")

        # Monkey patch, compute source, then revert monkey patch.
        _old_getfile = inspect.getfile
        inspect.getfile = _new_getfile
        try:
            src = inspect.getsource(value)
        finally:
            inspect.getfile = _old_getfile
    else:
        # Use standard inspect if running outside a notebook
        src = inspect.getsource(value)
    return src

get_source_root_path()

Gets repository root path or the source root path of the current process.

E.g.:

  • if the process was started by running a run.py file under full/path/to/my/run.py, and the repository root is configured at full/path, the source root path is full/path.

  • same case as above, but when there is no repository root configured, the source root path is full/path/to/my.

Returns:

Type Description
str

The source root path of the current process.

Exceptions:

Type Description
RuntimeError

if the main module was not started or determined.

Source code in zenml/utils/source_utils.py
def get_source_root_path() -> str:
    """Gets repository root path or the source root path of the current process.

    E.g.:

      * if the process was started by running a `run.py` file under
      `full/path/to/my/run.py`, and the repository root is configured at
      `full/path`, the source root path is `full/path`.

      * same case as above, but when there is no repository root configured,
      the source root path is `full/path/to/my`.

    Returns:
        The source root path of the current process.

    Raises:
        RuntimeError: if the main module was not started or determined.
    """
    from zenml.repository import Repository

    repo_root = Repository.find_repository()
    if repo_root:
        logger.debug("Using repository root as source root: %s", repo_root)
        return str(repo_root.resolve())

    main_module = sys.modules.get("__main__")
    if main_module is None:
        raise RuntimeError(
            "Could not determine the main module used to run the current "
            "process."
        )

    if not hasattr(main_module, "__file__") or not main_module.__file__:
        raise RuntimeError(
            "Main module was not started from a file. Cannot "
            "determine the module root path."
        )
    path = pathlib.Path(main_module.__file__).resolve().parent

    logger.debug("Using main module location as source root: %s", path)
    return str(path)

import_class_by_path(class_path)

Imports a class based on a given path.

Parameters:

Name Type Description Default
class_path str

str, class_source e.g. this.module.Class

required

Returns:

Type Description
Type[Any]

the given class

Source code in zenml/utils/source_utils.py
def import_class_by_path(class_path: str) -> Type[Any]:
    """Imports a class based on a given path.

    Args:
        class_path: str, class_source e.g. this.module.Class

    Returns:
        the given class
    """
    modulename, classname = class_path.rsplit(".", 1)
    mod = importlib.import_module(modulename)
    return getattr(mod, classname)  # type: ignore[no-any-return]
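
For instance (pathlib.Path is used here only as a readily importable example):

from zenml.utils.source_utils import import_class_by_path

path_cls = import_class_by_path("pathlib.Path")
assert path_cls.__name__ == "Path"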

import_python_file(file_path)

Imports a python file.

Parameters:

Name Type Description Default
file_path str

Path to python file that should be imported.

required

Returns:

Type Description
types.ModuleType

The imported module.

Source code in zenml/utils/source_utils.py
def import_python_file(file_path: str) -> types.ModuleType:
    """Imports a python file.

    Args:
        file_path: Path to python file that should be imported.

    Returns:
        imported module: Module
    """
    # Add directory of python file to PYTHONPATH so we can import it
    file_path = os.path.abspath(file_path)
    module_name = os.path.splitext(os.path.basename(file_path))[0]

    # In case the module is already fully or partially imported and the module
    #  path is something like materializer.materializer the full path needs to
    #  be checked for in the sys.modules to avoid getting an empty namespace
    #  module
    full_module_path = os.path.splitext(
        os.path.relpath(file_path, os.getcwd())
    )[0].replace(os.path.sep, ".")

    if full_module_path not in sys.modules:
        with prepend_python_path(os.path.dirname(file_path)):
            module = importlib.import_module(module_name)
        return module
    else:
        return sys.modules[full_module_path]

is_inside_repository(file_path)

Returns whether a file is inside a zenml repository.

Parameters:

Name Type Description Default
file_path str

A file path.

required

Returns:

Type Description
bool

True if the file is inside a zenml repository, else False.

Source code in zenml/utils/source_utils.py
def is_inside_repository(file_path: str) -> bool:
    """Returns whether a file is inside a zenml repository.

    Args:
        file_path: A file path.

    Returns:
        `True` if the file is inside a zenml repository, else `False`.
    """
    from zenml.repository import Repository

    repo_path = Repository.find_repository()
    if not repo_path:
        return False

    repo_path = repo_path.resolve()
    absolute_file_path = pathlib.Path(file_path).resolve()
    return repo_path in absolute_file_path.parents

is_standard_pin(pin)

Returns True if the pin is a valid ZenML pin, else False.

Parameters:

Name Type Description Default
pin str

potential ZenML pin like 'zenml_0.1.1'

required

Returns:

Type Description
bool

True if pin is valid ZenML pin, else False.

Source code in zenml/utils/source_utils.py
def is_standard_pin(pin: str) -> bool:
    """Returns `True` if pin is valid ZenML pin, else False.

    Args:
        pin: potential ZenML pin like 'zenml_0.1.1'

    Returns:
        `True` if pin is valid ZenML pin, else False.
    """
    if pin.startswith(f"{APP_NAME}_"):
        return True
    return False

is_standard_source(source)

Returns True if source is a standard ZenML source.

Parameters:

Name Type Description Default
source str

class_source e.g. this.module.Class[@pin].

required

Returns:

Type Description
bool

True if source is a standard ZenML source, else False.

Source code in zenml/utils/source_utils.py
def is_standard_source(source: str) -> bool:
    """Returns `True` if source is a standard ZenML source.

    Args:
        source: class_source e.g. this.module.Class[@pin].

    Returns:
        `True` if source is a standard ZenML source, else `False`.
    """
    if source.split(".")[0] == "zenml":
        return True
    return False

is_third_party_module(file_path)

Returns whether a file belongs to a third party package.

Parameters:

Name Type Description Default
file_path str

A file path.

required

Returns:

Type Description
bool

True if the file belongs to a third party package, else False.

Source code in zenml/utils/source_utils.py
def is_third_party_module(file_path: str) -> bool:
    """Returns whether a file belongs to a third party package.

    Args:
        file_path: A file path.

    Returns:
        `True` if the file belongs to a third party package, else `False`.
    """
    absolute_file_path = pathlib.Path(file_path).resolve()

    for path in site.getsitepackages() + [site.getusersitepackages()]:
        if pathlib.Path(path).resolve() in absolute_file_path.parents:
            return True

    return False

load_source_path_class(source, import_path=None)

Loads a Python class from the source.

Parameters:

Name Type Description Default
source str

class_source e.g. this.module.Class[@sha]

required
import_path Optional[str]

optional path to add to python path

None

Returns:

Type Description
Type[Any]

the given class

Source code in zenml/utils/source_utils.py
def load_source_path_class(
    source: str, import_path: Optional[str] = None
) -> Type[Any]:
    """Loads a Python class from the source.

    Args:
        source: class_source e.g. this.module.Class[@sha]
        import_path: optional path to add to python path

    Returns:
        the given class
    """
    from zenml.repository import Repository

    repo_root = Repository.find_repository()
    if not import_path and repo_root:
        import_path = str(repo_root)

    if "@" in source:
        source = source.split("@")[0]

    if import_path is not None:
        with prepend_python_path(import_path):
            logger.debug(
                f"Loading class {source} with import path {import_path}"
            )
            return import_class_by_path(source)
    return import_class_by_path(source)

prepend_python_path(path)

Simple context manager to help import modules from within the repo.

Parameters:

Name Type Description Default
path str

str, path to prepend to sys.path

required

Yields:

Type Description
Iterator[NoneType]

None

Source code in zenml/utils/source_utils.py
@contextmanager
def prepend_python_path(path: str) -> Iterator[None]:
    """Simple context manager to help import module within the repo.

    Args:
        path: str, path to prepend to sys.path

    Yields:
        None
    """
    try:
        # Entering the with statement
        sys.path.insert(0, path)
        yield
    finally:
        # Exiting the with statement
        sys.path.remove(path)
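
A short sketch of how the context manager behaves ("/tmp/my_repo" and my_pipeline are hypothetical):

import sys

from zenml.utils.source_utils import prepend_python_path

with prepend_python_path("/tmp/my_repo"):
    # The path is importable only inside the block.
    assert "/tmp/my_repo" in sys.path
    # import my_pipeline  # a module living under /tmp/my_repo

# On exit the entry is removed again.
assert "/tmp/my_repo" not in sys.path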

resolve_class(class_)

Resolves a class into a serializable source string.

For classes that are not built-in nor imported from a Python package, the get_source_root_path function is used to determine the root path relative to which the class source is resolved.

Parameters:

Name Type Description Default
class_ Type[Any]

A Python Class reference.

required

Returns:

Type Description
str

source_path e.g. this.module.Class.

Source code in zenml/utils/source_utils.py
def resolve_class(class_: Type[Any]) -> str:
    """Resolves a class into a serializable source string.

    For classes that are not built-in nor imported from a Python package, the
    `get_source_root_path` function is used to determine the root path
    relative to which the class source is resolved.

    Args:
        class_: A Python Class reference.

    Returns:
        source_path e.g. this.module.Class.
    """
    initial_source = class_.__module__ + "." + class_.__name__
    if is_standard_source(initial_source):
        return resolve_standard_source(initial_source)

    try:
        file_path = inspect.getfile(class_)
    except TypeError:
        # builtin file
        return initial_source

    if initial_source.startswith("__main__") or is_third_party_module(
        file_path
    ):
        return initial_source

    # Regular user file -> get the full module path relative to the
    # source root.
    module_source = get_module_source_from_module(
        sys.modules[class_.__module__]
    )

    # ENG-123 Sanitize for Windows OS
    # module_source = module_source.replace("\\", ".")

    logger.debug(f"Resolved class {class_} to {module_source}")

    return module_source + "." + class_.__name__
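
A rough round trip with load_source_path_class, using a ZenML-internal class so that the result is a pinned standard source:

from zenml.stack.stack_component import StackComponent
from zenml.utils.source_utils import load_source_path_class, resolve_class

source = resolve_class(StackComponent)
# e.g. 'zenml.stack.stack_component.StackComponent@zenml_<version>'

# Loading the resolved source gives the class back (the pin is stripped).
assert load_source_path_class(source) is StackComponent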

resolve_standard_source(source)

Pins a standard ZenML class source with the current release version.

Parameters:

Name Type Description Default
source str

class_source e.g. this.module.Class.

required

Returns:

Type Description
str

ZenML pin.

Exceptions:

Type Description
AssertionError

If source is already pinned.

Source code in zenml/utils/source_utils.py
def resolve_standard_source(source: str) -> str:
    """Creates a ZenML pin for source pinning from release version.

    Args:
        source: class_source e.g. this.module.Class.

    Returns:
        ZenML pin.

    Raises:
        AssertionError: If source is already pinned.
    """
    if "@" in source:
        raise AssertionError(f"source {source} is already pinned.")
    pin = create_zenml_pin()
    return f"{source}@{pin}"

validate_flavor_source(source, component_type)

Utility function to import a StackComponent class from a given source and validate its type.

Parameters:

Name Type Description Default
source str

source path of the implementation

required
component_type StackComponentType

the type of the stack component

required

Returns:

Type Description
Type[zenml.stack.stack_component.StackComponent]

the imported class

Exceptions:

Type Description
ValueError

If ZenML cannot find the given module path

TypeError

If the given module path does not point to a subclass of a StackComponent which has the right component type.

Source code in zenml/utils/source_utils.py
def validate_flavor_source(
    source: str, component_type: StackComponentType
) -> Type[StackComponent]:
    """Utility function to import a StackComponent class from a given source and validate its type.

    Args:
        source: source path of the implementation
        component_type: the type of the stack component

    Returns:
        the imported class

    Raises:
        ValueError: If ZenML cannot find the given module path
        TypeError: If the given module path does not point to a subclass of a
            StackComponent which has the right component type.
    """
    try:
        stack_component_class = load_source_path_class(source)
    except (ValueError, AttributeError, ImportError) as e:
        raise ValueError(
            f"ZenML can not import the flavor class '{source}': {e}"
        )

    if not issubclass(stack_component_class, StackComponent):
        raise TypeError(
            f"The source '{source}' does not point to a subclass of the ZenML"
            f"StackComponent."
        )

    if stack_component_class.TYPE != component_type:  # noqa
        raise TypeError(
            f"The source points to a {stack_component_class.TYPE}, not a "  # noqa
            f"{component_type}."
        )

    return stack_component_class  # noqa
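
A hedged usage sketch (the flavor module my_flavors.my_orchestrator.MyOrchestrator is hypothetical, and StackComponentType is assumed to be importable from zenml.enums):

from zenml.enums import StackComponentType
from zenml.utils.source_utils import validate_flavor_source

orchestrator_cls = validate_flavor_source(
    source="my_flavors.my_orchestrator.MyOrchestrator",
    component_type=StackComponentType.ORCHESTRATOR,
)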

string_utils

Utils for strings.

b64_decode(input_)

Returns a decoded string of the base 64 encoded input string.

Parameters:

Name Type Description Default
input_ str

Base64 encoded string.

required

Returns:

Type Description
str

Decoded string.

Source code in zenml/utils/string_utils.py
def b64_decode(input_: str) -> str:
    """Returns a decoded string of the base 64 encoded input string.

    Args:
        input_: Base64 encoded string.

    Returns:
        Decoded string.
    """
    encoded_bytes = input_.encode()
    decoded_bytes = base64.b64decode(encoded_bytes)
    return decoded_bytes.decode()

b64_encode(input_)

Returns a base 64 encoded string of the input string.

Parameters:

Name Type Description Default
input_ str

The input to encode.

required

Returns:

Type Description
str

Base64 encoded string.

Source code in zenml/utils/string_utils.py
def b64_encode(input_: str) -> str:
    """Returns a base 64 encoded string of the input string.

    Args:
        input_: The input to encode.

    Returns:
        Base64 encoded string.
    """
    input_bytes = input_.encode()
    encoded_bytes = base64.b64encode(input_bytes)
    return encoded_bytes.decode()
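
A quick round trip between the two helpers:

from zenml.utils.string_utils import b64_decode, b64_encode

encoded = b64_encode("supersecret")
print(encoded)  # 'c3VwZXJzZWNyZXQ='
assert b64_decode(encoded) == "supersecret"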

get_human_readable_filesize(bytes_)

Convert a file size in bytes into a human-readable string.

Parameters:

Name Type Description Default
bytes_ int

The number of bytes to convert.

required

Returns:

Type Description
str

A human-readable string.

Source code in zenml/utils/string_utils.py
def get_human_readable_filesize(bytes_: int) -> str:
    """Convert a file size in bytes into a human-readable string.

    Args:
        bytes_: The number of bytes to convert.

    Returns:
        A human-readable string.
    """
    size = abs(float(bytes_))
    for unit in ["B", "KiB", "MiB", "GiB"]:
        if size < 1024.0 or unit == "GiB":
            break
        size /= 1024.0

    return f"{size:.2f} {unit}"

get_human_readable_time(seconds)

Convert seconds into a human-readable string.

Parameters:

Name Type Description Default
seconds float

The number of seconds to convert.

required

Returns:

Type Description
str

A human-readable string.

Source code in zenml/utils/string_utils.py
def get_human_readable_time(seconds: float) -> str:
    """Convert seconds into a human-readable string.

    Args:
        seconds: The number of seconds to convert.

    Returns:
        A human-readable string.
    """
    prefix = "-" if seconds < 0 else ""
    seconds = abs(seconds)
    int_seconds = int(seconds)
    days, int_seconds = divmod(int_seconds, 86400)
    hours, int_seconds = divmod(int_seconds, 3600)
    minutes, int_seconds = divmod(int_seconds, 60)
    if days > 0:
        time_string = f"{days}d{hours}h{minutes}m{int_seconds}s"
    elif hours > 0:
        time_string = f"{hours}h{minutes}m{int_seconds}s"
    elif minutes > 0:
        time_string = f"{minutes}m{int_seconds}s"
    else:
        time_string = f"{seconds:.3f}s"

    return prefix + time_string
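
For example:

from zenml.utils.string_utils import get_human_readable_time

print(get_human_readable_time(42.5))    # '42.500s'
print(get_human_readable_time(3725))    # '1h2m5s'
print(get_human_readable_time(-90061))  # '-1d1h1m1s'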

typed_model

Utility classes for adding type information to Pydantic models.

BaseTypedModel (BaseModel) pydantic-model

Typed Pydantic model base class.

Use this class as a base class instead of BaseModel to automatically add a type literal attribute to the model that stores the name of the class.

This can be useful when serializing models to JSON and then de-serializing them as part of a submodel union field, e.g.:


class BluePill(BaseTypedModel):
    ...

class RedPill(BaseTypedModel):
    ...

class TheMatrix(BaseTypedModel):
    choice: Union[BluePill, RedPill] = Field(..., discriminator='type')

matrix = TheMatrix(choice=RedPill())
d = matrix.dict()
new_matrix = TheMatrix.parse_obj(d)
assert isinstance(new_matrix.choice, RedPill)

It can also facilitate de-serializing objects when their type isn't known:

matrix = TheMatrix(choice=RedPill())
d = matrix.dict()
new_matrix = BaseTypedModel.from_dict(d)
assert isinstance(new_matrix.choice, RedPill)
Source code in zenml/utils/typed_model.py
class BaseTypedModel(BaseModel, metaclass=BaseTypedModelMeta):
    """Typed Pydantic model base class.

    Use this class as a base class instead of BaseModel to automatically
    add a `type` literal attribute to the model that stores the name of the
    class.

    This can be useful when serializing models to JSON and then de-serializing
    them as part of a submodel union field, e.g.:

    ```python

    class BluePill(BaseTypedModel):
        ...

    class RedPill(BaseTypedModel):
        ...

    class TheMatrix(BaseTypedModel):
        choice: Union[BluePill, RedPill] = Field(..., discriminator='type')

    matrix = TheMatrix(choice=RedPill())
    d = matrix.dict()
    new_matrix = TheMatrix.parse_obj(d)
    assert isinstance(new_matrix.choice, RedPill)
    ```

    It can also facilitate de-serializing objects when their type isn't known:

    ```python
    matrix = TheMatrix(choice=RedPill())
    d = matrix.dict()
    new_matrix = BaseTypedModel.from_dict(d)
    assert isinstance(new_matrix.choice, RedPill)
    ```
    """

    @classmethod
    def from_dict(
        cls,
        model_dict: Dict[str, Any],
    ) -> "BaseTypedModel":
        """Instantiate a Pydantic model from a serialized JSON-able dict representation.

        Args:
            model_dict: the model attributes serialized as JSON-able dict.

        Returns:
            A BaseTypedModel created from the serialized representation.

        Raises:
            RuntimeError: if the model_dict contains an invalid type.
        """
        model_type = model_dict.get("type")
        if not model_type:
            raise RuntimeError(
                "`type` information is missing from the serialized model dict."
            )
        cls = load_source_path_class(model_type)
        if not issubclass(cls, BaseTypedModel):
            raise RuntimeError(
                f"Class `{cls}` is not a ZenML BaseTypedModel subclass."
            )

        return cls.parse_obj(model_dict)

    @classmethod
    def from_json(
        cls,
        json_str: str,
    ) -> "BaseTypedModel":
        """Instantiate a Pydantic model from a serialized JSON representation.

        Args:
            json_str: the model attributes serialized as JSON.

        Returns:
            A BaseTypedModel created from the serialized representation.
        """
        model_dict = json.loads(json_str)
        return cls.from_dict(model_dict)
from_dict(model_dict) classmethod

Instantiate a Pydantic model from a serialized JSON-able dict representation.

Parameters:

Name Type Description Default
model_dict Dict[str, Any]

the model attributes serialized as JSON-able dict.

required

Returns:

Type Description
BaseTypedModel

A BaseTypedModel created from the serialized representation.

Exceptions:

Type Description
RuntimeError

if the model_dict contains an invalid type.

Source code in zenml/utils/typed_model.py
@classmethod
def from_dict(
    cls,
    model_dict: Dict[str, Any],
) -> "BaseTypedModel":
    """Instantiate a Pydantic model from a serialized JSON-able dict representation.

    Args:
        model_dict: the model attributes serialized as JSON-able dict.

    Returns:
        A BaseTypedModel created from the serialized representation.

    Raises:
        RuntimeError: if the model_dict contains an invalid type.
    """
    model_type = model_dict.get("type")
    if not model_type:
        raise RuntimeError(
            "`type` information is missing from the serialized model dict."
        )
    cls = load_source_path_class(model_type)
    if not issubclass(cls, BaseTypedModel):
        raise RuntimeError(
            f"Class `{cls}` is not a ZenML BaseTypedModel subclass."
        )

    return cls.parse_obj(model_dict)
from_json(json_str) classmethod

Instantiate a Pydantic model from a serialized JSON representation.

Parameters:

Name Type Description Default
json_str str

the model attributes serialized as JSON.

required

Returns:

Type Description
BaseTypedModel

A BaseTypedModel created from the serialized representation.

Source code in zenml/utils/typed_model.py
@classmethod
def from_json(
    cls,
    json_str: str,
) -> "BaseTypedModel":
    """Instantiate a Pydantic model from a serialized JSON representation.

    Args:
        json_str: the model attributes serialized as JSON.

    Returns:
        A BaseTypedModel created from the serialized representation.
    """
    model_dict = json.loads(json_str)
    return cls.from_dict(model_dict)

BaseTypedModelMeta (ModelMetaclass)

Metaclass responsible for adding type information to Pydantic models.

Source code in zenml/utils/typed_model.py
class BaseTypedModelMeta(ModelMetaclass):
    """Metaclass responsible for adding type information to Pydantic models."""

    def __new__(
        mcs, name: str, bases: Tuple[Type[Any], ...], dct: Dict[str, Any]
    ) -> "BaseTypedModelMeta":
        """Creates a Pydantic BaseModel class.

        This includes a hidden attribute that reflects the full class
        identifier.

        Args:
            name: The name of the class.
            bases: The base classes of the class.
            dct: The class dictionary.

        Returns:
            A Pydantic BaseModel class that includes a hidden attribute that
            reflects the full class identifier.

        Raises:
            TypeError: If the class is not a Pydantic BaseModel class.
        """
        if "type" in dct:
            raise TypeError(
                "`type` is a reserved attribute name for BaseTypedModel "
                "subclasses"
            )
        type_name = f"{dct['__module__']}.{dct['__qualname__']}"
        type_ann = Literal[type_name]  # type: ignore [misc,valid-type]
        type = Field(type_name)
        dct.setdefault("__annotations__", dict())["type"] = type_ann
        dct["type"] = type
        cls = cast(
            Type["BaseTypedModel"], super().__new__(mcs, name, bases, dct)
        )
        return cls
__new__(mcs, name, bases, dct) special staticmethod

Creates a Pydantic BaseModel class.

This includes a hidden attribute that reflects the full class identifier.

Parameters:

Name Type Description Default
name str

The name of the class.

required
bases Tuple[Type[Any], ...]

The base classes of the class.

required
dct Dict[str, Any]

The class dictionary.

required

Returns:

Type Description
BaseTypedModelMeta

A Pydantic BaseModel class that includes a hidden attribute that reflects the full class identifier.

Exceptions:

Type Description
TypeError

If the class is not a Pydantic BaseModel class.

Source code in zenml/utils/typed_model.py
def __new__(
    mcs, name: str, bases: Tuple[Type[Any], ...], dct: Dict[str, Any]
) -> "BaseTypedModelMeta":
    """Creates a Pydantic BaseModel class.

    This includes a hidden attribute that reflects the full class
    identifier.

    Args:
        name: The name of the class.
        bases: The base classes of the class.
        dct: The class dictionary.

    Returns:
        A Pydantic BaseModel class that includes a hidden attribute that
        reflects the full class identifier.

    Raises:
        TypeError: If the class is not a Pydantic BaseModel class.
    """
    if "type" in dct:
        raise TypeError(
            "`type` is a reserved attribute name for BaseTypedModel "
            "subclasses"
        )
    type_name = f"{dct['__module__']}.{dct['__qualname__']}"
    type_ann = Literal[type_name]  # type: ignore [misc,valid-type]
    type = Field(type_name)
    dct.setdefault("__annotations__", dict())["type"] = type_ann
    dct["type"] = type
    cls = cast(
        Type["BaseTypedModel"], super().__new__(mcs, name, bases, dct)
    )
    return cls
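
In practice the metaclass is applied implicitly by subclassing BaseTypedModel; a minimal sketch of the effect (MyConfig is hypothetical):

from zenml.utils.typed_model import BaseTypedModel

class MyConfig(BaseTypedModel):
    value: int = 0

# The metaclass injects a `type` field holding the full class identifier.
cfg = MyConfig(value=3)
print(cfg.type)  # e.g. '__main__.MyConfig'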

yaml_utils

Utility functions to help with YAML files and data.

UUIDEncoder (JSONEncoder)

JSON encoder for UUID objects.

Source code in zenml/utils/yaml_utils.py
class UUIDEncoder(json.JSONEncoder):
    """JSON encoder for UUID objects."""

    def default(self, obj: Any) -> Any:
        """Default UUID encoder for JSON.

        Args:
            obj: Object to encode.

        Returns:
            Encoded object.
        """
        if isinstance(obj, UUID):
            # if the obj is uuid, we simply return the value of uuid
            return obj.hex
        return json.JSONEncoder.default(self, obj)
default(self, obj)

Default UUID encoder for JSON.

Parameters:

Name Type Description Default
obj Any

Object to encode.

required

Returns:

Type Description
Any

Encoded object.

Source code in zenml/utils/yaml_utils.py
def default(self, obj: Any) -> Any:
    """Default UUID encoder for JSON.

    Args:
        obj: Object to encode.

    Returns:
        Encoded object.
    """
    if isinstance(obj, UUID):
        # if the obj is uuid, we simply return the value of uuid
        return obj.hex
    return json.JSONEncoder.default(self, obj)
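
For example, the encoder can be passed to json.dumps via the cls argument:

import json
from uuid import uuid4

from zenml.utils.yaml_utils import UUIDEncoder

# Without the custom encoder, json.dumps would raise a TypeError on the UUID.
payload = {"run_id": uuid4(), "status": "completed"}
print(json.dumps(payload, cls=UUIDEncoder))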

append_yaml(file_path, contents)

Append contents to a YAML file at file_path.

Parameters:

Name Type Description Default
file_path str

Path to YAML file.

required
contents Dict[Any, Any]

Contents of YAML file as dict.

required

Exceptions:

Type Description
FileNotFoundError

if directory does not exist.

Source code in zenml/utils/yaml_utils.py
def append_yaml(file_path: str, contents: Dict[Any, Any]) -> None:
    """Append contents to a YAML file at file_path.

    Args:
        file_path: Path to YAML file.
        contents: Contents of YAML file as dict.

    Raises:
        FileNotFoundError: if directory does not exist.
    """
    file_contents = read_yaml(file_path) or {}
    file_contents.update(contents)
    if not io_utils.is_remote(file_path):
        dir_ = str(Path(file_path).parent)
        if not fileio.isdir(dir_):
            raise FileNotFoundError(f"Directory {dir_} does not exist.")
    io_utils.write_file_contents_as_string(file_path, yaml.dump(file_contents))

is_yaml(file_path)

Returns True if file_path is YAML, else False.

Parameters:

Name Type Description Default
file_path str

Path to YAML file.

required

Returns:

Type Description
bool

True if is yaml, else False.

Source code in zenml/utils/yaml_utils.py
def is_yaml(file_path: str) -> bool:
    """Returns True if file_path is YAML, else False.

    Args:
        file_path: Path to YAML file.

    Returns:
        True if is yaml, else False.
    """
    if file_path.endswith("yaml") or file_path.endswith("yml"):
        return True
    return False

read_json(file_path)

Reads JSON from the given file path and returns its contents as a dict.

Parameters:

Name Type Description Default
file_path str

Path to JSON file.

required

Returns:

Type Description
Any

Contents of the file in a dict.

Exceptions:

Type Description
FileNotFoundError

if file does not exist.

Source code in zenml/utils/yaml_utils.py
def read_json(file_path: str) -> Any:
    """Read JSON on file path and returns contents as dict.

    Args:
        file_path: Path to JSON file.

    Returns:
        Contents of the file in a dict.

    Raises:
        FileNotFoundError: if file does not exist.
    """
    if fileio.exists(file_path):
        contents = io_utils.read_file_contents_as_string(file_path)
        return json.loads(contents)
    else:
        raise FileNotFoundError(f"{file_path} does not exist.")

read_yaml(file_path)

Reads YAML from the given file path and returns its contents as a dict.

Parameters:

Name Type Description Default
file_path str

Path to YAML file.

required

Returns:

Type Description
Any

Contents of the file in a dict.

Exceptions:

Type Description
FileNotFoundError

if file does not exist.

Source code in zenml/utils/yaml_utils.py
def read_yaml(file_path: str) -> Any:
    """Read YAML on file path and returns contents as dict.

    Args:
        file_path: Path to YAML file.

    Returns:
        Contents of the file in a dict.

    Raises:
        FileNotFoundError: if file does not exist.
    """
    if fileio.exists(file_path):
        contents = io_utils.read_file_contents_as_string(file_path)
        # TODO: [LOW] consider adding a default empty dict to be returned
        #   instead of None
        return yaml.safe_load(contents)
    else:
        raise FileNotFoundError(f"{file_path} does not exist.")

write_json(file_path, contents)

Writes the given contents as JSON to file_path.

Parameters:

Name Type Description Default
file_path str

Path to JSON file.

required
contents Dict[str, Any]

Contents of JSON file as dict.

required

Exceptions:

Type Description
FileNotFoundError

if directory does not exist.

Source code in zenml/utils/yaml_utils.py
def write_json(file_path: str, contents: Dict[str, Any]) -> None:
    """Write contents as JSON format to file_path.

    Args:
        file_path: Path to JSON file.
        contents: Contents of JSON file as dict.

    Raises:
        FileNotFoundError: if directory does not exist.
    """
    if not io_utils.is_remote(file_path):
        dir_ = str(Path(file_path).parent)
        if not fileio.isdir(dir_):
            # Check if it is a local path, if it doesn't exist, raise Exception.
            raise FileNotFoundError(f"Directory {dir_} does not exist.")
    io_utils.write_file_contents_as_string(file_path, json.dumps(contents))
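
A minimal round trip with read_json (the target directory must already exist, hence the use of the system temp directory):

import os
import tempfile

from zenml.utils import yaml_utils

path = os.path.join(tempfile.gettempdir(), "zenml_example.json")
yaml_utils.write_json(path, {"pipeline": "training", "steps": 3})
print(yaml_utils.read_json(path))  # {'pipeline': 'training', 'steps': 3}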

write_yaml(file_path, contents, sort_keys=True)

Writes the given contents as YAML to file_path.

Parameters:

Name Type Description Default
file_path str

Path to YAML file.

required
contents Dict[Any, Any]

Contents of YAML file as dict.

required
sort_keys bool

If True, keys are sorted alphabetically. If False, the order in which the keys were inserted into the dict will be preserved.

True

Exceptions:

Type Description
FileNotFoundError

if directory does not exist.

Source code in zenml/utils/yaml_utils.py
def write_yaml(
    file_path: str, contents: Dict[Any, Any], sort_keys: bool = True
) -> None:
    """Write contents as YAML format to file_path.

    Args:
        file_path: Path to YAML file.
        contents: Contents of YAML file as dict.
        sort_keys: If `True`, keys are sorted alphabetically. If `False`,
            the order in which the keys were inserted into the dict will
            be preserved.

    Raises:
        FileNotFoundError: if directory does not exist.
    """
    if not io_utils.is_remote(file_path):
        dir_ = str(Path(file_path).parent)
        if not fileio.isdir(dir_):
            raise FileNotFoundError(f"Directory {dir_} does not exist.")
    io_utils.write_file_contents_as_string(
        file_path, yaml.dump(contents, sort_keys=sort_keys)
    )
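
A short sketch combining write_yaml, append_yaml and read_yaml (again using the system temp directory so that the parent directory exists):

import os
import tempfile

from zenml.utils import yaml_utils

path = os.path.join(tempfile.gettempdir(), "zenml_example.yaml")

yaml_utils.write_yaml(path, {"stack": "local"}, sort_keys=False)
yaml_utils.append_yaml(path, {"orchestrator": "default"})

# Contains both keys; append_yaml re-dumps the merged dict.
print(yaml_utils.read_yaml(path))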