Services

zenml.services special

A service is a process or set of processes that outlive a pipeline run.

local special

local_daemon_entrypoint

This executable file is the entrypoint for all ZenML services that are implemented as locally running daemon processes.

local_service

LocalDaemonService (BaseService) pydantic-model

A service represented by a local daemon process.

This class extends the base service class with functionality concerning the life-cycle management and tracking of external services implemented as local daemon processes.

To define a local daemon service, subclass this class and implement the run method. Upon start, the service will spawn a daemon process that ends up calling the run method.

Examples:


from zenml.services import ServiceType, LocalDaemonService, LocalDaemonServiceConfig
import time

class SleepingDaemonConfig(LocalDaemonServiceConfig):

    wake_up_after: int

class SleepingDaemon(LocalDaemonService):

    SERVICE_TYPE = ServiceType(
        name="sleeper",
        description="Sleeping daemon",
        type="daemon",
        flavor="sleeping",
    )
    config: SleepingDaemonConfig

    def run(self) -> None:
        time.sleep(self.config.wake_up_after)

daemon = SleepingDaemon(config=SleepingDaemonConfig(wake_up_after=10))
daemon.start()

NOTE: the SleepingDaemon class and its parent module have to be discoverable as part of a ZenML Integration, otherwise the daemon will fail with the following error:

TypeError: Cannot load service with unregistered service type:
name='sleeper' type='daemon' flavor='sleeping' description='Sleeping daemon'

Attributes:

| Name | Type | Description |
| --- | --- | --- |
| config | LocalDaemonServiceConfig | service configuration |
| status | LocalDaemonServiceStatus | service status |
| endpoint | Optional[zenml.services.local.local_service_endpoint.LocalDaemonServiceEndpoint] | optional service endpoint |

Source code in zenml/services/local/local_service.py
class LocalDaemonService(BaseService):
    """A service represented by a local daemon process.

    This class extends the base service class with functionality concerning
    the life-cycle management and tracking of external services implemented as
    local daemon processes.

    To define a local daemon service, subclass this class and implement the
    `run` method. Upon `start`, the service will spawn a daemon process that
    ends up calling the `run` method.

    Example:

    ```python

    from zenml.services import ServiceType, LocalDaemonService, LocalDaemonServiceConfig
    import time

    class SleepingDaemonConfig(LocalDaemonServiceConfig):

        wake_up_after: int

    class SleepingDaemon(LocalDaemonService):

        SERVICE_TYPE = ServiceType(
            name="sleeper",
            description="Sleeping daemon",
            type="daemon",
            flavor="sleeping",
        )
        config: SleepingDaemonConfig

        def run(self) -> None:
            time.sleep(self.config.wake_up_after)

    daemon = SleepingDaemon(config=SleepingDaemonConfig(wake_up_after=10))
    daemon.start()
    ```

    NOTE: the `SleepingDaemon` class and its parent module have to be
    discoverable as part of a ZenML `Integration`, otherwise the daemon will
    fail with the following error:

    ```
    TypeError: Cannot load service with unregistered service type:
    name='sleeper' type='daemon' flavor='sleeping' description='Sleeping daemon'
    ```


    Attributes:
        config: service configuration
        status: service status
        endpoint: optional service endpoint
    """

    config: LocalDaemonServiceConfig = Field(
        default_factory=LocalDaemonServiceConfig
    )
    status: LocalDaemonServiceStatus = Field(
        default_factory=LocalDaemonServiceStatus
    )
    # TODO [ENG-705]: allow multiple endpoints per service
    endpoint: Optional[LocalDaemonServiceEndpoint] = None

    def check_status(self) -> Tuple[ServiceState, str]:
        """Check the current operational state of the daemon process.

        Returns:
            The operational state of the daemon process and a message
            providing additional information about that state (e.g. a
            description of the error, if one is encountered).
        """

        if not self.status.pid:
            return ServiceState.INACTIVE, "service daemon is not running"

        # the daemon is running
        return ServiceState.ACTIVE, ""

    def _get_daemon_cmd(self) -> Tuple[List[str], Dict[str, str]]:
        """Get the command to run the service daemon.

        The default implementation provided by this class is the following:

          * this LocalDaemonService instance and its configuration
          are serialized as JSON and saved to a temporary file
          * the local_daemon_entrypoint.py script is launched as a subprocess
          and pointed to the serialized service file
          * the entrypoint script re-creates the LocalDaemonService instance
          from the serialized configuration, reconfigures itself as a daemon
          and detaches itself from the parent process, then calls the `run`
          method that must be implemented by the subclass

        Subclasses that need a different command to launch the service daemon
        should override this method.

        Returns:
            Command needed to launch the daemon process and the environment
            variables to set, in the formats accepted by subprocess.Popen.
        """
        # to avoid circular imports, import here
        import zenml.services.local.local_daemon_entrypoint as daemon_entrypoint

        self.status.silent_daemon = self.config.silent_daemon
        # reuse the config file and logfile location from a previous run,
        # if available
        if not self.status.runtime_path or not os.path.exists(
            self.status.runtime_path
        ):
            # runtime_path points to zenml local stores with uuid to make it
            # easy to track from other locations.
            if self.config.root_runtime_path:
                self.status.runtime_path = os.path.join(
                    self.config.root_runtime_path,
                    str(self.uuid),
                )
                create_dir_recursive_if_not_exists(self.status.runtime_path)
            else:
                self.status.runtime_path = tempfile.mkdtemp(
                    prefix="zenml-service-"
                )

        assert self.status.config_file is not None
        assert self.status.pid_file is not None

        with open(self.status.config_file, "w") as f:
            f.write(self.json(indent=4))

        # delete the previous PID file, in case a previous daemon process
        # crashed and left a stale PID file
        if os.path.exists(self.status.pid_file):
            os.remove(self.status.pid_file)

        command = [
            sys.executable,
            "-m",
            daemon_entrypoint.__name__,
            "--config-file",
            self.status.config_file,
            "--pid-file",
            self.status.pid_file,
        ]
        if self.status.log_file:
            pathlib.Path(self.status.log_file).touch()
            command += ["--log-file", self.status.log_file]

        command_env = os.environ.copy()

        return command, command_env

    def _start_daemon(self) -> None:
        """Start the service daemon process associated with this service."""

        pid = self.status.pid
        if pid:
            # service daemon is already running
            logger.debug(
                "Daemon process for service '%s' is already running with PID %d",
                self,
                pid,
            )
            return

        logger.debug("Starting daemon for service '%s'...", self)

        if self.endpoint:
            self.endpoint.prepare_for_start()

        command, command_env = self._get_daemon_cmd()
        logger.debug(
            "Running command to start daemon for service '%s': %s",
            self,
            " ".join(command),
        )
        p = subprocess.Popen(command, env=command_env)
        p.wait()
        pid = self.status.pid
        if pid:
            logger.debug(
                "Daemon process for service '%s' started with PID: %d",
                self,
                pid,
            )
        else:
            logger.error(
                "Daemon process for service '%s' failed to start",
                self,
            )

    def _stop_daemon(self, force: bool = False) -> None:
        """Stop the service daemon process associated with this service.

        Args:
            force: if True, the service daemon will be forcefully stopped
        """

        pid = self.status.pid
        if not pid:
            # service daemon is not running
            logger.debug(
                "Daemon process for service '%s' no longer running",
                self,
            )
            return

        logger.debug("Stopping daemon for service '%s' ...", self)
        try:
            p = psutil.Process(pid)
        except psutil.Error:
            logger.error(
                "Could not find process for service '%s' ...", self
            )
            return
        if force:
            p.kill()
        else:
            p.terminate()

    def provision(self) -> None:
        self._start_daemon()

    def deprovision(self, force: bool = False) -> None:
        self._stop_daemon(force)

    @abstractmethod
    def run(self) -> None:
        """Run the service daemon process associated with this service.

        Subclasses must implement this method to provide the service daemon
        functionality. This method will be executed in the context of the
        running daemon, not in the context of the process that calls the
        `start` method.
        """
check_status(self)

Check the current operational state of the daemon process.

Returns:

Type Description
Tuple[zenml.services.service_status.ServiceState, str]

The operational state of the daemon process and a message providing additional information about that state (e.g. a description of the error, if one is encountered).

Source code in zenml/services/local/local_service.py
def check_status(self) -> Tuple[ServiceState, str]:
    """Check the current operational state of the daemon process.

    Returns:
        The operational state of the daemon process and a message
        providing additional information about that state (e.g. a
        description of the error, if one is encountered).
    """

    if not self.status.pid:
        return ServiceState.INACTIVE, "service daemon is not running"

    # the daemon is running
    return ServiceState.ACTIVE, ""
deprovision(self, force=False)

Deprovisions all resources used by the service.

Source code in zenml/services/local/local_service.py
def deprovision(self, force: bool = False) -> None:
    self._stop_daemon(force)
provision(self)

Provisions resources to run the service.

Source code in zenml/services/local/local_service.py
def provision(self) -> None:
    self._start_daemon()
run(self)

Run the service daemon process associated with this service.

Subclasses must implement this method to provide the service daemon functionality. This method will be executed in the context of the running daemon, not in the context of the process that calls the start method.

Source code in zenml/services/local/local_service.py
@abstractmethod
def run(self) -> None:
    """Run the service daemon process associated with this service.

    Subclasses must implement this method to provide the service daemon
    functionality. This method will be executed in the context of the
    running daemon, not in the context of the process that calls the
    `start` method.
    """
LocalDaemonServiceConfig (ServiceConfig) pydantic-model

Local daemon service configuration.

Attributes:

| Name | Type | Description |
| --- | --- | --- |
| silent_daemon | bool | set to True to suppress the output of the daemon (i.e. redirect stdout and stderr to /dev/null). If False, the daemon output will be redirected to a logfile. |
| root_runtime_path | Optional[str] | the root path where the service daemon will store service configuration files |

Source code in zenml/services/local/local_service.py
class LocalDaemonServiceConfig(ServiceConfig):
    """Local daemon service configuration.

    Attributes:
        silent_daemon: set to True to suppress the output of the daemon
            (i.e. redirect stdout and stderr to /dev/null). If False, the
            daemon output will be redirected to a logfile.
        root_runtime_path: the root path where the service daemon will store
            service configuration files
    """

    silent_daemon: bool = False
    root_runtime_path: Optional[str] = None
LocalDaemonServiceStatus (ServiceStatus) pydantic-model

Local daemon service status.

Attributes:

| Name | Type | Description |
| --- | --- | --- |
| runtime_path | Optional[str] | the path where the service daemon runtime files (the configuration file used to start the service daemon and the logfile) are located |
| silent_daemon | bool | flag indicating whether the output of the daemon is suppressed (redirected to /dev/null). |

Source code in zenml/services/local/local_service.py
class LocalDaemonServiceStatus(ServiceStatus):
    """Local daemon service status.

    Attributes:
        runtime_path: the path where the service daemon runtime files (the
            configuration file used to start the service daemon and the
            logfile) are located
        silent_daemon: flag indicating whether the output of the daemon
            is suppressed (redirected to /dev/null).
    """

    runtime_path: Optional[str] = None
    # TODO [ENG-704]: remove field duplication between XServiceStatus and
    #   XServiceConfig (e.g. keep a private reference to the config in the
    #   status)
    silent_daemon: bool = False

    @property
    def config_file(self) -> Optional[str]:
        """Get the path to the configuration file used to start the service
        daemon.

        Returns:
            The path to the configuration file, or None, if the
            service has never been started before.
        """
        if not self.runtime_path:
            return None
        return os.path.join(self.runtime_path, SERVICE_DAEMON_CONFIG_FILE_NAME)

    @property
    def log_file(self) -> Optional[str]:
        """Get the path to the log file where the service output is/has been
        logged.

        Returns:
            The path to the log file, or None, if the service has never been
            started before, or if the service daemon output is suppressed.
        """
        if not self.runtime_path or self.silent_daemon:
            return None
        return os.path.join(self.runtime_path, SERVICE_DAEMON_LOG_FILE_NAME)

    @property
    def pid_file(self) -> Optional[str]:
        """Get the path to the daemon PID file where the last known PID of the
        daemon process is stored.

        Returns:
            The path to the PID file, or None, if the service has never been
            started before.
        """
        if not self.runtime_path or self.silent_daemon:
            return None
        return os.path.join(self.runtime_path, SERVICE_DAEMON_PID_FILE_NAME)

    @property
    def pid(self) -> Optional[int]:
        """Return the PID of the currently running daemon"""
        pid_file = self.pid_file
        if not pid_file:
            return None
        if sys.platform == "win32":
            logger.warning(
                "Daemon functionality is currently not supported on Windows."
            )
            return None
        else:
            import zenml.services.local.local_daemon_entrypoint as daemon_entrypoint
            from zenml.utils.daemon import get_daemon_pid_if_running

            pid = get_daemon_pid_if_running(pid_file)

            # let's be extra careful here and check that the PID really
            # belongs to a process that is a local ZenML daemon.
            # this avoids the situation where a PID file is left over from
            # a previous daemon run, but another process is using the same
            # PID.
            p = psutil.Process(pid)
            cmd_line = p.cmdline()
            if (
                daemon_entrypoint.__name__ not in cmd_line
                or self.config_file not in cmd_line
            ):
                return None
            return pid
config_file: Optional[str] property readonly

Get the path to the configuration file used to start the service daemon.

Returns:

Type Description
Optional[str]

The path to the configuration file, or None, if the service has never been started before.

log_file: Optional[str] property readonly

Get the path to the log file where the service output is/has been logged.

Returns:

Type Description
Optional[str]

The path to the log file, or None, if the service has never been started before, or if the service daemon output is suppressed.

pid: Optional[int] property readonly

Return the PID of the currently running daemon
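
The helper that reads the PID file (`get_daemon_pid_if_running`) is not shown on this page, so the following is only a sketch of the general PID-file technique, written against the standard library instead of `psutil` and assuming a POSIX system. The function name `read_pid_if_running` is hypothetical. The actual `pid` property goes further and also inspects the process command line to guard against a stale PID file whose PID has been reused, which this sketch does not attempt.

```python
import os
import tempfile
from typing import Optional


def read_pid_if_running(pid_file: str) -> Optional[int]:
    """Return the PID stored in `pid_file` if that process exists, else None."""
    try:
        with open(pid_file) as f:
            pid = int(f.read().strip())
    except (OSError, ValueError):
        # missing, unreadable or malformed PID file
        return None
    if pid <= 0:
        return None
    try:
        # signal 0 only checks for existence and permissions (POSIX)
        os.kill(pid, 0)
    except ProcessLookupError:
        return None  # stale PID file: no such process
    except PermissionError:
        pass  # the process exists but belongs to another user
    return pid


with tempfile.TemporaryDirectory() as tmp:
    pid_path = os.path.join(tmp, "service.pid")  # illustrative file name
    print(read_pid_if_running(pid_path))  # no PID file has been written yet
    with open(pid_path, "w") as f:
        f.write(str(os.getpid()))
    print(read_pid_if_running(pid_path) == os.getpid())
```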

pid_file: Optional[str] property readonly

Get the path to the daemon PID file where the last known PID of the daemon process is stored.

Returns:

Type Description
Optional[str]

The path to the PID file, or None, if the service has never been started before.
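
The path properties above all follow the same rule: they are derived from `runtime_path` and return None until a runtime directory exists. The sketch below reproduces that rule for `config_file` and `log_file` with a plain dataclass standing in for the pydantic status model. The class name `RuntimeFiles` and the two file names are assumptions made for illustration; the real names come from ZenML constants that are not shown on this page.

```python
import os
from dataclasses import dataclass
from typing import Optional

# Assumed file names, chosen for illustration only.
CONFIG_FILE_NAME = "service.json"
LOG_FILE_NAME = "service.log"


@dataclass
class RuntimeFiles:
    """Stand-in for the status model, reduced to the two relevant fields."""

    runtime_path: Optional[str] = None
    silent_daemon: bool = False

    @property
    def config_file(self) -> Optional[str]:
        if not self.runtime_path:
            return None  # service never started, no runtime directory yet
        return os.path.join(self.runtime_path, CONFIG_FILE_NAME)

    @property
    def log_file(self) -> Optional[str]:
        if not self.runtime_path or self.silent_daemon:
            return None  # never started, or daemon output is suppressed
        return os.path.join(self.runtime_path, LOG_FILE_NAME)


never_started = RuntimeFiles()
silent = RuntimeFiles(runtime_path="/tmp/zenml-service-demo", silent_daemon=True)
print(never_started.config_file, silent.config_file, silent.log_file)
```

Callers should therefore treat every one of these paths as optional and check for None before opening the file.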

local_service_endpoint

LocalDaemonServiceEndpoint (BaseServiceEndpoint) pydantic-model

A service endpoint exposed by a local daemon process.

This class extends the base service endpoint class with functionality concerning the life-cycle management and tracking of endpoints exposed by external services implemented as local daemon processes.

Attributes:

| Name | Type | Description |
| --- | --- | --- |
| config | LocalDaemonServiceEndpointConfig | service endpoint configuration |
| status | LocalDaemonServiceEndpointStatus | service endpoint status |
| monitor | Union[zenml.services.service_monitor.HTTPEndpointHealthMonitor, zenml.services.service_monitor.TCPEndpointHealthMonitor] | optional service endpoint health monitor |

Source code in zenml/services/local/local_service_endpoint.py
class LocalDaemonServiceEndpoint(BaseServiceEndpoint):
    """A service endpoint exposed by a local daemon process.

    This class extends the base service endpoint class with functionality
    concerning the life-cycle management and tracking of endpoints exposed
    by external services implemented as local daemon processes.

    Attributes:
        config: service endpoint configuration
        status: service endpoint status
        monitor: optional service endpoint health monitor
    """

    config: LocalDaemonServiceEndpointConfig = Field(
        default_factory=LocalDaemonServiceEndpointConfig
    )
    status: LocalDaemonServiceEndpointStatus = Field(
        default_factory=LocalDaemonServiceEndpointStatus
    )
    monitor: Optional[
        Union[HTTPEndpointHealthMonitor, TCPEndpointHealthMonitor]
    ] = Field(..., discriminator="type")

    def _lookup_free_port(self) -> int:
        """Search for a free TCP port for the service endpoint.

        If a preferred TCP port value is explicitly requested through the
        endpoint configuration, it will be checked first. If a port was
        previously used the last time the service was running (i.e. as
        indicated in the service endpoint status), it will be checked next for
        availability.

        As a last resort, this call will search for a free TCP port, if
        `allocate_port` is set to True in the endpoint configuration.

        Returns:
            An available TCP port number

        Raises:
            IOError: if the preferred TCP port is busy and `allocate_port` is
                disabled in the endpoint configuration, or if no free TCP port
                could be otherwise allocated.
        """

        # If a port value is explicitly configured, attempt to use it first
        if self.config.port:
            if port_available(self.config.port):
                return self.config.port
            if not self.config.allocate_port:
                raise IOError(f"TCP port {self.config.port} is not available.")

        # Attempt to reuse the port used when the service was last running
        if self.status.port and port_available(self.status.port):
            return self.status.port

        port = scan_for_available_port()
        if port:
            return port
        raise IOError("No free TCP ports found")

    def prepare_for_start(self) -> None:
        """Prepare the service endpoint for starting.

        This method is called before the service is started.
        """
        self.status.protocol = self.config.protocol
        self.status.hostname = "localhost"
        self.status.port = self._lookup_free_port()
prepare_for_start(self)

Prepare the service endpoint for starting.

This method is called before the service is started.
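
Part of that preparation is choosing a TCP port. The order of checks in `_lookup_free_port` (preferred port, then the last used port, then any free port) can be sketched with the standard `socket` module as follows. The ZenML helpers `port_available` and `scan_for_available_port` are not shown on this page, so `port_is_free` and `pick_port` are hypothetical stand-ins, and the last step asks the operating system for a free port by binding to port 0 rather than scanning a range.

```python
import socket
from typing import Optional


def port_is_free(port: int, host: str = "127.0.0.1") -> bool:
    """Return True if a TCP socket can currently be bound to host:port."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        try:
            sock.bind((host, port))
        except OSError:
            return False
    return True


def pick_port(
    preferred: Optional[int],
    last_used: Optional[int],
    allocate_port: bool = True,
) -> int:
    """Choose a port: preferred first, then last used, then any free one."""
    if preferred:
        if port_is_free(preferred):
            return preferred
        if not allocate_port:
            raise IOError(f"TCP port {preferred} is not available.")
    if last_used and port_is_free(last_used):
        return last_used
    # binding to port 0 lets the operating system choose a free port
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.bind(("127.0.0.1", 0))
        return sock.getsockname()[1]


# Occupy a port to simulate a busy preferred port.
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as busy:
    busy.bind(("127.0.0.1", 0))
    busy.listen(1)
    busy_port = busy.getsockname()[1]
    print(pick_port(busy_port, None) != busy_port)
```

Any port chosen this way can still be taken by another process before the service binds to it, so the result is best treated as a hint rather than a reservation.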

Source code in zenml/services/local/local_service_endpoint.py
def prepare_for_start(self) -> None:
    """Prepare the service endpoint for starting.

    This method is called before the service is started.
    """
    self.status.protocol = self.config.protocol
    self.status.hostname = "localhost"
    self.status.port = self._lookup_free_port()
LocalDaemonServiceEndpointConfig (ServiceEndpointConfig) pydantic-model

Local daemon service endpoint configuration.

Attributes:

| Name | Type | Description |
| --- | --- | --- |
| protocol | ServiceEndpointProtocol | the TCP protocol implemented by the service endpoint |
| port | Optional[int] | preferred TCP port value for the service endpoint. If the port is in use when the service is started and allocate_port is True, a different free port is allocated instead; otherwise an exception is raised. |
| allocate_port | bool | set to True to allocate a free TCP port for the service endpoint automatically. |

Source code in zenml/services/local/local_service_endpoint.py
class LocalDaemonServiceEndpointConfig(ServiceEndpointConfig):
    """Local daemon service endpoint configuration.

    Attributes:
        protocol: the TCP protocol implemented by the service endpoint
        port: preferred TCP port value for the service endpoint. If the port
            is in use when the service is started, setting `allocate_port` to
            True will also try to allocate a new port value, otherwise an
            exception will be raised.
        allocate_port: set to True to allocate a free TCP port for the
            service endpoint automatically.
    """

    protocol: ServiceEndpointProtocol = ServiceEndpointProtocol.TCP
    port: Optional[int] = None
    allocate_port: bool = True
LocalDaemonServiceEndpointStatus (ServiceEndpointStatus) pydantic-model

Local daemon service endpoint status.

Source code in zenml/services/local/local_service_endpoint.py
class LocalDaemonServiceEndpointStatus(ServiceEndpointStatus):
    """Local daemon service endpoint status."""

service

BaseService (BaseTypedModel) pydantic-model

Base service class

This class implements generic functionality concerning the life-cycle management and tracking of an external service (e.g. process, container, Kubernetes deployment etc.).

Attributes:

| Name | Type | Description |
| --- | --- | --- |
| SERVICE_TYPE | ClassVar[zenml.services.service_type.ServiceType] | a service type descriptor with information describing the service class. Every concrete service class must define this. |
| admin_state | ServiceState | the administrative state of the service. |
| uuid | UUID | unique UUID identifier for the service instance. |
| config | ServiceConfig | service configuration |
| status | ServiceStatus | service status |
| endpoint | Optional[BaseServiceEndpoint] | optional service endpoint |

Source code in zenml/services/service.py
class BaseService(BaseTypedModel, metaclass=BaseServiceMeta):
    """Base service class

    This class implements generic functionality concerning the life-cycle
    management and tracking of an external service (e.g. process, container,
    Kubernetes deployment etc.).

    Attributes:
        SERVICE_TYPE: a service type descriptor with information describing
            the service class. Every concrete service class must define this.
        admin_state: the administrative state of the service.
        uuid: unique UUID identifier for the service instance.
        config: service configuration
        status: service status
        endpoint: optional service endpoint
    """

    SERVICE_TYPE: ClassVar[ServiceType]

    uuid: UUID = Field(default_factory=uuid4, allow_mutation=False)
    admin_state: ServiceState = ServiceState.INACTIVE
    config: ServiceConfig
    status: ServiceStatus
    # TODO [ENG-703]: allow multiple endpoints per service
    endpoint: Optional[BaseServiceEndpoint]

    def __init__(
        self,
        **attrs: Any,
    ) -> None:
        super().__init__(**attrs)
        self.config.name = self.config.name or self.__class__.__name__

    @abstractmethod
    def check_status(self) -> Tuple[ServiceState, str]:
        """Check the current operational state of the external service.

        This method should be overridden by subclasses that implement
        concrete service tracking functionality.

        Returns:
            The operational state of the external service and a message
            providing additional information about that state (e.g. a
            description of the error if one is encountered while checking the
            service status).
        """

    def update_status(self) -> None:
        """Check the current operational state of the external service
        and update the local operational status information to reflect it.

        This method should be overridden by subclasses that implement
        concrete service status tracking functionality.
        """
        logger.debug(
            "Running status check for service '%s' ...",
            self,
        )
        state, err = self.check_status()
        logger.debug(
            "Status check results for service '%s': %s [%s]",
            self,
            state.name,
            err,
        )
        self.status.update_state(state, err)

        # don't bother checking the endpoint state if the service is not active
        if self.status.state == ServiceState.INACTIVE:
            return

        if self.endpoint:
            self.endpoint.update_status()

    def poll_service_status(self, timeout: int = 0) -> None:
        """Poll the external service status until the service operational
        state matches the administrative state, the service enters a failed
        state, or the timeout is reached.

        Args:
            timeout: maximum time to wait for the service operational state
            to match the administrative state, in seconds
        """
        time_remaining = timeout
        while True:
            if self.admin_state == ServiceState.ACTIVE and self.is_running:
                return
            if self.admin_state == ServiceState.INACTIVE and self.is_stopped:
                return
            if self.is_failed:
                return
            if time_remaining <= 0:
                break
            time.sleep(1)
            time_remaining -= 1

        if timeout > 0:
            logger.error(
                f"Timed out waiting for service {self} to become "
                f"{self.admin_state.value}: {self.status.last_error}"
            )

    @property
    def is_running(self) -> bool:
        """Check if the service is currently running.

        This method will actively poll the external service to get its status
        and will return the result.

        Returns:
            True if the service is running and active (i.e. the endpoints are
            responsive, if any are configured), otherwise False.
        """
        self.update_status()
        return self.status.state == ServiceState.ACTIVE and (
            not self.endpoint or self.endpoint.is_active()
        )

    @property
    def is_stopped(self) -> bool:
        """Check if the service is currently stopped.

        This method will actively poll the external service to get its status
        and will return the result.

        Returns:
            True if the service is stopped, otherwise False.
        """
        self.update_status()
        return self.status.state == ServiceState.INACTIVE

    @property
    def is_failed(self) -> bool:
        """Check if the service is currently failed.

        This method will actively poll the external service to get its status
        and will return the result.

        Returns:
            True if the service is in a failure state, otherwise False.
        """
        self.update_status()
        return self.status.state == ServiceState.ERROR

    def provision(self) -> None:
        """Provisions resources to run the service."""
        raise NotImplementedError(
            f"Provisioning resources not implemented for {self}."
        )

    def deprovision(self, force: bool = False) -> None:
        """Deprovisions all resources used by the service."""
        raise NotImplementedError(
            f"Deprovisioning resources not implemented for {self}."
        )

    def update(self, config: ServiceConfig) -> None:
        """Update the service configuration.

        Args:
            config: the new service configuration.
        """
        self.config = config

    def start(self, timeout: int = 0) -> None:
        """Start the service and optionally wait for it to become active.

        Args:
            timeout: amount of time to wait for the service to become active.
                If set to 0, the method will return immediately after checking
                the service status.

        Raises:
            RuntimeError: if the service cannot be started
        """
        with console.status(f"Starting service '{self}'.\n"):
            self.admin_state = ServiceState.ACTIVE
            self.provision()
            if timeout > 0:
                self.poll_service_status(timeout)
                if not self.is_running:
                    raise RuntimeError(
                        f"Failed to start service {self}. Last state: "
                        f"'{self.status.state.value}'. Last error: "
                        f"'{self.status.last_error}'"
                    )

    def stop(self, timeout: int = 0, force: bool = False) -> None:
        """Stop the service and optionally wait for it to shut down.

        Args:
            timeout: amount of time to wait for the service to shut down.
                If set to 0, the method will return immediately after checking
                the service status.
            force: if True, the service will be forcefully stopped

        Raises:
            RuntimeError: if the service cannot be stopped
        """
        with console.status(f"Stopping service '{self}'.\n"):
            self.admin_state = ServiceState.INACTIVE
            self.deprovision(force)
            if timeout > 0:
                self.poll_service_status(timeout)
                if not self.is_stopped:
                    raise RuntimeError(f"Failed to stop service {self}.")

    def __repr__(self) -> str:
        """String representation of the service."""
        return f"{self.__class__.__qualname__}[{self.uuid}] (type: {self.SERVICE_TYPE.type}, flavor: {self.SERVICE_TYPE.flavor})"

    def __str__(self) -> str:
        """String representation of the service."""
        return self.__repr__()

    class Config:
        """Pydantic configuration class."""

        # validate attribute assignments
        validate_assignment = True
        # all attributes with leading underscore are private and therefore
        # are mutable and not included in serialization
        underscore_attrs_are_private = True
is_failed: bool property readonly

Check if the service is currently failed.

This method will actively poll the external service to get its status and will return the result.

Returns:

Type Description
bool

True if the service is in a failure state, otherwise False.

is_running: bool property readonly

Check if the service is currently running.

This method will actively poll the external service to get its status and will return the result.

Returns:

Type Description
bool

True if the service is running and active (i.e. the endpoints are responsive, if any are configured), otherwise False.

is_stopped: bool property readonly

Check if the service is currently stopped.

This method will actively poll the external service to get its status and will return the result.

Returns:

Type Description
bool

True if the service is stopped, otherwise False.

Config

Pydantic configuration class.

Source code in zenml/services/service.py
class Config:
    """Pydantic configuration class."""

    # validate attribute assignments
    validate_assignment = True
    # all attributes with leading underscore are private and therefore
    # are mutable and not included in serialization
    underscore_attrs_are_private = True
__repr__(self) special

String representation of the service.

Source code in zenml/services/service.py
def __repr__(self) -> str:
    """String representation of the service."""
    return f"{self.__class__.__qualname__}[{self.uuid}] (type: {self.SERVICE_TYPE.type}, flavor: {self.SERVICE_TYPE.flavor})"
__str__(self) special

String representation of the service.

Source code in zenml/services/service.py
def __str__(self) -> str:
    """String representation of the service."""
    return self.__repr__()
check_status(self)

Check the current operational state of the external service.

This method should be overridden by subclasses that implement concrete service tracking functionality.

Returns:

Type Description
Tuple[zenml.services.service_status.ServiceState, str]

The operational state of the external service and a message providing additional information about that state (e.g. a description of the error if one is encountered while checking the service status).

Source code in zenml/services/service.py
@abstractmethod
def check_status(self) -> Tuple[ServiceState, str]:
    """Check the current operational state of the external service.

    This method should be overridden by subclasses that implement
    concrete service tracking functionality.

    Returns:
        The operational state of the external service and a message
        providing additional information about that state (e.g. a
        description of the error if one is encountered while checking the
        service status).
    """
deprovision(self, force=False)

Deprovisions all resources used by the service.

Source code in zenml/services/service.py
def deprovision(self, force: bool = False) -> None:
    """Deprovisions all resources used by the service."""
    raise NotImplementedError(
        f"Deprovisioning resources not implemented for {self}."
    )
poll_service_status(self, timeout=0)

Poll the external service status until the service operational state matches the administrative state, the service enters a failed state, or the timeout is reached.

Parameters:

Name Type Description Default
timeout int

maximum time to wait for the service operational state to match the administrative state, in seconds

0
Source code in zenml/services/service.py
def poll_service_status(self, timeout: int = 0) -> None:
    """Poll the external service status until the service operational
    state matches the administrative state, the service enters a failed
    state, or the timeout is reached.

    Args:
        timeout: maximum time to wait for the service operational state
            to match the administrative state, in seconds
    """
    time_remaining = timeout
    while True:
        if self.admin_state == ServiceState.ACTIVE and self.is_running:
            return
        if self.admin_state == ServiceState.INACTIVE and self.is_stopped:
            return
        if self.is_failed:
            return
        if time_remaining <= 0:
            break
        time.sleep(1)
        time_remaining -= 1

    if timeout > 0:
        logger.error(
            f"Timed out waiting for service {self} to become "
            f"{self.admin_state.value}: {self.status.last_error}"
        )
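The polling loop above can be exercised in isolation. The following is a minimal sketch, not ZenML code: `FakeService`, its `ready_after` parameter and the injectable `sleep` argument are hypothetical stand-ins so that the example runs instantly, and unlike the original method the function returns a boolean to make the outcome visible.

```python
import time
from enum import Enum
from typing import Callable


class ServiceState(Enum):
    # Simplified stand-in for the ZenML ServiceState enum.
    ACTIVE = "active"
    INACTIVE = "inactive"


class FakeService:
    """Hypothetical service that reports running once its simulated clock
    reaches `ready_after` seconds."""

    def __init__(self, ready_after: int) -> None:
        self.admin_state = ServiceState.ACTIVE
        self.ready_after = ready_after
        self.clock = 0

    @property
    def is_running(self) -> bool:
        return self.clock >= self.ready_after

    @property
    def is_stopped(self) -> bool:
        return False

    @property
    def is_failed(self) -> bool:
        return False

    def fake_sleep(self, seconds: float) -> None:
        # Advance the simulated clock instead of blocking.
        self.clock += int(seconds)


def poll_until_settled(
    service: FakeService,
    timeout: int = 0,
    sleep: Callable[[float], None] = time.sleep,
) -> bool:
    """Same loop structure as the method above, but returns True when a
    terminal condition was observed and False when the timeout ran out."""
    time_remaining = timeout
    while True:
        if service.admin_state == ServiceState.ACTIVE and service.is_running:
            return True
        if service.admin_state == ServiceState.INACTIVE and service.is_stopped:
            return True
        if service.is_failed:
            return True
        if time_remaining <= 0:
            return False
        sleep(1)
        time_remaining -= 1


quick = FakeService(ready_after=3)
print(poll_until_settled(quick, timeout=5, sleep=quick.fake_sleep))  # True

slow = FakeService(ready_after=10)
print(poll_until_settled(slow, timeout=2, sleep=slow.fake_sleep))  # False
```

Note that with `timeout=0` the loop still performs exactly one status check before giving up.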
provision(self)

Provisions resources to run the service.

Source code in zenml/services/service.py
def provision(self) -> None:
    """Provisions resources to run the service."""
    raise NotImplementedError(
        f"Provisioning resources not implemented for {self}."
    )
start(self, timeout=0)

Start the service and optionally wait for it to become active.

Parameters:

Name Type Description Default
timeout int

amount of time to wait for the service to become active. If set to 0, the method will return immediately after checking the service status.

0

Exceptions:

Type Description
RuntimeError

if the service cannot be started

Source code in zenml/services/service.py
def start(self, timeout: int = 0) -> None:
    """Start the service and optionally wait for it to become active.

    Args:
        timeout: amount of time to wait for the service to become active.
            If set to 0, the method will return immediately after checking
            the service status.

    Raises:
        RuntimeError: if the service cannot be started
    """
    with console.status(f"Starting service '{self}'.\n"):
        self.admin_state = ServiceState.ACTIVE
        self.provision()
        if timeout > 0:
            self.poll_service_status(timeout)
            if not self.is_running:
                raise RuntimeError(
                    f"Failed to start service {self}. Last state: "
                    f"'{self.status.state.value}'. Last error: "
                    f"'{self.status.last_error}'"
                )
stop(self, timeout=0, force=False)

Stop the service and optionally wait for it to shut down.

Parameters:

Name Type Description Default
timeout int

amount of time to wait for the service to shut down. If set to 0, the method will return immediately after checking the service status.

0
force bool

passed through to deprovision.

False

Exceptions:

Type Description
RuntimeError

if the service cannot be stopped

Source code in zenml/services/service.py
def stop(self, timeout: int = 0, force: bool = False) -> None:
    """Stop the service and optionally wait for it to shut down.

    Args:
        timeout: amount of time to wait for the service to shut down.
            If set to 0, the method will return immediately after checking
            the service status.
        force: passed through to `deprovision`.

    Raises:
        RuntimeError: if the service cannot be stopped
    """
    with console.status(f"Stopping service '{self}'.\n"):
        self.admin_state = ServiceState.INACTIVE
        self.deprovision(force)
        if timeout > 0:
            self.poll_service_status(timeout)
            if not self.is_stopped:
                raise RuntimeError(f"Failed to stop service {self}.")
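The start and stop flow shown above follows one pattern: set the administrative state, call the provisioning hook, and verify the outcome only when a positive timeout is given. The sketch below illustrates that pattern with a hypothetical `ToyService` class. It does not use ZenML, uses plain strings for states, and replaces polling with a single check of a `running` flag.

```python
class ToyService:
    """Hypothetical stand-in mirroring the start/stop flow shown above."""

    def __init__(self, can_start: bool = True) -> None:
        self.admin_state = "inactive"
        self.running = False
        self._can_start = can_start

    def provision(self) -> None:
        # A real service would spawn a process or container here.
        self.running = self._can_start

    def deprovision(self, force: bool = False) -> None:
        # A real service would release its resources here.
        self.running = False

    def start(self, timeout: int = 0) -> None:
        self.admin_state = "active"
        self.provision()
        # Only a positive timeout triggers verification of the outcome.
        if timeout > 0 and not self.running:
            raise RuntimeError(f"Failed to start service {self!r}.")

    def stop(self, timeout: int = 0, force: bool = False) -> None:
        self.admin_state = "inactive"
        self.deprovision(force)
        if timeout > 0 and self.running:
            raise RuntimeError(f"Failed to stop service {self!r}.")


healthy = ToyService()
healthy.start(timeout=5)
healthy.stop(timeout=5)

broken = ToyService(can_start=False)
broken.start()  # timeout=0: returns without verifying that the service runs
try:
    broken.start(timeout=1)
except RuntimeError as err:
    print(f"start failed: {err}")
```

A caller that needs to know whether the service actually came up should therefore pass a positive timeout.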
update(self, config)

Update the service configuration.

Parameters:

Name Type Description Default
config ServiceConfig

the new service configuration.

required
Source code in zenml/services/service.py
def update(self, config: ServiceConfig) -> None:
    """Update the service configuration.

    Args:
        config: the new service configuration.
    """
    self.config = config
update_status(self)

Check the current operational state of the external service and update the local operational status information to reflect it.

This method should be overridden by subclasses that implement concrete service status tracking functionality.

Source code in zenml/services/service.py
def update_status(self) -> None:
    """Check the current operational state of the external service
    and update the local operational status information to reflect it.

    This method should be overridden by subclasses that implement
    concrete service status tracking functionality.
    """
    logger.debug(
        "Running status check for service '%s' ...",
        self,
    )
    state, err = self.check_status()
    logger.debug(
        "Status check results for service '%s': %s [%s]",
        self,
        state.name,
        err,
    )
    self.status.update_state(state, err)

    # don't bother checking the endpoint state if the service is not active
    if self.status.state == ServiceState.INACTIVE:
        return

    if self.endpoint:
        self.endpoint.update_status()

BaseServiceMeta (BaseTypedModelMeta)

Metaclass responsible for registering different BaseService subclasses.

This metaclass has two main responsibilities:

1. Registering all BaseService types in the service registry. This is relevant when services are deserialized and instantiated from their JSON or dict representation, because their type needs to be known beforehand.
2. Ensuring BaseService instance uniqueness by enforcing that no two service instances have the same UUID value. Implementing this at the constructor level guarantees that deserializing a service instance from a JSON representation multiple times always returns the same service object.

Source code in zenml/services/service.py
class BaseServiceMeta(BaseTypedModelMeta):
    """Metaclass responsible for registering different BaseService
    subclasses.

    This metaclass has two main responsibilities:
    1. register all BaseService types in the service registry. This is relevant
    when services are deserialized and instantiated from their JSON or dict
    representation, because their type needs to be known beforehand.
    2. ensuring BaseService instance uniqueness by enforcing that no two
    service instances have the same UUID value. Implementing this at the
    constructor level guarantees that deserializing a service instance from
    a JSON representation multiple times always returns the same service object.
    """

    def __new__(
        mcs, name: str, bases: Tuple[Type[Any], ...], dct: Dict[str, Any]
    ) -> "BaseServiceMeta":
        """Creates a BaseService class and registers it in
        the `ServiceRegistry`."""
        service_type = dct.get("SERVICE_TYPE", None)

        # register only classes of concrete service implementations
        if service_type:
            # add the service type class attribute to the class as a regular
            # immutable attribute to include it in the JSON representation
            if "service_type" in dct:
                raise TypeError(
                    "`service_type` is a reserved attribute name for BaseService "
                    "subclasses"
                )
            dct.setdefault("__annotations__", dict())[
                "service_type"
            ] = ServiceType
            dct["service_type"] = Field(service_type, allow_mutation=False)

        cls = cast(Type["BaseService"], super().__new__(mcs, name, bases, dct))

        # register only classes of concrete service implementations
        if service_type:
            # register the service type in the service registry
            ServiceRegistry().register_service_type(cls)
        return cls

    def __call__(cls, *args: Any, **kwargs: Any) -> "BaseServiceMeta":
        """Validate the creation of a service."""
        if not getattr(cls, "SERVICE_TYPE", None):
            raise AttributeError(
                f"Untyped service instances are not allowed. Please set the "
                f"SERVICE_TYPE class attribute for {cls}."
            )
        uuid = kwargs.get("uuid", None)
        if uuid:
            if isinstance(uuid, str):
                uuid = UUID(uuid)
            if not isinstance(uuid, UUID):
                raise ValueError(
                    f"The `uuid` argument for {cls} must be a UUID instance or a "
                    f"string representation of a UUID."
                )

            # if a service instance with the same UUID is already registered,
            # return the existing instance rather than the newly created one
            existing_service = ServiceRegistry().get_service(uuid)
            if existing_service:
                logger.debug(
                    f"Reusing existing service '{existing_service}' "
                    f"instead of creating a new service with the same UUID."
                )
                return cast("BaseServiceMeta", existing_service)

        svc = cast("BaseService", super().__call__(*args, **kwargs))
        ServiceRegistry().register_service(svc)
        return cast("BaseServiceMeta", svc)
__call__(cls, *args, **kwargs) special

Validate the creation of a service.

Source code in zenml/services/service.py
def __call__(cls, *args: Any, **kwargs: Any) -> "BaseServiceMeta":
    """Validate the creation of a service."""
    if not getattr(cls, "SERVICE_TYPE", None):
        raise AttributeError(
            f"Untyped service instances are not allowed. Please set the "
            f"SERVICE_TYPE class attribute for {cls}."
        )
    uuid = kwargs.get("uuid", None)
    if uuid:
        if isinstance(uuid, str):
            uuid = UUID(uuid)
        if not isinstance(uuid, UUID):
            raise ValueError(
                f"The `uuid` argument for {cls} must be a UUID instance or a "
                f"string representation of a UUID."
            )

        # if a service instance with the same UUID is already registered,
        # return the existing instance rather than the newly created one
        existing_service = ServiceRegistry().get_service(uuid)
        if existing_service:
            logger.debug(
                f"Reusing existing service '{existing_service}' "
                f"instead of creating a new service with the same UUID."
            )
            return cast("BaseServiceMeta", existing_service)

    svc = cast("BaseService", super().__call__(*args, **kwargs))
    ServiceRegistry().register_service(svc)
    return cast("BaseServiceMeta", svc)
__new__(mcs, name, bases, dct) special staticmethod

Creates a BaseService class and registers it in the ServiceRegistry.

Source code in zenml/services/service.py
def __new__(
    mcs, name: str, bases: Tuple[Type[Any], ...], dct: Dict[str, Any]
) -> "BaseServiceMeta":
    """Creates a BaseService class and registers it in
    the `ServiceRegistry`."""
    service_type = dct.get("SERVICE_TYPE", None)

    # register only classes of concrete service implementations
    if service_type:
        # add the service type class attribute to the class as a regular
        # immutable attribute to include it in the JSON representation
        if "service_type" in dct:
            raise TypeError(
                "`service_type` is a reserved attribute name for BaseService "
                "subclasses"
            )
        dct.setdefault("__annotations__", dict())[
            "service_type"
        ] = ServiceType
        dct["service_type"] = Field(service_type, allow_mutation=False)

    cls = cast(Type["BaseService"], super().__new__(mcs, name, bases, dct))

    # register only classes of concrete service implementations
    if service_type:
        # register the service type in the service registry
        ServiceRegistry().register_service_type(cls)
    return cls
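The instance-uniqueness behaviour described above relies on overriding the metaclass `__call__`. The following sketch shows the general technique with plain Python classes. The module-level `_INSTANCES` dictionary, `UniqueByUUIDMeta` and `ToyService` are hypothetical names used for illustration only; ZenML's `ServiceRegistry` and pydantic models are not involved, and the UUID string is normalized here by hand.

```python
from typing import Any, Dict
from uuid import UUID, uuid4

# Hypothetical in-memory registry; ZenML's ServiceRegistry is not used here.
_INSTANCES: Dict[UUID, Any] = {}


class UniqueByUUIDMeta(type):
    """Return the already-registered instance when a known UUID is passed."""

    def __call__(cls, *args: Any, **kwargs: Any) -> Any:
        uuid = kwargs.get("uuid")
        if uuid is not None:
            if isinstance(uuid, str):
                uuid = UUID(uuid)
            if not isinstance(uuid, UUID):
                raise ValueError("uuid must be a UUID or its string form")
            kwargs["uuid"] = uuid
            existing = _INSTANCES.get(uuid)
            if existing is not None:
                # Reuse the existing object instead of constructing a new one.
                return existing
        instance = super().__call__(*args, **kwargs)
        _INSTANCES[instance.uuid] = instance
        return instance


class ToyService(metaclass=UniqueByUUIDMeta):
    def __init__(self, uuid: Any = None, name: str = "") -> None:
        self.uuid: UUID = uuid if uuid is not None else uuid4()
        self.name = name


first = ToyService(name="first")
again = ToyService(uuid=str(first.uuid), name="ignored")
print(again is first)  # True
print(again.name)  # first
```

Because the lookup happens before construction, the arguments of the second call are discarded when an instance with that UUID already exists.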

ServiceConfig (BaseTypedModel) pydantic-model

Generic service configuration.

Concrete service classes should extend this class and add additional attributes that they want to see reflected and used in the service configuration.

Attributes:

Name Type Description
name str

name for the service instance

description str

description of the service

pipeline_name str

name of the pipeline that spun up the service

pipeline_run_id str

ID of the pipeline run that spun up the service

pipeline_step_name str

name of the pipeline step that spun up the service

Source code in zenml/services/service.py
class ServiceConfig(BaseTypedModel):
    """Generic service configuration.

    Concrete service classes should extend this class and add additional
    attributes that they want to see reflected and used in the service
    configuration.

    Attributes:
        name: name for the service instance
        description: description of the service
        pipeline_name: name of the pipeline that spun up the service
        pipeline_run_id: ID of the pipeline run that spun up the service
        pipeline_step_name: name of the pipeline step that spun up the service
    """

    name: str = ""
    description: str = ""
    pipeline_name: str = ""
    pipeline_run_id: str = ""
    pipeline_step_name: str = ""

service_endpoint

BaseServiceEndpoint (BaseTypedModel) pydantic-model

Base service endpoint class.

This class implements generic functionality concerning the life-cycle management and tracking of an external service endpoint (e.g. an HTTP/HTTPS API or generic TCP endpoint exposed by a service).

Attributes:

Name Type Description
admin_state ServiceState

the administrative state of the service endpoint

config ServiceEndpointConfig

service endpoint configuration

status ServiceEndpointStatus

service endpoint status

monitor Optional[zenml.services.service_monitor.BaseServiceEndpointHealthMonitor]

optional service endpoint health monitor

Source code in zenml/services/service_endpoint.py
class BaseServiceEndpoint(BaseTypedModel):
    """Base service endpoint class.

    This class implements generic functionality concerning the life-cycle
    management and tracking of an external service endpoint (e.g. an HTTP/HTTPS
    API or generic TCP endpoint exposed by a service).

    Attributes:
        admin_state: the administrative state of the service endpoint
        config: service endpoint configuration
        status: service endpoint status
        monitor: optional service endpoint health monitor
    """

    admin_state: ServiceState = ServiceState.INACTIVE
    config: ServiceEndpointConfig
    status: ServiceEndpointStatus
    # TODO [ENG-701]: allow multiple monitors per endpoint
    monitor: Optional[BaseServiceEndpointHealthMonitor] = None

    def __init__(
        self,
        *args: Any,
        **kwargs: Any,
    ) -> None:
        super().__init__(*args, **kwargs)
        self.config.name = self.config.name or self.__class__.__name__

    def check_status(self) -> Tuple[ServiceState, str]:
        """Check the current operational state of the external
        service endpoint.

        Returns:
            The operational state of the external service endpoint and a
            message providing additional information about that state
            (e.g. a description of the error, if one is encountered while
            checking the service status).
        """
        if not self.monitor:
            # no health monitor configured; assume service operational state
            # always matches the admin state
            return self.admin_state, ""
        return self.monitor.check_endpoint_status(self)

    def update_status(self) -> None:
        """Check the current operational state of the external service
        endpoint and update the local operational status information
        accordingly.
        """
        logger.debug(
            "Running health check for service endpoint '%s' ...",
            self.config.name,
        )
        state, err = self.check_status()
        logger.debug(
            "Health check results for service endpoint '%s': %s [%s]",
            self.config.name,
            state.name,
            err,
        )
        self.status.update_state(state, err)

    def is_active(self) -> bool:
        """Check if the service endpoint is active (i.e. is responsive and can
        receive requests).

        This method will use the configured health monitor to actively check the
        endpoint status and will return the result.

        Returns:
            True if the service endpoint is active, otherwise False.
        """
        self.update_status()
        return self.status.state == ServiceState.ACTIVE

    def is_inactive(self) -> bool:
        """Check if the service endpoint is inactive (i.e. is not responsive and
        cannot receive requests).

        This method will use the configured health monitor to actively check the
        endpoint status and will return the result.

        Returns:
            True if the service endpoint is inactive, otherwise False.
        """
        self.update_status()
        return self.status.state == ServiceState.INACTIVE
check_status(self)

Check the current operational state of the external service endpoint.

Returns:

Type Description
Tuple[zenml.services.service_status.ServiceState, str]

The operational state of the external service endpoint and a message providing additional information about that state (e.g. a description of the error, if one is encountered while checking the service status).

Source code in zenml/services/service_endpoint.py
def check_status(self) -> Tuple[ServiceState, str]:
    """Check the current operational state of the external
    service endpoint.

    Returns:
        The operational state of the external service endpoint and a
        message providing additional information about that state
        (e.g. a description of the error, if one is encountered while
        checking the service status).
    """
    if not self.monitor:
        # no health monitor configured; assume service operational state
        # always matches the admin state
        return self.admin_state, ""
    return self.monitor.check_endpoint_status(self)
is_active(self)

Check if the service endpoint is active (i.e. is responsive and can receive requests).

This method will use the configured health monitor to actively check the endpoint status and will return the result.

Returns:

Type Description
bool

True if the service endpoint is active, otherwise False.

Source code in zenml/services/service_endpoint.py
def is_active(self) -> bool:
    """Check if the service endpoint is active (i.e. is responsive and can
    receive requests).

    This method will use the configured health monitor to actively check the
    endpoint status and will return the result.

    Returns:
        True if the service endpoint is active, otherwise False.
    """
    self.update_status()
    return self.status.state == ServiceState.ACTIVE
is_inactive(self)

Check if the service endpoint is inactive (i.e. is not responsive and cannot receive requests).

This method will use the configured health monitor to actively check the endpoint status and will return the result.

Returns:

Type Description
bool

True if the service endpoint is inactive, otherwise False.

Source code in zenml/services/service_endpoint.py
def is_inactive(self) -> bool:
    """Check if the service endpoint is inactive (i.e. is not responsive and
    cannot receive requests).

    This method will use the configured health monitor to actively check the
    endpoint status and will return the result.

    Returns:
        True if the service endpoint is inactive, otherwise False.
    """
    self.update_status()
    return self.status.state == ServiceState.INACTIVE
update_status(self)

Check the current operational state of the external service endpoint and update the local operational status information accordingly.

Source code in zenml/services/service_endpoint.py
def update_status(self) -> None:
    """Check the current operational state of the external service
    endpoint and update the local operational status information
    accordingly.
    """
    logger.debug(
        "Running health check for service endpoint '%s' ...",
        self.config.name,
    )
    state, err = self.check_status()
    logger.debug(
        "Health check results for service endpoint '%s': %s [%s]",
        self.config.name,
        state.name,
        err,
    )
    self.status.update_state(state, err)
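The fallback described above, where an endpoint without a health monitor simply reports its administrative state, can be illustrated with a small stand-alone sketch. `ToyEndpoint` and the callable-based `Monitor` type are hypothetical simplifications: states are plain strings and the monitor takes no arguments, unlike the ZenML classes.

```python
from typing import Callable, Optional, Tuple

# A monitor is modelled as a plain callable returning (state, message).
Monitor = Callable[[], Tuple[str, str]]


class ToyEndpoint:
    """Hypothetical stand-in for a service endpoint; states are plain strings."""

    def __init__(
        self, admin_state: str = "inactive", monitor: Optional[Monitor] = None
    ) -> None:
        self.admin_state = admin_state
        self.monitor = monitor
        self.state = "inactive"
        self.last_error = ""

    def check_status(self) -> Tuple[str, str]:
        if self.monitor is None:
            # No monitor: assume the operational state matches the admin state.
            return self.admin_state, ""
        return self.monitor()

    def is_active(self) -> bool:
        # Refresh the cached status before answering, as the methods above do.
        self.state, self.last_error = self.check_status()
        return self.state == "active"


unmonitored = ToyEndpoint(admin_state="active")
print(unmonitored.is_active())  # True

failing = ToyEndpoint(
    admin_state="active", monitor=lambda: ("error", "connection refused")
)
print(failing.is_active(), failing.last_error)  # False connection refused
```

An unmonitored endpoint is therefore always reported as matching whatever state was requested, so a monitor is needed to detect real failures.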

ServiceEndpointConfig (BaseTypedModel) pydantic-model

Generic service endpoint configuration.

Concrete service classes should extend this class and add additional attributes that they want to see reflected and used in the endpoint configuration.

Attributes:

Name Type Description
name str

unique name for the service endpoint

description str

description of the service endpoint

Source code in zenml/services/service_endpoint.py
class ServiceEndpointConfig(BaseTypedModel):
    """Generic service endpoint configuration.

    Concrete service classes should extend this class and add additional
    attributes that they want to see reflected and used in the endpoint
    configuration.

    Attributes:
        name: unique name for the service endpoint
        description: description of the service endpoint
    """

    name: str = ""
    description: str = ""

ServiceEndpointProtocol (StrEnum)

Possible endpoint protocol values.

Source code in zenml/services/service_endpoint.py
class ServiceEndpointProtocol(StrEnum):
    """Possible endpoint protocol values."""

    TCP = "tcp"
    HTTP = "http"
    HTTPS = "https"

ServiceEndpointStatus (ServiceStatus) pydantic-model

Status information describing the operational state of a service endpoint (e.g. an HTTP/HTTPS API or generic TCP endpoint exposed by a service).

Concrete service classes should extend this class and add additional attributes that make up the operational state of the service endpoint.

Attributes:

Name Type Description
protocol ServiceEndpointProtocol

the protocol used by the service endpoint (TCP, HTTP or HTTPS)

hostname Optional[str]

the hostname where the service endpoint is accessible

port Optional[int]

the current TCP port where the service endpoint is accessible

Source code in zenml/services/service_endpoint.py
class ServiceEndpointStatus(ServiceStatus):
    """Status information describing the operational state of a service
    endpoint (e.g. an HTTP/HTTPS API or generic TCP endpoint exposed by a
    service).

    Concrete service classes should extend this class and add additional
    attributes that make up the operational state of the service endpoint.

    Attributes:
        protocol: the protocol used by the service endpoint (TCP, HTTP or HTTPS)
        hostname: the hostname where the service endpoint is accessible
        port: the current TCP port where the service endpoint is accessible
    """

    protocol: ServiceEndpointProtocol = ServiceEndpointProtocol.TCP
    hostname: Optional[str] = None
    port: Optional[int] = None

    @property
    def uri(self) -> Optional[str]:
        """Get the URI of the service endpoint.

        Returns:
            The URI of the service endpoint or None, if the service endpoint
            operational status doesn't have the required information.
        """
        if not self.hostname or not self.port or not self.protocol:
            # the service is not yet in a state in which the endpoint hostname
            # port and protocol are known
            return None

        return f"{self.protocol.value}://{self.hostname}:{self.port}/"
uri: Optional[str] property readonly

Get the URI of the service endpoint.

Returns:

Type Description
Optional[str]

The URI of the service endpoint or None, if the service endpoint operational status doesn't have the required information.
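The URI construction can be reproduced as a free function to make the edge cases explicit. This is a sketch that takes the three status fields as plain arguments; the function name `endpoint_uri` is hypothetical.

```python
from typing import Optional


def endpoint_uri(
    protocol: Optional[str], hostname: Optional[str], port: Optional[int]
) -> Optional[str]:
    """Build '<protocol>://<hostname>:<port>/' or return None if any part is missing."""
    if not hostname or not port or not protocol:
        # Any falsy component (None, empty string, port 0) means "not known yet".
        return None
    return f"{protocol}://{hostname}:{port}/"


print(endpoint_uri("http", "localhost", 8080))  # http://localhost:8080/
print(endpoint_uri("tcp", None, 8080))  # None
```

The result always ends with a slash, which matters when a path is appended to it, as `get_healthcheck_uri` does with `healthcheck_uri_path`.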

service_monitor

BaseServiceEndpointHealthMonitor (BaseTypedModel) pydantic-model

Base class used for service endpoint health monitors.

Attributes:

Name Type Description
config ServiceEndpointHealthMonitorConfig

health monitor configuration for endpoint

Source code in zenml/services/service_monitor.py
class BaseServiceEndpointHealthMonitor(BaseTypedModel):
    """Base class used for service endpoint health monitors.

    Attributes:
        config: health monitor configuration for endpoint
    """

    config: ServiceEndpointHealthMonitorConfig = Field(
        default_factory=ServiceEndpointHealthMonitorConfig
    )

    @abstractmethod
    def check_endpoint_status(
        self, endpoint: "BaseServiceEndpoint"
    ) -> Tuple[ServiceState, str]:
        """Check the current operational state of the external
        service endpoint.

        This method should be overridden by subclasses that implement
        concrete service endpoint tracking functionality.

        Returns:
            The operational state of the external service endpoint and an
            optional error message, if an error is encountered while checking
            the service endpoint status.
        """
check_endpoint_status(self, endpoint)

Check the current operational state of the external service endpoint.

This method should be overridden by subclasses that implement concrete service endpoint tracking functionality.

Returns:

Type Description
Tuple[zenml.services.service_status.ServiceState, str]

The operational state of the external service endpoint and an optional error message, if an error is encountered while checking the service endpoint status.

Source code in zenml/services/service_monitor.py
@abstractmethod
def check_endpoint_status(
    self, endpoint: "BaseServiceEndpoint"
) -> Tuple[ServiceState, str]:
    """Check the current operational state of the external
    service endpoint.

    This method should be overridden by subclasses that implement
    concrete service endpoint tracking functionality.

    Returns:
        The operational state of the external service endpoint and an
        optional error message, if an error is encountered while checking
        the service endpoint status.
    """

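To show what overriding the abstract `check_endpoint_status` method might look like, here is a minimal sketch of a monitor that treats a successful TCP connection as healthy. It is self-contained and does not reproduce ZenML's own monitors: `ToyHealthMonitor` and `TCPConnectMonitor` are hypothetical names, states are plain strings, and the method takes a hostname and port instead of an endpoint object.

```python
import socket
from abc import ABC, abstractmethod
from typing import Tuple


class ToyHealthMonitor(ABC):
    """Hypothetical stand-in for the abstract monitor base class."""

    @abstractmethod
    def check_endpoint_status(self, hostname: str, port: int) -> Tuple[str, str]:
        """Return (state, message) for the given endpoint address."""


class TCPConnectMonitor(ToyHealthMonitor):
    """Reports 'active' when a TCP connection can be opened, else 'error'."""

    def __init__(self, timeout: float = 1.0) -> None:
        self.timeout = timeout

    def check_endpoint_status(self, hostname: str, port: int) -> Tuple[str, str]:
        try:
            # Open and immediately close a connection to probe the port.
            with socket.create_connection((hostname, port), timeout=self.timeout):
                return "active", ""
        except OSError as e:
            return "error", f"TCP connection failed: {e}"


# Probe a listening socket bound to an ephemeral local port.
listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
listener.bind(("127.0.0.1", 0))
listener.listen(1)
port = listener.getsockname()[1]

monitor = TCPConnectMonitor(timeout=1.0)
print(monitor.check_endpoint_status("127.0.0.1", port))  # ('active', '')
listener.close()
```

Returning the error text as the second tuple element, rather than raising, keeps the monitor usable inside a polling loop.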
HTTPEndpointHealthMonitor (BaseServiceEndpointHealthMonitor) pydantic-model

HTTP service endpoint health monitor.

Attributes:

Name Type Description
config HTTPEndpointHealthMonitorConfig

health monitor configuration for HTTP endpoint

Source code in zenml/services/service_monitor.py
class HTTPEndpointHealthMonitor(BaseServiceEndpointHealthMonitor):
    """HTTP service endpoint health monitor.

    Attributes:
        config: health monitor configuration for HTTP endpoint
    """

    config: HTTPEndpointHealthMonitorConfig = Field(
        default_factory=HTTPEndpointHealthMonitorConfig
    )

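    def get_healthcheck_uri(
        self, endpoint: "BaseServiceEndpoint"
    ) -> Optional[str]:
        """Get the healthcheck URI for the given service endpoint.

        Returns:
            The endpoint URI with the configured healthcheck path appended,
            or None if the endpoint URI is not yet known.
        """
        uri = endpoint.status.uri
        if not uri:
            return None
        return f"{uri}{self.config.healthcheck_uri_path}"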

    def check_endpoint_status(
        self, endpoint: "BaseServiceEndpoint"
    ) -> Tuple[ServiceState, str]:
        """Run an HTTP endpoint API healthcheck.

        Returns:
            The operational state of the external HTTP endpoint and an
            optional message describing that state (e.g. an error message,
            if an error is encountered while checking the HTTP endpoint
            status).
        """
        from zenml.services.service_endpoint import ServiceEndpointProtocol

        if endpoint.status.protocol not in [
            ServiceEndpointProtocol.HTTP,
            ServiceEndpointProtocol.HTTPS,
        ]:
            return ServiceState.ERROR, "endpoint protocol is not HTTP nor HTTPS"

        check_uri = self.get_healthcheck_uri(endpoint)
        if not check_uri:
            return ServiceState.ERROR, "no HTTP healthcheck URI available"

        logger.debug("Running HTTP healthcheck for URI: %s", check_uri)

        try:
            if self.config.use_head_request:
                r = requests.head(
                    check_uri,
                    timeout=self.config.http_timeout,
                )
            else:
                r = requests.get(
                    check_uri,
                    timeout=self.config.http_timeout,
                )
            if r.status_code == self.config.http_status_code:
                # the endpoint is healthy
                return ServiceState.ACTIVE, ""
            error = f"HTTP endpoint healthcheck returned unexpected status code: {r.status_code}"
        except requests.ConnectionError as e:
            error = f"HTTP endpoint healthcheck connection error: {str(e)}"
        except requests.Timeout as e:
            error = f"HTTP endpoint healthcheck request timed out: {str(e)}"
        except requests.RequestException as e:
            error = (
                f"unexpected error encountered while running HTTP endpoint "
                f"healthcheck: {str(e)}"
            )

        return ServiceState.ERROR, error
check_endpoint_status(self, endpoint)

Run an HTTP endpoint API healthcheck.

Returns:

Type Description
Tuple[zenml.services.service_status.ServiceState, str]

The operational state of the external HTTP endpoint and an optional message describing that state (e.g. an error message, if an error is encountered while checking the HTTP endpoint status).

Source code in zenml/services/service_monitor.py
def check_endpoint_status(
    self, endpoint: "BaseServiceEndpoint"
) -> Tuple[ServiceState, str]:
    """Run an HTTP endpoint API healthcheck.

    Returns:
        The operational state of the external HTTP endpoint and an
        optional message describing that state (e.g. an error message,
        if an error is encountered while checking the HTTP endpoint
        status).
    """
    from zenml.services.service_endpoint import ServiceEndpointProtocol

    if endpoint.status.protocol not in [
        ServiceEndpointProtocol.HTTP,
        ServiceEndpointProtocol.HTTPS,
    ]:
        return ServiceState.ERROR, "endpoint protocol is not HTTP nor HTTPS"

    check_uri = self.get_healthcheck_uri(endpoint)
    if not check_uri:
        return ServiceState.ERROR, "no HTTP healthcheck URI available"

    logger.debug("Running HTTP healthcheck for URI: %s", check_uri)

    try:
        if self.config.use_head_request:
            r = requests.head(
                check_uri,
                timeout=self.config.http_timeout,
            )
        else:
            r = requests.get(
                check_uri,
                timeout=self.config.http_timeout,
            )
        if r.status_code == self.config.http_status_code:
            # the endpoint is healthy
            return ServiceState.ACTIVE, ""
        error = f"HTTP endpoint healthcheck returned unexpected status code: {r.status_code}"
    except requests.ConnectionError as e:
        error = f"HTTP endpoint healthcheck connection error: {str(e)}"
    except requests.Timeout as e:
        error = f"HTTP endpoint healthcheck request timed out: {str(e)}"
    except requests.RequestException as e:
        error = (
            f"unexpected error encountered while running HTTP endpoint "
            f"healthcheck: {str(e)}"
        )

    return ServiceState.ERROR, error

HTTPEndpointHealthMonitorConfig (ServiceEndpointHealthMonitorConfig) pydantic-model

HTTP service endpoint health monitor configuration.

Attributes:

Name Type Description
healthcheck_uri_path str

URI subpath to use to perform service endpoint healthchecks. If not set, the service endpoint URI will be used instead.

use_head_request bool

set to True to use a HEAD request instead of a GET when calling the healthcheck URI.

http_status_code int

HTTP status code to expect in the health check response.

http_timeout int

HTTP health check request timeout in seconds.

Source code in zenml/services/service_monitor.py
class HTTPEndpointHealthMonitorConfig(ServiceEndpointHealthMonitorConfig):
    """HTTP service endpoint health monitor configuration.

    Attributes:
        healthcheck_uri_path: URI subpath to use to perform service endpoint
            healthchecks. If not set, the service endpoint URI will be used
            instead.
        use_head_request: set to True to use a HEAD request instead of a GET
            when calling the healthcheck URI.
        http_status_code: HTTP status code to expect in the health check
            response.
        http_timeout: HTTP health check request timeout in seconds.
    """

    healthcheck_uri_path: str = ""
    use_head_request: bool = False
    http_status_code: int = 200
    http_timeout: int = DEFAULT_HTTP_HEALTHCHECK_TIMEOUT
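
To make the role of each configuration attribute concrete, here is a minimal sketch of the healthcheck flow. It uses the standard library rather than `requests`, and `HealthcheckConfig`, `fetch_status` and `check` are hypothetical stand-ins, not ZenML APIs. The 5-second timeout is an assumed value, because the value of `DEFAULT_HTTP_HEALTHCHECK_TIMEOUT` is not shown here.

```python
import urllib.error
import urllib.request
from dataclasses import dataclass
from typing import Callable, Tuple


@dataclass
class HealthcheckConfig:
    """Hypothetical stand-in mirroring HTTPEndpointHealthMonitorConfig."""

    healthcheck_uri_path: str = ""
    use_head_request: bool = False
    http_status_code: int = 200
    http_timeout: int = 5  # assumed value, not the real ZenML default


def fetch_status(uri: str, method: str, timeout: int) -> int:
    """Return the HTTP status code of a single request."""
    request = urllib.request.Request(uri, method=method)
    try:
        with urllib.request.urlopen(request, timeout=timeout) as response:
            return response.status
    except urllib.error.HTTPError as e:
        # urllib raises on 4xx/5xx responses; the code is still the answer
        return e.code


def check(
    base_uri: str,
    config: HealthcheckConfig,
    fetch: Callable[[str, str, int], int] = fetch_status,
) -> Tuple[str, str]:
    """Return ("active", "") when the expected status code comes back."""
    # the subpath is appended to the endpoint URI; empty means "use as-is"
    check_uri = f"{base_uri}{config.healthcheck_uri_path}"
    method = "HEAD" if config.use_head_request else "GET"
    try:
        status = fetch(check_uri, method, config.http_timeout)
    except OSError as e:  # URLError and socket timeouts derive from OSError
        return "error", f"HTTP endpoint healthcheck failed: {e}"
    if status == config.http_status_code:
        return "active", ""
    return "error", f"unexpected status code: {status}"
```

Passing `fetch` as a parameter is a choice made for this sketch only: it lets the decision logic be exercised without a running server.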

ServiceEndpointHealthMonitorConfig (BaseTypedModel) pydantic-model

Generic service health monitor configuration.

Concrete service classes should extend this class and add the additional attributes that they want to see reflected and used in the health monitor configuration.

Source code in zenml/services/service_monitor.py
class ServiceEndpointHealthMonitorConfig(BaseTypedModel):
    """Generic service health monitor configuration.

    Concrete service classes should extend this class and add the additional
    attributes that they want to see reflected and used in the health monitor
    configuration.
    """

TCPEndpointHealthMonitor (BaseServiceEndpointHealthMonitor) pydantic-model

TCP service endpoint health monitor.

Attributes:

Name Type Description
config TCPEndpointHealthMonitorConfig

health monitor configuration for TCP endpoint

Source code in zenml/services/service_monitor.py
class TCPEndpointHealthMonitor(BaseServiceEndpointHealthMonitor):
    """TCP service endpoint health monitor.

    Attributes:
        config: health monitor configuration for TCP endpoint
    """

    config: TCPEndpointHealthMonitorConfig

    def check_endpoint_status(
        self, endpoint: "BaseServiceEndpoint"
    ) -> Tuple[ServiceState, str]:
        """Run a TCP endpoint healthcheck

        Returns:
            The operational state of the external TCP endpoint and an
            optional message describing that state (e.g. an error message,
            if an error is encountered while checking the TCP endpoint
            status).
        """
        if not endpoint.status.port or not endpoint.status.hostname:
            return (
                ServiceState.ERROR,
                "TCP port and hostname values are not known",
            )

        logger.debug(
            "Running TCP healthcheck for TCP port: %d", endpoint.status.port
        )

        if port_is_open(endpoint.status.hostname, endpoint.status.port):
            # the endpoint is healthy
            return ServiceState.ACTIVE, ""

        return (
            ServiceState.ERROR,
            "TCP endpoint healthcheck error: TCP port is not "
            "open or not accessible",
        )
check_endpoint_status(self, endpoint)

Run a TCP endpoint healthcheck

Returns:

Type Description
Tuple[zenml.services.service_status.ServiceState, str]

The operational state of the external TCP endpoint and an optional message describing that state (e.g. an error message, if an error is encountered while checking the TCP endpoint status).

Source code in zenml/services/service_monitor.py
def check_endpoint_status(
    self, endpoint: "BaseServiceEndpoint"
) -> Tuple[ServiceState, str]:
    """Run a TCP endpoint healthcheck

    Returns:
        The operational state of the external TCP endpoint and an
        optional message describing that state (e.g. an error message,
        if an error is encountered while checking the TCP endpoint
        status).
    """
    if not endpoint.status.port or not endpoint.status.hostname:
        return (
            ServiceState.ERROR,
            "TCP port and hostname values are not known",
        )

    logger.debug(
        "Running TCP healthcheck for TCP port: %d", endpoint.status.port
    )

    if port_is_open(endpoint.status.hostname, endpoint.status.port):
        # the endpoint is healthy
        return ServiceState.ACTIVE, ""

    return (
        ServiceState.ERROR,
        "TCP endpoint healthcheck error: TCP port is not "
        "open or not accessible",
    )
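
The TCP check above delegates to a `port_is_open` helper whose source is not listed in this section. As a rough sketch of what such a check can look like, the following uses only the standard `socket` module. `tcp_port_is_open` and `check_tcp_endpoint` are hypothetical names, and the 1-second timeout is an assumption.

```python
import socket
from typing import Optional, Tuple


def tcp_port_is_open(hostname: str, port: int, timeout: float = 1.0) -> bool:
    """Return True if a TCP connection to (hostname, port) succeeds."""
    try:
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
            sock.settimeout(timeout)
            # connect_ex returns 0 on success and an errno value otherwise
            return sock.connect_ex((hostname, port)) == 0
    except OSError:
        # e.g. the hostname could not be resolved
        return False


def check_tcp_endpoint(
    hostname: Optional[str], port: Optional[int]
) -> Tuple[str, str]:
    """Mirror the monitor logic: unknown address, open port, or closed port."""
    if not port or not hostname:
        return "error", "TCP port and hostname values are not known"
    if tcp_port_is_open(hostname, port):
        return "active", ""
    return "error", "TCP port is not open or not accessible"
```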

TCPEndpointHealthMonitorConfig (ServiceEndpointHealthMonitorConfig) pydantic-model

TCP service endpoint health monitor configuration.

Source code in zenml/services/service_monitor.py
class TCPEndpointHealthMonitorConfig(ServiceEndpointHealthMonitorConfig):
    """TCP service endpoint health monitor configuration."""

service_registry

ServiceRegistry

Registry of service types and service instances.

The service registry provides a central place to register service types as well as service instances.

Source code in zenml/services/service_registry.py
class ServiceRegistry(metaclass=SingletonMetaClass):
    """Registry of service types and service instances.

    The service registry provides a central place to register service types
    as well as service instances.
    """

    def __init__(self) -> None:
        self.service_types: Dict[ServiceType, Type["BaseService"]] = {}
        self.services: Dict[UUID, "BaseService"] = {}

    def register_service_type(self, cls: Type["BaseService"]) -> None:
        """Registers a new service type.

        Args:
            cls: a BaseService subclass.
        Raises:
            TypeError: if the service type is already registered.
        """
        service_type = cls.SERVICE_TYPE
        if service_type not in self.service_types:
            self.service_types[service_type] = cls
            logger.debug(
                f"Registered service class {cls} for "
                f"service type `{service_type}`"
            )
        else:
            raise TypeError(
                f"Found existing service type for {service_type}: "
                f"{self.service_types[service_type]}. Skipping registration "
                f"of {cls}."
            )

    def get_service_type(
        self, service_type: ServiceType
    ) -> Optional[Type["BaseService"]]:
        """Get the service class registered for a service type.

        Args:
            service_type: service type.

        Returns:
            `BaseService` subclass that was registered for the service type or
            None, if no service class was registered for the service type.
        """
        return self.service_types.get(service_type)

    def get_service_types(
        self,
    ) -> Dict[ServiceType, Type["BaseService"]]:
        """Get all registered service types."""
        return self.service_types.copy()

    def service_type_is_registered(self, service_type: ServiceType) -> bool:
        """Check if a service type is registered."""
        return service_type in self.service_types

    def register_service(self, service: "BaseService") -> None:
        """Registers a new service instance

        Args:
            service: a BaseService instance.
        Raises:
            TypeError: if the service instance has a service type that is not
                registered.
            Exception: if a service instance with the same UUID is already
                registered.
        """
        service_type = service.SERVICE_TYPE
        if service_type not in self.service_types:
            raise TypeError(f"Service type `{service_type}` is not registered.")

        if service.uuid not in self.services:
            self.services[service.uuid] = service
            logger.debug(f"Registered service {service}")
        else:
            existing_service = self.services[service.uuid]
            raise Exception(
                f"Found existing service {existing_service} for UUID: "
                f"{service.uuid}. Skipping registration for service "
                f"{service}."
            )

    def get_service(self, uuid: UUID) -> Optional["BaseService"]:
        """Get the service instance registered for a UUID.

        Args:
            uuid: service instance identifier.

        Returns:
            `BaseService` instance that was registered for the UUID or
            None, if no matching service instance was found.
        """
        return self.services.get(uuid)

    def get_services(self) -> Dict[UUID, "BaseService"]:
        """Get all service instances currently registered.

        Returns:
            Dictionary of `BaseService` instances indexed by their UUID with
            all services that are currently registered.
        """
        return self.services.copy()

    def service_is_registered(self, uuid: UUID) -> bool:
        """Check if a service instance is registered."""
        return uuid in self.services

    def load_service_from_dict(
        self, service_dict: Dict[str, Any]
    ) -> "BaseService":
        """Load a service instance from its dict representation.

        Creates, registers and returns a service instantiated from the dict
        representation of the service configuration and last known status
        information.

        If an existing service instance with the same UUID is already
        present in the service registry, it is returned instead.

        Args:
            service_dict: dict representation of the service configuration and
                last known status

        Returns:
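            A new or existing ZenML service instance.

        Raises:
            ValueError: if the service dict does not contain a service type.
            TypeError: if the service type is not registered.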
        """
        service_type = service_dict.get("service_type")
        if not service_type:
            raise ValueError(
                "Service type not present in the service dictionary"
            )
        service_type = ServiceType.parse_obj(service_type)
        service_class = self.get_service_type(service_type)
        if not service_class:
            raise TypeError(
                f"Cannot load service with unregistered service "
                f"type: {service_type}"
            )
        service = cast("BaseService", service_class.from_dict(service_dict))
        return service

    def load_service_from_json(self, json_str: str) -> "BaseService":
        """Load a service instance from its JSON representation.

        Creates and returns a service instantiated from the JSON serialized
        service configuration and last known status information.

        Args:
            json_str: JSON string representation of the service configuration
                and last known status

        Returns:
            A ZenML service instance.
        """
        service_dict = json.loads(json_str)
        return self.load_service_from_dict(service_dict)
get_service(self, uuid)

Get the service instance registered for a UUID.

Parameters:

Name Type Description Default
uuid UUID

service instance identifier.

required

Returns:

Type Description
Optional[BaseService]

BaseService instance that was registered for the UUID or None, if no matching service instance was found.

Source code in zenml/services/service_registry.py
def get_service(self, uuid: UUID) -> Optional["BaseService"]:
    """Get the service instance registered for a UUID.

    Args:
        uuid: service instance identifier.

    Returns:
        `BaseService` instance that was registered for the UUID or
        None, if no matching service instance was found.
    """
    return self.services.get(uuid)
get_service_type(self, service_type)

Get the service class registered for a service type.

Parameters:

Name Type Description Default
service_type ServiceType

service type.

required

Returns:

Type Description
Optional[Type[BaseService]]

BaseService subclass that was registered for the service type or None, if no service class was registered for the service type.

Source code in zenml/services/service_registry.py
def get_service_type(
    self, service_type: ServiceType
) -> Optional[Type["BaseService"]]:
    """Get the service class registered for a service type.

    Args:
        service_type: service type.

    Returns:
        `BaseService` subclass that was registered for the service type or
        None, if no service class was registered for the service type.
    """
    return self.service_types.get(service_type)
get_service_types(self)

Get all registered service types.

Source code in zenml/services/service_registry.py
def get_service_types(
    self,
) -> Dict[ServiceType, Type["BaseService"]]:
    """Get all registered service types."""
    return self.service_types.copy()
get_services(self)

Get all service instances currently registered.

Returns:

Type Description
Dict[uuid.UUID, BaseService]

Dictionary of BaseService instances indexed by their UUID with all services that are currently registered.

Source code in zenml/services/service_registry.py
def get_services(self) -> Dict[UUID, "BaseService"]:
    """Get all service instances currently registered.

    Returns:
        Dictionary of `BaseService` instances indexed by their UUID with
        all services that are currently registered.
    """
    return self.services.copy()
load_service_from_dict(self, service_dict)

Load a service instance from its dict representation.

Creates, registers and returns a service instantiated from the dict representation of the service configuration and last known status information.

If an existing service instance with the same UUID is already present in the service registry, it is returned instead.

Parameters:

Name Type Description Default
service_dict Dict[str, Any]

dict representation of the service configuration and last known status

required

Returns:

Type Description
BaseService

A new or existing ZenML service instance.

Exceptions:

Type Description
ValueError

if the service dict does not contain a service type.

TypeError

if the service type is not registered.

Source code in zenml/services/service_registry.py
def load_service_from_dict(
    self, service_dict: Dict[str, Any]
) -> "BaseService":
    """Load a service instance from its dict representation.

    Creates, registers and returns a service instantiated from the dict
    representation of the service configuration and last known status
    information.

    If an existing service instance with the same UUID is already
    present in the service registry, it is returned instead.

    Args:
        service_dict: dict representation of the service configuration and
            last known status

    Returns:
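        A new or existing ZenML service instance.

    Raises:
        ValueError: if the service dict does not contain a service type.
        TypeError: if the service type is not registered.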
    """
    service_type = service_dict.get("service_type")
    if not service_type:
        raise ValueError(
            "Service type not present in the service dictionary"
        )
    service_type = ServiceType.parse_obj(service_type)
    service_class = self.get_service_type(service_type)
    if not service_class:
        raise TypeError(
            f"Cannot load service with unregistered service "
            f"type: {service_type}"
        )
    service = cast("BaseService", service_class.from_dict(service_dict))
    return service
load_service_from_json(self, json_str)

Load a service instance from its JSON representation.

Creates and returns a service instantiated from the JSON serialized service configuration and last known status information.

Parameters:

Name Type Description Default
json_str str

JSON string representation of the service configuration and last known status

required

Returns:

Type Description
BaseService

A ZenML service instance.

Source code in zenml/services/service_registry.py
def load_service_from_json(self, json_str: str) -> "BaseService":
    """Load a service instance from its JSON representation.

    Creates and returns a service instantiated from the JSON serialized
    service configuration and last known status information.

    Args:
        json_str: JSON string representation of the service configuration
            and last known status

    Returns:
        A ZenML service instance.
    """
    service_dict = json.loads(json_str)
    return self.load_service_from_dict(service_dict)
register_service(self, service)

Registers a new service instance

Parameters:

Name Type Description Default
service BaseService

a BaseService instance.

required

Exceptions:

Type Description
TypeError

if the service instance has a service type that is not registered.

Exception

if a service instance with the same UUID is already registered.

Source code in zenml/services/service_registry.py
def register_service(self, service: "BaseService") -> None:
    """Registers a new service instance

    Args:
        service: a BaseService instance.
    Raises:
        TypeError: if the service instance has a service type that is not
            registered.
        Exception: if a service instance with the same UUID is already
            registered.
    """
    service_type = service.SERVICE_TYPE
    if service_type not in self.service_types:
        raise TypeError(f"Service type `{service_type}` is not registered.")

    if service.uuid not in self.services:
        self.services[service.uuid] = service
        logger.debug(f"Registered service {service}")
    else:
        existing_service = self.services[service.uuid]
        raise Exception(
            f"Found existing service {existing_service} for UUID: "
            f"{service.uuid}. Skipping registration for service "
            f"{service}."
        )
register_service_type(self, cls)

Registers a new service type.

Parameters:

Name Type Description Default
cls Type[BaseService]

a BaseService subclass.

required

Exceptions:

Type Description
TypeError

if the service type is already registered.

Source code in zenml/services/service_registry.py
def register_service_type(self, cls: Type["BaseService"]) -> None:
    """Registers a new service type.

    Args:
        cls: a BaseService subclass.
    Raises:
        TypeError: if the service type is already registered.
    """
    service_type = cls.SERVICE_TYPE
    if service_type not in self.service_types:
        self.service_types[service_type] = cls
        logger.debug(
            f"Registered service class {cls} for "
            f"service type `{service_type}`"
        )
    else:
        raise TypeError(
            f"Found existing service type for {service_type}: "
            f"{self.service_types[service_type]}. Skipping registration "
            f"of {cls}."
        )
service_is_registered(self, uuid)

Check if a service instance is registered.

Source code in zenml/services/service_registry.py
def service_is_registered(self, uuid: UUID) -> bool:
    """Check if a service instance is registered."""
    return uuid in self.services
service_type_is_registered(self, service_type)

Check if a service type is registered.

Source code in zenml/services/service_registry.py
def service_type_is_registered(self, service_type: ServiceType) -> bool:
    """Check if a service type is registered."""
    return service_type in self.service_types
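
The registry methods above boil down to a dictionary keyed by a hashable service type, plus a lookup step when a service is rebuilt from its dict form. The sketch below illustrates that pattern with plain Python classes. `TypeKey`, `Service`, `Sleeper` and `MiniRegistry` are hypothetical stand-ins for `ServiceType`, `BaseService`, a concrete service and `ServiceRegistry`, and the `uuid` key in the dict is an assumption about the serialized layout.

```python
from dataclasses import dataclass
from typing import Any, Dict, Type


@dataclass(frozen=True)
class TypeKey:
    """Hypothetical stand-in for ServiceType (frozen, therefore hashable)."""

    type: str
    flavor: str


class Service:
    """Hypothetical stand-in for BaseService."""

    SERVICE_TYPE: TypeKey

    def __init__(self, uuid: str) -> None:
        self.uuid = uuid


class Sleeper(Service):
    SERVICE_TYPE = TypeKey(type="daemon", flavor="sleeping")


class MiniRegistry:
    """Maps service types to classes, like ServiceRegistry.service_types."""

    def __init__(self) -> None:
        self.service_types: Dict[TypeKey, Type[Service]] = {}

    def register_service_type(self, cls: Type[Service]) -> None:
        if cls.SERVICE_TYPE in self.service_types:
            raise TypeError(f"{cls.SERVICE_TYPE} is already registered")
        self.service_types[cls.SERVICE_TYPE] = cls

    def load_service_from_dict(self, service_dict: Dict[str, Any]) -> Service:
        raw_type = service_dict.get("service_type")
        if not raw_type:
            raise ValueError("Service type not present in the service dictionary")
        # rebuild the hashable key, then look up the class registered for it
        service_class = self.service_types.get(TypeKey(**raw_type))
        if service_class is None:
            raise TypeError(f"Cannot load service with unregistered type: {raw_type}")
        return service_class(uuid=service_dict["uuid"])


registry = MiniRegistry()
registry.register_service_type(Sleeper)
service = registry.load_service_from_dict(
    {"service_type": {"type": "daemon", "flavor": "sleeping"}, "uuid": "1234"}
)
print(type(service).__name__, service.uuid)  # prints: Sleeper 1234
```

This also shows why a service class has to be registered before any of its instances can be loaded: without the entry in `service_types`, the lookup has nothing to instantiate.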

service_status

ServiceState (StrEnum)

Possible states for the service and service endpoint.

Source code in zenml/services/service_status.py
class ServiceState(StrEnum):
    """Possible states for the service and service endpoint."""

    ACTIVE = "active"
    PENDING_STARTUP = "pending_startup"
    INACTIVE = "inactive"
    PENDING_SHUTDOWN = "pending_shutdown"
    ERROR = "error"

ServiceStatus (BaseTypedModel) pydantic-model

Information describing the operational status of an external process or service tracked by ZenML (e.g. process, container, Kubernetes deployment etc.).

Concrete service classes should extend this class and add additional attributes that make up the operational state of the service.

Attributes:

Name Type Description
state ServiceState

the current operational state

last_state ServiceState

the operational state prior to the last status update

last_error str

the error encountered during the last status update

Source code in zenml/services/service_status.py
class ServiceStatus(BaseTypedModel):
    """Information describing the operational status of an external process
    or service tracked by ZenML (e.g. process, container, Kubernetes
    deployment etc.).

    Concrete service classes should extend this class and add additional
    attributes that make up the operational state of the service.

    Attributes:
        state: the current operational state
        last_state: the operational state prior to the last status update
        last_error: the error encountered during the last status update
    """

    state: ServiceState = ServiceState.INACTIVE
    last_state: ServiceState = ServiceState.INACTIVE
    last_error: str = ""

    def update_state(
        self,
        new_state: Optional[ServiceState] = None,
        error: str = "",
    ) -> None:
        """Update the current operational state to reflect a new state
        value and/or error.

        Args:
            new_state: new operational state discovered by the last service
                status update
            error: error message describing an operational failure encountered
                during the last service status update
        """
        if new_state and self.state != new_state:
            self.last_state = self.state
            self.state = new_state
        if error:
            self.last_error = error

    def clear_error(self) -> None:
        """Clear the last error message."""
        self.last_error = ""
clear_error(self)

Clear the last error message.

Source code in zenml/services/service_status.py
def clear_error(self) -> None:
    """Clear the last error message."""
    self.last_error = ""
update_state(self, new_state=None, error='')

Update the current operational state to reflect a new state value and/or error.

Parameters:

Name Type Description Default
new_state Optional[zenml.services.service_status.ServiceState]

new operational state discovered by the last service status update

None
error str

error message describing an operational failure encountered during the last service status update

''

Source code in zenml/services/service_status.py
def update_state(
    self,
    new_state: Optional[ServiceState] = None,
    error: str = "",
) -> None:
    """Update the current operational state to reflect a new state
    value and/or error.

    Args:
        new_state: new operational state discovered by the last service
            status update
        error: error message describing an operational failure encountered
            during the last service status update
    """
    if new_state and self.state != new_state:
        self.last_state = self.state
        self.state = new_state
    if error:
        self.last_error = error
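
Two details of `update_state` are easy to miss: `last_state` only changes when the state actually changes, and `last_error` persists across updates until `clear_error` is called. The following sketch replays the same logic on a plain dataclass. `Status` is a hypothetical stand-in for `ServiceStatus`, and plain strings stand in for `ServiceState` values.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class Status:
    """Hypothetical stand-in for ServiceStatus, using plain strings."""

    state: str = "inactive"
    last_state: str = "inactive"
    last_error: str = ""

    def update_state(
        self, new_state: Optional[str] = None, error: str = ""
    ) -> None:
        # only record a transition when the state really changes
        if new_state and self.state != new_state:
            self.last_state = self.state
            self.state = new_state
        # an empty error never overwrites the stored one
        if error:
            self.last_error = error

    def clear_error(self) -> None:
        self.last_error = ""


status = Status()
status.update_state("pending_startup")
status.update_state("error", "port is not open")
status.update_state("active")  # no error passed, so last_error is kept
print(status.state, status.last_state, repr(status.last_error))
# prints: active error 'port is not open'
```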

service_type

ServiceType (BaseModel) pydantic-model

Service type descriptor.

Attributes:

Name Type Description
type str

service type

flavor str

service flavor

name str

name of the service type

description str

description of the service type

Source code in zenml/services/service_type.py
class ServiceType(BaseModel):
    """Service type descriptor.

    Attributes:
        type: service type
        flavor: service flavor
        name: name of the service type
        description: description of the service type
    """

    type: str
    flavor: str
    name: str = ""
    description: str = ""

    class Config:
        """Pydantic configuration class."""

        # make the service type immutable and hashable
        frozen = True
Config

Pydantic configuration class.

Source code in zenml/services/service_type.py
class Config:
    """Pydantic configuration class."""

    # make the service type immutable and hashable
    frozen = True

utils

load_last_service_from_step(pipeline_name, step_name, step_context=None, running=False)

Get the last service created by the pipeline and step with the given names.

This function searches backwards through the execution history for a named pipeline step and returns the first service instance that it finds logged as a step output.

Parameters:

Name Type Description Default
pipeline_name str

the name of the pipeline

required
step_name str

pipeline step name

required
step_context Optional[zenml.steps.step_context.StepContext]

step context required only when called from within a step

None
running bool

when this flag is set, the search only returns a running service

False

Returns:

Type Description
Optional[zenml.services.service.BaseService]

A BaseService instance that represents the service, or None if no matching service was found in any run of the pipeline step.

Exceptions:

Type Description
KeyError

if the pipeline or step name is not found in the execution.

Source code in zenml/services/utils.py
def load_last_service_from_step(
    pipeline_name: str,
    step_name: str,
    step_context: Optional[StepContext] = None,
    running: bool = False,
) -> Optional[BaseService]:
    """Get the last service created by the pipeline and step with the given
    names.

    This function searches backwards through the execution history for a
    named pipeline step and returns the first service instance that it finds
    logged as a step output.

    Args:
        pipeline_name: the name of the pipeline
        step_name: pipeline step name
        step_context: step context required only when called from within a step
        running: when this flag is set, the search only returns a running
            service

    Returns:
        A BaseService instance that represents the service, or None if no
        matching service was found in any run of the pipeline step.

    Raises:
        KeyError: if the pipeline or step name is not found in the execution.
    """
    if step_context is None:
        repo = Repository()
        pipeline = repo.get_pipeline(pipeline_name)
    else:
        pipeline = step_context.metadata_store.get_pipeline(
            pipeline_name=pipeline_name
        )
    if pipeline is None:
        raise KeyError(f"No pipeline with name `{pipeline_name}` was found")

    for run in reversed(pipeline.runs):
        step = run.get_step(name=step_name)
        for artifact_view in step.outputs.values():
            # filter out anything but service artifacts
            if artifact_view.type == "ServiceArtifact":
                service = artifact_view.read()
                if not isinstance(service, BaseService):
                    raise RuntimeError(
                        f"Artifact `{artifact_view.name}` of type "
                        f"`{artifact_view.type}` is not a service"
                    )
                if not running or service.is_running:
                    return service
    return None
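
The effect of the `running` flag is easiest to see on a small example. The sketch below reproduces only the backwards search, with each run reduced to the list of services it produced. `FakeService` and `last_service` are hypothetical names, and the history is assumed to be ordered oldest first, which is what iterating over `reversed(pipeline.runs)` suggests.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class FakeService:
    """Hypothetical stand-in for a BaseService read from a step output."""

    name: str
    is_running: bool


def last_service(
    runs: List[List[FakeService]], running: bool = False
) -> Optional[FakeService]:
    """Return the most recent service, optionally only a running one."""
    # runs are assumed to be ordered oldest first, so walk them in reverse
    for services in reversed(runs):
        for service in services:
            if not running or service.is_running:
                return service
    return None


history = [
    [FakeService("v1", is_running=True)],   # oldest run
    [],                                     # a run that produced no service
    [FakeService("v3", is_running=False)],  # newest run
]

print(last_service(history).name)                # prints: v3
print(last_service(history, running=True).name)  # prints: v1
```

With `running=True` the newest service is skipped because it has stopped, and the search falls back to an older run that still has a live service.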