Services
zenml.services
special
Initialization of the ZenML services module.
A service is a process or set of processes that outlive a pipeline run.
local
special
Initialization of a local ZenML service.
local_daemon_entrypoint
Implementation of a local daemon entrypoint.
This executable file is utilized as an entrypoint for all ZenML services that are implemented as locally running daemon processes.
local_service
Implementation of a local ZenML service.
LocalDaemonService (BaseService)
pydantic-model
A service represented by a local daemon process.
This class extends the base service class with functionality concerning the life-cycle management and tracking of external services implemented as local daemon processes.
To define a local daemon service, subclass this class and implement the `run` method. Upon `start`, the service will spawn a daemon process that ends up calling the `run` method.
Examples:
from zenml.services import ServiceType, LocalDaemonService, LocalDaemonServiceConfig
import time

class SleepingDaemonConfig(LocalDaemonServiceConfig):
    wake_up_after: int

class SleepingDaemon(LocalDaemonService):
    SERVICE_TYPE = ServiceType(
        name="sleeper",
        description="Sleeping daemon",
        type="daemon",
        flavor="sleeping",
    )
    config: SleepingDaemonConfig

    def run(self) -> None:
        time.sleep(self.config.wake_up_after)

daemon = SleepingDaemon(config=SleepingDaemonConfig(wake_up_after=10))
daemon.start()
NOTE: the `SleepingDaemon` class and its parent module have to be discoverable as part of a ZenML `Integration`, otherwise the daemon will fail with the following error:
TypeError: Cannot load service with unregistered service type:
name='sleeper' type='daemon' flavor='sleeping' description='Sleeping daemon'
Attributes:
Name | Type | Description |
---|---|---|
config | LocalDaemonServiceConfig | service configuration |
status | LocalDaemonServiceStatus | service status |
endpoint | Optional[zenml.services.local.local_service_endpoint.LocalDaemonServiceEndpoint] | optional service endpoint |
Source code in zenml/services/local/local_service.py
class LocalDaemonService(BaseService):
    """A service represented by a local daemon process.

    This class extends the base service class with functionality concerning
    the life-cycle management and tracking of external services implemented as
    local daemon processes.

    To define a local daemon service, subclass this class and implement the
    `run` method. Upon `start`, the service will spawn a daemon process that
    ends up calling the `run` method.

    Example:
    ```python
    from zenml.services import ServiceType, LocalDaemonService, LocalDaemonServiceConfig
    import time

    class SleepingDaemonConfig(LocalDaemonServiceConfig):
        wake_up_after: int

    class SleepingDaemon(LocalDaemonService):
        SERVICE_TYPE = ServiceType(
            name="sleeper",
            description="Sleeping daemon",
            type="daemon",
            flavor="sleeping",
        )
        config: SleepingDaemonConfig

        def run(self) -> None:
            time.sleep(self.config.wake_up_after)

    daemon = SleepingDaemon(config=SleepingDaemonConfig(wake_up_after=10))
    daemon.start()
    ```

    NOTE: the `SleepingDaemon` class and its parent module have to be
    discoverable as part of a ZenML `Integration`, otherwise the daemon will
    fail with the following error:
    ```
    TypeError: Cannot load service with unregistered service type:
    name='sleeper' type='daemon' flavor='sleeping' description='Sleeping daemon'
    ```

    Attributes:
        config: service configuration
        status: service status
        endpoint: optional service endpoint
    """
    config: LocalDaemonServiceConfig = Field(
        default_factory=LocalDaemonServiceConfig
    )
    status: LocalDaemonServiceStatus = Field(
        default_factory=LocalDaemonServiceStatus
    )
    # TODO [ENG-705]: allow multiple endpoints per service
    endpoint: Optional[LocalDaemonServiceEndpoint] = None
    def get_service_status_message(self) -> str:
        """Get a message about the current operational state of the service.

        Returns:
            A message providing information about the current operational
            state of the service.
        """
        msg = super().get_service_status_message()
        pid = self.status.pid
        if pid:
            msg += f" Daemon PID: `{self.status.pid}`\n"
        if self.status.log_file:
            msg += (
                f"For more information on the service status, please see the "
                f"following log file: {self.status.log_file}\n"
            )
        return msg
    def check_status(self) -> Tuple[ServiceState, str]:
        """Check the current operational state of the daemon process.

        Returns:
            The operational state of the daemon process and a message
            providing additional information about that state (e.g. a
            description of the error, if one is encountered).
        """
        if not self.status.pid:
            return ServiceState.INACTIVE, "service daemon is not running"
        # the daemon is running
        return ServiceState.ACTIVE, ""
    def _get_daemon_cmd(self) -> Tuple[List[str], Dict[str, str]]:
        """Get the command to run the service daemon.

        The default implementation provided by this class is the following:

        * this LocalDaemonService instance and its configuration
        are serialized as JSON and saved to a temporary file
        * the local_daemon_entrypoint.py script is launched as a subprocess
        and pointed to the serialized service file
        * the entrypoint script re-creates the LocalDaemonService instance
        from the serialized configuration, reconfigures itself as a daemon
        and detaches itself from the parent process, then calls the `run`
        method that must be implemented by the subclass

        Subclasses that need a different command to launch the service daemon
        should override this method.

        Returns:
            Command needed to launch the daemon process and the environment
            variables to set, in the formats accepted by subprocess.Popen.
        """
        # to avoid circular imports, import here
        import zenml.services.local.local_daemon_entrypoint as daemon_entrypoint

        self.status.silent_daemon = self.config.silent_daemon
        # reuse the config file and logfile location from a previous run,
        # if available
        if not self.status.runtime_path or not os.path.exists(
            self.status.runtime_path
        ):
            if self.config.root_runtime_path:
                if self.config.singleton:
                    self.status.runtime_path = self.config.root_runtime_path
                else:
                    self.status.runtime_path = os.path.join(
                        self.config.root_runtime_path,
                        str(self.uuid),
                    )
                create_dir_recursive_if_not_exists(self.status.runtime_path)
            else:
                self.status.runtime_path = tempfile.mkdtemp(
                    prefix="zenml-service-"
                )

        assert self.status.config_file is not None
        assert self.status.pid_file is not None

        with open(self.status.config_file, "w") as f:
            f.write(self.json(indent=4))

        # delete the previous PID file, in case a previous daemon process
        # crashed and left a stale PID file
        if os.path.exists(self.status.pid_file):
            os.remove(self.status.pid_file)

        command = [
            sys.executable,
            "-m",
            daemon_entrypoint.__name__,
            "--config-file",
            self.status.config_file,
            "--pid-file",
            self.status.pid_file,
        ]
        if self.status.log_file:
            pathlib.Path(self.status.log_file).touch()
            command += ["--log-file", self.status.log_file]

        command_env = os.environ.copy()

        return command, command_env
    def _start_daemon(self) -> None:
        """Start the service daemon process associated with this service."""
        pid = self.status.pid
        if pid:
            # service daemon is already running
            logger.debug(
                "Daemon process for service '%s' is already running with PID %d",
                self,
                pid,
            )
            return

        logger.debug("Starting daemon for service '%s'...", self)

        if self.endpoint:
            self.endpoint.prepare_for_start()

        command, command_env = self._get_daemon_cmd()
        logger.debug(
            "Running command to start daemon for service '%s': %s",
            self,
            " ".join(command),
        )
        p = subprocess.Popen(command, env=command_env)
        p.wait()
        pid = self.status.pid
        if pid:
            logger.debug(
                "Daemon process for service '%s' started with PID: %d",
                self,
                pid,
            )
        else:
            logger.error(
                "Daemon process for service '%s' failed to start.",
                self,
            )
    def _stop_daemon(self, force: bool = False) -> None:
        """Stop the service daemon process associated with this service.

        Args:
            force: if True, the service daemon will be forcefully stopped
        """
        pid = self.status.pid
        if not pid:
            # service daemon is not running
            logger.debug(
                "Daemon process for service '%s' no longer running",
                self,
            )
            return

        logger.debug("Stopping daemon for service '%s' ...", self)
        try:
            p = psutil.Process(pid)
        except psutil.Error:
            logger.error(
                "Could not find process for service '%s' ...", self
            )
            return
        if force:
            p.kill()
        else:
            p.terminate()
    def provision(self) -> None:
        """Provision the service."""
        self._start_daemon()

    def deprovision(self, force: bool = False) -> None:
        """Deprovision the service.

        Args:
            force: if True, the service daemon will be forcefully stopped
        """
        self._stop_daemon(force)
    def get_logs(
        self, follow: bool = False, tail: Optional[int] = None
    ) -> Generator[str, bool, None]:
        """Retrieve the service logs.

        Args:
            follow: if True, the logs will be streamed as they are written
            tail: only retrieve the last NUM lines of log output.

        Yields:
            A generator that can be accessed to get the service logs.
        """
        if not self.status.log_file or not os.path.exists(self.status.log_file):
            return

        with open(self.status.log_file, "r") as f:
            if tail:
                # TODO[ENG-864]: implement a more efficient tailing mechanism that
                # doesn't read the entire file
                lines = f.readlines()[-tail:]
                for line in lines:
                    yield line.rstrip("\n")
                if not follow:
                    return
            line = ""
            while True:
                partial_line = f.readline()
                if partial_line:
                    line += partial_line
                    if line.endswith("\n"):
                        stop = yield line.rstrip("\n")
                        if stop:
                            break
                        line = ""
                elif follow:
                    time.sleep(1)
                else:
                    break
    @abstractmethod
    def run(self) -> None:
        """Run the service daemon process associated with this service.

        Subclasses must implement this method to provide the service daemon
        functionality. This method will be executed in the context of the
        running daemon, not in the context of the process that calls the
        `start` method.
        """
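The launch procedure described in the `_get_daemon_cmd` docstring can be sketched without ZenML, using only the standard library. This is a minimal illustration under stated assumptions, not the library's implementation: the function `build_daemon_command`, the module name `my_package.daemon_entrypoint` and the file names `service.json`, `service.pid` and `service.log` are hypothetical. Only the `python -m <module>` pattern and the `--config-file`, `--pid-file` and `--log-file` flags are taken from the source listing above.

```python
import json
import os
import sys
import tempfile
from typing import Dict, List, Optional, Tuple


def build_daemon_command(
    entrypoint_module: str,
    service_state: Dict[str, object],
    runtime_path: str,
    silent: bool = False,
) -> Tuple[List[str], Dict[str, str]]:
    """Serialize the service state and build the daemon launch command."""
    # Hypothetical file names; ZenML uses its own constants for these.
    config_file = os.path.join(runtime_path, "service.json")
    pid_file = os.path.join(runtime_path, "service.pid")
    log_file: Optional[str] = (
        None if silent else os.path.join(runtime_path, "service.log")
    )

    # Persist the state so the entrypoint can re-create the service.
    with open(config_file, "w") as f:
        json.dump(service_state, f, indent=4)

    # Remove a stale PID file left behind by a crashed daemon.
    if os.path.exists(pid_file):
        os.remove(pid_file)

    command = [
        sys.executable,
        "-m",
        entrypoint_module,
        "--config-file",
        config_file,
        "--pid-file",
        pid_file,
    ]
    if log_file:
        open(log_file, "a").close()  # make sure the log file exists
        command += ["--log-file", log_file]
    # The daemon inherits a copy of the current environment.
    return command, os.environ.copy()


runtime_dir = tempfile.mkdtemp(prefix="service-sketch-")
cmd, env = build_daemon_command(
    "my_package.daemon_entrypoint", {"wake_up_after": 10}, runtime_dir
)
```

The returned pair has the shape accepted by `subprocess.Popen(command, env=command_env)`, which is how `_start_daemon` consumes it in the listing above.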
check_status(self)
Check the current operational state of the daemon process.
Returns:
Type | Description |
---|---|
Tuple[zenml.services.service_status.ServiceState, str] | The operational state of the daemon process and a message providing additional information about that state (e.g. a description of the error, if one is encountered). |
Source code in zenml/services/local/local_service.py
def check_status(self) -> Tuple[ServiceState, str]:
    """Check the current operational state of the daemon process.

    Returns:
        The operational state of the daemon process and a message
        providing additional information about that state (e.g. a
        description of the error, if one is encountered).
    """
    if not self.status.pid:
        return ServiceState.INACTIVE, "service daemon is not running"
    # the daemon is running
    return ServiceState.ACTIVE, ""
deprovision(self, force=False)
Deprovision the service.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
force | bool | if True, the service daemon will be forcefully stopped | False |
Source code in zenml/services/local/local_service.py
def deprovision(self, force: bool = False) -> None:
    """Deprovision the service.

    Args:
        force: if True, the service daemon will be forcefully stopped
    """
    self._stop_daemon(force)
get_logs(self, follow=False, tail=None)
Retrieve the service logs.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
follow | bool | if True, the logs will be streamed as they are written | False |
tail | Optional[int] | only retrieve the last NUM lines of log output. | None |
Yields:
Type | Description |
---|---|
Generator[str, bool, NoneType] | A generator that can be accessed to get the service logs. |
Source code in zenml/services/local/local_service.py
def get_logs(
    self, follow: bool = False, tail: Optional[int] = None
) -> Generator[str, bool, None]:
    """Retrieve the service logs.

    Args:
        follow: if True, the logs will be streamed as they are written
        tail: only retrieve the last NUM lines of log output.

    Yields:
        A generator that can be accessed to get the service logs.
    """
    if not self.status.log_file or not os.path.exists(self.status.log_file):
        return

    with open(self.status.log_file, "r") as f:
        if tail:
            # TODO[ENG-864]: implement a more efficient tailing mechanism that
            # doesn't read the entire file
            lines = f.readlines()[-tail:]
            for line in lines:
                yield line.rstrip("\n")
            if not follow:
                return
        line = ""
        while True:
            partial_line = f.readline()
            if partial_line:
                line += partial_line
                if line.endswith("\n"):
                    stop = yield line.rstrip("\n")
                    if stop:
                        break
                    line = ""
            elif follow:
                time.sleep(1)
            else:
                break
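The return type `Generator[str, bool, None]` means the generator yields log lines and accepts a boolean sent back by the consumer as a stop signal. The following is a simplified, standalone sketch of that protocol. It is not ZenML code: the function `read_logs` is hypothetical, it reads from a temporary file created for the demonstration, and it leaves out the `follow` mode entirely.

```python
import os
import tempfile
from typing import Generator, Optional


def read_logs(
    path: str, tail: Optional[int] = None
) -> Generator[str, bool, None]:
    """Yield log lines; the consumer may send True to stop early."""
    if not os.path.exists(path):
        return
    with open(path, "r") as f:
        lines = f.readlines()
    if tail:
        # Keep only the last `tail` lines (reads the whole file first).
        lines = lines[-tail:]
    for line in lines:
        # The value passed to send() becomes the result of the yield.
        stop = yield line.rstrip("\n")
        if stop:
            break


# Create a small log file to read from.
with tempfile.NamedTemporaryFile("w", suffix=".log", delete=False) as tmp:
    tmp.write("one\ntwo\nthree\n")
    log_path = tmp.name

# Plain iteration never sends a value, so all requested lines are returned.
last_two = list(read_logs(log_path, tail=2))

# Sending True after the first line asks the generator to finish.
gen = read_logs(log_path)
first = next(gen)
try:
    gen.send(True)
    stopped = False
except StopIteration:
    stopped = True
```

With the real method, the same consumer pattern would apply to `service.get_logs(follow=True)`, where sending `True` is the way to leave the otherwise endless follow loop.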
get_service_status_message(self)
Get a message about the current operational state of the service.
Returns:
Type | Description |
---|---|
str | A message providing information about the current operational state of the service. |
Source code in zenml/services/local/local_service.py
def get_service_status_message(self) -> str:
    """Get a message about the current operational state of the service.

    Returns:
        A message providing information about the current operational
        state of the service.
    """
    msg = super().get_service_status_message()
    pid = self.status.pid
    if pid:
        msg += f" Daemon PID: `{self.status.pid}`\n"
    if self.status.log_file:
        msg += (
            f"For more information on the service status, please see the "
            f"following log file: {self.status.log_file}\n"
        )
    return msg
provision(self)
Provision the service.
Source code in zenml/services/local/local_service.py
def provision(self) -> None:
    """Provision the service."""
    self._start_daemon()
run(self)
Run the service daemon process associated with this service.
Subclasses must implement this method to provide the service daemon functionality. This method will be executed in the context of the running daemon, not in the context of the process that calls the `start` method.
Source code in zenml/services/local/local_service.py
@abstractmethod
def run(self) -> None:
    """Run the service daemon process associated with this service.

    Subclasses must implement this method to provide the service daemon
    functionality. This method will be executed in the context of the
    running daemon, not in the context of the process that calls the
    `start` method.
    """
LocalDaemonServiceConfig (ServiceConfig)
pydantic-model
Local daemon service configuration.
Attributes:
Name | Type | Description |
---|---|---|
silent_daemon | bool | set to True to suppress the output of the daemon (i.e. redirect stdout and stderr to /dev/null). If False, the daemon output will be redirected to a logfile. |
root_runtime_path | Optional[str] | the root path where the service daemon will store service configuration files |
singleton | bool | set to True to store the service daemon configuration files directly in the `root_runtime_path` directory instead of creating a subdirectory for each service instance. Only has effect if the `root_runtime_path` is also set. |
Source code in zenml/services/local/local_service.py
class LocalDaemonServiceConfig(ServiceConfig):
    """Local daemon service configuration.

    Attributes:
        silent_daemon: set to True to suppress the output of the daemon
            (i.e. redirect stdout and stderr to /dev/null). If False, the
            daemon output will be redirected to a logfile.
        root_runtime_path: the root path where the service daemon will store
            service configuration files
        singleton: set to True to store the service daemon configuration files
            directly in the `root_runtime_path` directory instead of creating
            a subdirectory for each service instance. Only has effect if the
            `root_runtime_path` is also set.
    """
    silent_daemon: bool = False
    root_runtime_path: Optional[str] = None
    singleton: bool = False
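How `root_runtime_path` and `singleton` combine into the directory that holds the daemon's runtime files can be illustrated with a small standalone function. This is a sketch that mirrors the branching shown in the `_get_daemon_cmd` listing, assuming nothing beyond the standard library: `resolve_runtime_path` is a hypothetical name, and `os.makedirs` stands in for the `create_dir_recursive_if_not_exists` helper used by ZenML.

```python
import os
import tempfile
from typing import Optional
from uuid import UUID, uuid4


def resolve_runtime_path(
    root_runtime_path: Optional[str], singleton: bool, service_uuid: UUID
) -> str:
    """Pick the directory for the config, PID and log files."""
    if root_runtime_path:
        if singleton:
            # One service per root: files go directly into the root.
            path = root_runtime_path
        else:
            # Several services per root: one subdirectory per instance.
            path = os.path.join(root_runtime_path, str(service_uuid))
        os.makedirs(path, exist_ok=True)
        return path
    # No root configured: `singleton` is ignored and a temporary
    # directory is created instead.
    return tempfile.mkdtemp(prefix="zenml-service-")


root = tempfile.mkdtemp()
service_id = uuid4()
shared = resolve_runtime_path(root, singleton=True, service_uuid=service_id)
per_instance = resolve_runtime_path(root, singleton=False, service_uuid=service_id)
fallback = resolve_runtime_path(None, singleton=True, service_uuid=service_id)
```

The last call shows why the attribute description says `singleton` only has an effect when `root_runtime_path` is also set.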
LocalDaemonServiceStatus (ServiceStatus)
pydantic-model
Local daemon service status.
Attributes:
Name | Type | Description |
---|---|---|
runtime_path | Optional[str] | the path where the service daemon runtime files (the configuration file used to start the service daemon and the logfile) are located |
silent_daemon | bool | flag indicating whether the output of the daemon is suppressed (redirected to /dev/null). |
Source code in zenml/services/local/local_service.py
class LocalDaemonServiceStatus(ServiceStatus):
    """Local daemon service status.

    Attributes:
        runtime_path: the path where the service daemon runtime files (the
            configuration file used to start the service daemon and the
            logfile) are located
        silent_daemon: flag indicating whether the output of the daemon
            is suppressed (redirected to /dev/null).
    """
    runtime_path: Optional[str] = None
    # TODO [ENG-704]: remove field duplication between XServiceStatus and
    # XServiceConfig (e.g. keep a private reference to the config in the
    # status)
    silent_daemon: bool = False
    @property
    def config_file(self) -> Optional[str]:
        """Get the path to the configuration file used to start the service daemon.

        Returns:
            The path to the configuration file, or None, if the
            service has never been started before.
        """
        if not self.runtime_path:
            return None
        return os.path.join(self.runtime_path, SERVICE_DAEMON_CONFIG_FILE_NAME)

    @property
    def log_file(self) -> Optional[str]:
        """Get the path to the log file where the service output is/has been logged.

        Returns:
            The path to the log file, or None, if the service has never been
            started before, or if the service daemon output is suppressed.
        """
        if not self.runtime_path or self.silent_daemon:
            return None
        return os.path.join(self.runtime_path, SERVICE_DAEMON_LOG_FILE_NAME)

    @property
    def pid_file(self) -> Optional[str]:
        """Get the path to a daemon PID file.

        This is where the last known PID of the daemon process is stored.

        Returns:
            The path to the PID file, or None, if the service has never been
            started before.
        """
        if not self.runtime_path or self.silent_daemon:
            return None
        return os.path.join(self.runtime_path, SERVICE_DAEMON_PID_FILE_NAME)
    @property
    def pid(self) -> Optional[int]:
        """Return the PID of the currently running daemon.

        Returns:
            The PID of the daemon, or None, if the service has never been
            started before.
        """
        pid_file = self.pid_file
        if not pid_file:
            return None
        if sys.platform == "win32":
            logger.warning(
                "Daemon functionality is currently not supported on Windows."
            )
            return None
        else:
            import zenml.services.local.local_daemon_entrypoint as daemon_entrypoint
            from zenml.utils.daemon import get_daemon_pid_if_running

            pid = get_daemon_pid_if_running(pid_file)

            # let's be extra careful here and check that the PID really
            # belongs to a process that is a local ZenML daemon.
            # this avoids the situation where a PID file is left over from
            # a previous daemon run, but another process is using the same
            # PID.
            try:
                p = psutil.Process(pid)
                cmd_line = p.cmdline()
                if (
                    daemon_entrypoint.__name__ not in cmd_line
                    or self.config_file not in cmd_line
                ):
                    return None
                return pid
            except NoSuchProcess:
                return None
config_file: Optional[str]
property
readonly
Get the path to the configuration file used to start the service daemon.
Returns:
Type | Description |
---|---|
Optional[str] | The path to the configuration file, or None, if the service has never been started before. |
log_file: Optional[str]
property
readonly
Get the path to the log file where the service output is/has been logged.
Returns:
Type | Description |
---|---|
Optional[str] | The path to the log file, or None, if the service has never been started before, or if the service daemon output is suppressed. |
pid: Optional[int]
property
readonly
Return the PID of the currently running daemon.
Returns:
Type | Description |
---|---|
Optional[int] | The PID of the daemon, or None, if the service has never been started before. |
pid_file: Optional[str]
property
readonly
Get the path to a daemon PID file.
This is where the last known PID of the daemon process is stored.
Returns:
Type | Description |
---|---|
Optional[str] | The path to the PID file, or None, if the service has never been started before. |
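The idea behind the `pid` property, reading the last known PID from a file and confirming that a process with that PID still exists, can be sketched with the standard library alone. This is an illustration under stated assumptions rather than ZenML's implementation: `read_pid_if_running` is a hypothetical helper, it relies on the POSIX behaviour of `os.kill(pid, 0)`, and it does not perform the additional command-line check that the real property does with `psutil`.

```python
import os
import tempfile
from typing import Optional


def read_pid_if_running(pid_file: str) -> Optional[int]:
    """Return the PID stored in pid_file if that process exists, else None."""
    try:
        with open(pid_file, "r") as f:
            pid = int(f.read().strip())
    except (OSError, ValueError):
        return None  # missing, unreadable or malformed PID file
    try:
        os.kill(pid, 0)  # signal 0 only checks for existence (POSIX)
    except ProcessLookupError:
        return None  # stale PID file: no such process
    except PermissionError:
        pass  # the process exists but belongs to another user
    return pid


workdir = tempfile.mkdtemp()
pid_path = os.path.join(workdir, "service.pid")
# Use the current process as a stand-in for a running daemon.
with open(pid_path, "w") as f:
    f.write(str(os.getpid()))

running_pid = read_pid_if_running(pid_path)
missing_pid = read_pid_if_running(os.path.join(workdir, "absent.pid"))
```

An existence check alone cannot tell whether the PID has been reused by an unrelated process, which is the case the command-line comparison in the `pid` listing is meant to rule out.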
local_service_endpoint
Implementation of a local service endpoint.
LocalDaemonServiceEndpoint (BaseServiceEndpoint)
pydantic-model
A service endpoint exposed by a local daemon process.
This class extends the base service endpoint class with functionality concerning the life-cycle management and tracking of endpoints exposed by external services implemented as local daemon processes.
Attributes:
Name | Type | Description |
---|---|---|
config | LocalDaemonServiceEndpointConfig | service endpoint configuration |
status | LocalDaemonServiceEndpointStatus | service endpoint status |
monitor | Union[zenml.services.service_monitor.HTTPEndpointHealthMonitor, zenml.services.service_monitor.TCPEndpointHealthMonitor] | optional service endpoint health monitor |
Source code in zenml/services/local/local_service_endpoint.py
class LocalDaemonServiceEndpoint(BaseServiceEndpoint):
    """A service endpoint exposed by a local daemon process.

    This class extends the base service endpoint class with functionality
    concerning the life-cycle management and tracking of endpoints exposed
    by external services implemented as local daemon processes.

    Attributes:
        config: service endpoint configuration
        status: service endpoint status
        monitor: optional service endpoint health monitor
    """
    config: LocalDaemonServiceEndpointConfig = Field(
        default_factory=LocalDaemonServiceEndpointConfig
    )
    status: LocalDaemonServiceEndpointStatus = Field(
        default_factory=LocalDaemonServiceEndpointStatus
    )
    monitor: Optional[
        Union[HTTPEndpointHealthMonitor, TCPEndpointHealthMonitor]
    ] = Field(..., discriminator="type")
    def _lookup_free_port(self) -> int:
        """Search for a free TCP port for the service endpoint.

        If a preferred TCP port value is explicitly requested through the
        endpoint configuration, it will be checked first. If a port was
        previously used the last time the service was running (i.e. as
        indicated in the service endpoint status), it will be checked next for
        availability.

        As a last resort, this call will search for a free TCP port, if
        `allocate_port` is set to True in the endpoint configuration.

        Returns:
            An available TCP port number

        Raises:
            IOError: if the preferred TCP port is busy and `allocate_port` is
                disabled in the endpoint configuration, or if no free TCP port
                could be otherwise allocated.
        """
        # If a port value is explicitly configured, attempt to use it first
        if self.config.port:
            if port_available(self.config.port):
                return self.config.port
            if not self.config.allocate_port:
                raise IOError(f"TCP port {self.config.port} is not available.")

        # Attempt to reuse the port used when the service was last running
        if self.status.port and port_available(self.status.port):
            return self.status.port

        port = scan_for_available_port()
        if port:
            return port
        raise IOError("No free TCP ports found")
    def prepare_for_start(self) -> None:
        """Prepare the service endpoint for starting.

        This method is called before the service is started.
        """
        self.status.protocol = self.config.protocol
        self.status.hostname = self.config.ip_address
        self.status.port = self._lookup_free_port()
prepare_for_start(self)
Prepare the service endpoint for starting.
This method is called before the service is started.
Source code in zenml/services/local/local_service_endpoint.py
def prepare_for_start(self) -> None:
    """Prepare the service endpoint for starting.

    This method is called before the service is started.
    """
    self.status.protocol = self.config.protocol
    self.status.hostname = self.config.ip_address
    self.status.port = self._lookup_free_port()
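The port selection order used when the endpoint is prepared (preferred port first, then the last used port, then any free port) can be sketched with plain sockets. This is a hedged illustration, not the library's code: `port_is_free`, `find_free_port` and `lookup_port` are hypothetical names standing in for the ZenML helpers referenced in the `_lookup_free_port` listing, and the checks are inherently racy because a port can be taken between the check and its later use.

```python
import socket
from typing import Optional


def port_is_free(port: int, host: str = "127.0.0.1") -> bool:
    """Return True if the port can currently be bound on the host."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        try:
            s.bind((host, port))
        except OSError:
            return False
    return True


def find_free_port(host: str = "127.0.0.1") -> int:
    """Ask the operating system for any free port by binding to port 0."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.bind((host, 0))
        return s.getsockname()[1]


def lookup_port(
    preferred: Optional[int], last_used: Optional[int], allocate_port: bool
) -> int:
    """Choose a port: preferred first, then last used, then any free one."""
    if preferred:
        if port_is_free(preferred):
            return preferred
        if not allocate_port:
            raise IOError(f"TCP port {preferred} is not available.")
    if last_used and port_is_free(last_used):
        return last_used
    return find_free_port()


# Occupy a port so that the "busy preferred port" branches are exercised.
busy = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
busy.bind(("127.0.0.1", 0))
busy.listen(1)
busy_port = busy.getsockname()[1]

fallback_port = lookup_port(busy_port, None, allocate_port=True)
try:
    lookup_port(busy_port, None, allocate_port=False)
    strict_failed = False
except IOError:
    strict_failed = True
busy.close()
```

With `allocate_port=True` a busy preferred port is silently replaced, so callers that need the actual port should read it from the endpoint status rather than from the configuration.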
LocalDaemonServiceEndpointConfig (ServiceEndpointConfig)
pydantic-model
Local daemon service endpoint configuration.
Attributes:
Name | Type | Description |
---|---|---|
protocol | ServiceEndpointProtocol | the TCP protocol implemented by the service endpoint |
port | Optional[int] | preferred TCP port value for the service endpoint. If the port is in use when the service is started, setting `allocate_port` to True will also try to allocate a new port value, otherwise an exception will be raised. |
ip_address | str | the IP address of the service endpoint. If not set, the default localhost IP address will be used. |
allocate_port | bool | set to True to allocate a free TCP port for the service endpoint automatically. |
Source code in zenml/services/local/local_service_endpoint.py
class LocalDaemonServiceEndpointConfig(ServiceEndpointConfig):
    """Local daemon service endpoint configuration.

    Attributes:
        protocol: the TCP protocol implemented by the service endpoint
        port: preferred TCP port value for the service endpoint. If the port
            is in use when the service is started, setting `allocate_port` to
            True will also try to allocate a new port value, otherwise an
            exception will be raised.
        ip_address: the IP address of the service endpoint. If not set, the
            default localhost IP address will be used.
        allocate_port: set to True to allocate a free TCP port for the
            service endpoint automatically.
    """
    protocol: ServiceEndpointProtocol = ServiceEndpointProtocol.TCP
    port: Optional[int] = None
    ip_address: str = DEFAULT_LOCAL_SERVICE_IP_ADDRESS
    allocate_port: bool = True
LocalDaemonServiceEndpointStatus (ServiceEndpointStatus)
pydantic-model
Local daemon service endpoint status.
Source code in zenml/services/local/local_service_endpoint.py
class LocalDaemonServiceEndpointStatus(ServiceEndpointStatus):
    """Local daemon service endpoint status."""
service
Implementation of the ZenML Service class.
BaseService (BaseTypedModel)
pydantic-model
Base service class.
This class implements generic functionality concerning the life-cycle management and tracking of an external service (e.g. process, container, Kubernetes deployment etc.).
Attributes:
Name | Type | Description |
---|---|---|
SERVICE_TYPE | ClassVar[zenml.services.service_type.ServiceType] | a service type descriptor with information describing the service class. Every concrete service class must define this. |
admin_state | ServiceState | the administrative state of the service. |
uuid | UUID | unique UUID identifier for the service instance. |
config | ServiceConfig | service configuration |
status | ServiceStatus | service status |
endpoint | Optional[BaseServiceEndpoint] | optional service endpoint |
Source code in zenml/services/service.py
class BaseService(BaseTypedModel, metaclass=BaseServiceMeta):
    """Base service class.

    This class implements generic functionality concerning the life-cycle
    management and tracking of an external service (e.g. process, container,
    Kubernetes deployment etc.).

    Attributes:
        SERVICE_TYPE: a service type descriptor with information describing
            the service class. Every concrete service class must define this.
        admin_state: the administrative state of the service.
        uuid: unique UUID identifier for the service instance.
        config: service configuration
        status: service status
        endpoint: optional service endpoint
    """
    SERVICE_TYPE: ClassVar[ServiceType]
    uuid: UUID = Field(default_factory=uuid4, allow_mutation=False)
    admin_state: ServiceState = ServiceState.INACTIVE
    config: ServiceConfig
    status: ServiceStatus
    # TODO [ENG-703]: allow multiple endpoints per service
    endpoint: Optional[BaseServiceEndpoint]

    def __init__(
        self,
        **attrs: Any,
    ) -> None:
        """Initialize the service instance.

        Args:
            **attrs: keyword arguments.
        """
        super().__init__(**attrs)
        self.config.name = self.config.name or self.__class__.__name__
    @abstractmethod
    def check_status(self) -> Tuple[ServiceState, str]:
        """Check the current operational state of the external service.

        This method should be overridden by subclasses that implement
        concrete service tracking functionality.

        Returns:
            The operational state of the external service and a message
            providing additional information about that state (e.g. a
            description of the error if one is encountered while checking the
            service status).
        """

    @abstractmethod
    def get_logs(
        self, follow: bool = False, tail: Optional[int] = None
    ) -> Generator[str, bool, None]:
        """Retrieve the service logs.

        This method should be overridden by subclasses that implement
        concrete service tracking functionality.

        Args:
            follow: if True, the logs will be streamed as they are written
            tail: only retrieve the last NUM lines of log output.

        Returns:
            A generator that can be accessed to get the service logs.
        """
    def update_status(self) -> None:
        """Update the status of the service.

        Check the current operational state of the external service
        and update the local operational status information to reflect it.

        This method should be overridden by subclasses that implement
        concrete service status tracking functionality.
        """
        logger.debug(
            "Running status check for service '%s' ...",
            self,
        )
        state, err = self.check_status()
        logger.debug(
            "Status check results for service '%s': %s [%s]",
            self,
            state.name,
            err,
        )
        self.status.update_state(state, err)

        # don't bother checking the endpoint state if the service is not active
        if self.status.state == ServiceState.INACTIVE:
            return

        if self.endpoint:
            self.endpoint.update_status()
    def get_service_status_message(self) -> str:
        """Get a service status message.

        Returns:
            A message providing information about the current operational
            state of the service.
        """
        return (
            f" Administrative state: `{self.admin_state.value}`\n"
            f" Operational state: `{self.status.state.value}`\n"
            f" Last status message: '{self.status.last_error}'\n"
        )
    def poll_service_status(self, timeout: int = 0) -> bool:
        """Polls the external service status.

        It does this until the service operational state matches the
        administrative state, the service enters a failed state, or the timeout
        is reached.

        Args:
            timeout: maximum time to wait for the service operational state
                to match the administrative state, in seconds

        Returns:
            True if the service operational state matches the administrative
            state, False otherwise.
        """
        time_remaining = timeout
        while True:
            if self.admin_state == ServiceState.ACTIVE and self.is_running:
                return True
            if self.admin_state == ServiceState.INACTIVE and self.is_stopped:
                return True
            if self.is_failed:
                return False
            if time_remaining <= 0:
                break
            time.sleep(1)
            time_remaining -= 1

        if timeout > 0:
            logger.error(
                f"Timed out waiting for service {self} to become "
                f"{self.admin_state.value}:\n"
                + self.get_service_status_message()
            )

        return False
    @property
    def is_running(self) -> bool:
        """Check if the service is currently running.

        This method will actively poll the external service to get its status
        and will return the result.

        Returns:
            True if the service is running and active (i.e. the endpoints are
            responsive, if any are configured), otherwise False.
        """
        self.update_status()
        return self.status.state == ServiceState.ACTIVE and (
            not self.endpoint or self.endpoint.is_active()
        )

    @property
    def is_stopped(self) -> bool:
        """Check if the service is currently stopped.

        This method will actively poll the external service to get its status
        and will return the result.

        Returns:
            True if the service is stopped, otherwise False.
        """
        self.update_status()
        return self.status.state == ServiceState.INACTIVE

    @property
    def is_failed(self) -> bool:
        """Check if the service is currently failed.

        This method will actively poll the external service to get its status
        and will return the result.

        Returns:
            True if the service is in a failure state, otherwise False.
        """
        self.update_status()
        return self.status.state == ServiceState.ERROR
    def provision(self) -> None:
        """Provisions resources to run the service.

        Raises:
            NotImplementedError: if the service does not implement provisioning functionality
        """
        raise NotImplementedError(
            f"Provisioning resources not implemented for {self}."
        )

    def deprovision(self, force: bool = False) -> None:
        """Deprovisions all resources used by the service.

        Args:
            force: if True, the service will be deprovisioned even if it is
                in a failed state.

        Raises:
            NotImplementedError: if the service does not implement
                deprovisioning functionality.
        """
        raise NotImplementedError(
            f"Deprovisioning resources not implemented for {self}."
        )

    def update(self, config: ServiceConfig) -> None:
        """Update the service configuration.

        Args:
            config: the new service configuration.
        """
        self.config = config
    def start(self, timeout: int = 0) -> None:
        """Start the service and optionally wait for it to become active.

        Args:
            timeout: amount of time to wait for the service to become active.
                If set to 0, the method will return immediately after checking
                the service status.

        Raises:
            RuntimeError: if the service cannot be started
        """
        with console.status(f"Starting service '{self}'.\n"):
            self.admin_state = ServiceState.ACTIVE
            self.provision()
            if timeout > 0:
                if not self.poll_service_status(timeout):
                    raise RuntimeError(
                        f"Failed to start service {self}\n"
                        + self.get_service_status_message()
                    )

    def stop(self, timeout: int = 0, force: bool = False) -> None:
        """Stop the service and optionally wait for it to shutdown.

        Args:
            timeout: amount of time to wait for the service to shutdown.
                If set to 0, the method will return immediately after checking
                the service status.
            force: if True, the service will be stopped even if it is not
                currently running.

        Raises:
            RuntimeError: if the service cannot be stopped
        """
        with console.status(f"Stopping service '{self}'.\n"):
            self.admin_state = ServiceState.INACTIVE
            self.deprovision(force)
            if timeout > 0:
                self.poll_service_status(timeout)
                if not self.is_stopped:
                    raise RuntimeError(
                        f"Failed to stop service {self}. Last state: "
                        f"'{self.status.state.value}'. Last error: "
                        f"'{self.status.last_error}'"
                    )
def __repr__(self) -> str:
"""String representation of the service.
Returns:
A string representation of the service.
"""
return f"{self.__class__.__qualname__}[{self.uuid}] (type: {self.SERVICE_TYPE.type}, flavor: {self.SERVICE_TYPE.flavor})"
def __str__(self) -> str:
"""String representation of the service.
Returns:
A string representation of the service.
"""
return self.__repr__()
class Config:
"""Pydantic configuration class."""
# validate attribute assignments
validate_assignment = True
# all attributes with leading underscore are private and therefore
# are mutable and not included in serialization
underscore_attrs_are_private = True
is_failed: bool
property
readonly
Check if the service is currently failed.
This method will actively poll the external service to get its status and will return the result.
Returns:
Type | Description |
---|---|
bool | True if the service is in a failure state, otherwise False. |
is_running: bool
property
readonly
Check if the service is currently running.
This method will actively poll the external service to get its status and will return the result.
Returns:
Type | Description |
---|---|
bool | True if the service is running and active (i.e. the endpoints are responsive, if any are configured), otherwise False. |
is_stopped: bool
property
readonly
Check if the service is currently stopped.
This method will actively poll the external service to get its status and will return the result.
Returns:
Type | Description |
---|---|
bool | True if the service is stopped, otherwise False. |
Config
Pydantic configuration class.
Source code in zenml/services/service.py
class Config:
"""Pydantic configuration class."""
# validate attribute assignments
validate_assignment = True
# all attributes with leading underscore are private and therefore
# are mutable and not included in serialization
underscore_attrs_are_private = True
__init__(self, **attrs)
special
Initialize the service instance.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
**attrs | Any | keyword arguments. | {} |
Source code in zenml/services/service.py
def __init__(
self,
**attrs: Any,
) -> None:
"""Initialize the service instance.
Args:
**attrs: keyword arguments.
"""
super().__init__(**attrs)
self.config.name = self.config.name or self.__class__.__name__
__repr__(self)
special
String representation of the service.
Returns:
Type | Description |
---|---|
str | A string representation of the service. |
Source code in zenml/services/service.py
def __repr__(self) -> str:
"""String representation of the service.
Returns:
A string representation of the service.
"""
return f"{self.__class__.__qualname__}[{self.uuid}] (type: {self.SERVICE_TYPE.type}, flavor: {self.SERVICE_TYPE.flavor})"
__str__(self)
special
String representation of the service.
Returns:
Type | Description |
---|---|
str | A string representation of the service. |
Source code in zenml/services/service.py
def __str__(self) -> str:
"""String representation of the service.
Returns:
A string representation of the service.
"""
return self.__repr__()
check_status(self)
Check the current operational state of the external service.
This method should be overridden by subclasses that implement concrete service tracking functionality.
Returns:
Type | Description |
---|---|
Tuple[zenml.services.service_status.ServiceState, str] | The operational state of the external service and a message providing additional information about that state (e.g. a description of the error if one is encountered while checking the service status). |
Source code in zenml/services/service.py
@abstractmethod
def check_status(self) -> Tuple[ServiceState, str]:
    """Check the current operational state of the external service.

    This method should be overridden by subclasses that implement
    concrete service tracking functionality.

    Returns:
        The operational state of the external service and a message
        providing additional information about that state (e.g. a
        description of the error if one is encountered while checking the
        service status).
    """
deprovision(self, force=False)
Deprovisions all resources used by the service.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
force | bool | if True, the service will be deprovisioned even if it is in a failed state. | False |
Exceptions:
Type | Description |
---|---|
NotImplementedError | if the service does not implement deprovisioning functionality. |
Source code in zenml/services/service.py
def deprovision(self, force: bool = False) -> None:
"""Deprovisions all resources used by the service.
Args:
force: if True, the service will be deprovisioned even if it is
in a failed state.
Raises:
NotImplementedError: if the service does not implement
deprovisioning functionality.
"""
raise NotImplementedError(
f"Deprovisioning resources not implemented for {self}."
)
get_logs(self, follow=False, tail=None)
Retrieve the service logs.
This method should be overridden by subclasses that implement concrete service tracking functionality.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
follow | bool | if True, the logs will be streamed as they are written | False |
tail | Optional[int] | only retrieve the last NUM lines of log output. | None |
Returns:
Type | Description |
---|---|
Generator[str, bool, NoneType] | A generator that can be accessed to get the service logs. |
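A concrete subclass has to supply the log generator itself. The following is a minimal sketch of how the `tail` argument could be honoured, assuming the log lines are already available as an in-memory iterable; `read_logs` is a hypothetical helper, not part of ZenML, and the `follow` streaming mode is not modelled here.

```python
from collections import deque
from typing import Generator, Iterable, Optional


def read_logs(
    lines: Iterable[str], tail: Optional[int] = None
) -> Generator[str, None, None]:
    """Yield log lines, optionally restricted to the last `tail` lines."""
    # A bounded deque keeps only the most recent `tail` entries.
    selected = deque(lines, maxlen=tail) if tail is not None else lines
    for line in selected:
        yield line.rstrip("\n")


log = ["starting\n", "listening on 8000\n", "ready\n"]
last_two = list(read_logs(log, tail=2))
everything = list(read_logs(log))
```

Because the result is a generator, callers can stop iterating early without reading the remaining lines.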
Source code in zenml/services/service.py
@abstractmethod
def get_logs(
self, follow: bool = False, tail: Optional[int] = None
) -> Generator[str, bool, None]:
"""Retrieve the service logs.
This method should be overridden by subclasses that implement
concrete service tracking functionality.
Args:
follow: if True, the logs will be streamed as they are written
tail: only retrieve the last NUM lines of log output.
Returns:
A generator that can be accessed to get the service logs.
"""
get_service_status_message(self)
Get a service status message.
Returns:
Type | Description |
---|---|
str | A message providing information about the current operational state of the service. |
Source code in zenml/services/service.py
def get_service_status_message(self) -> str:
"""Get a service status message.
Returns:
A message providing information about the current operational
state of the service.
"""
return (
f" Administrative state: `{self.admin_state.value}`\n"
f" Operational state: `{self.status.state.value}`\n"
f" Last status message: '{self.status.last_error}'\n"
)
poll_service_status(self, timeout=0)
Polls the external service status.
It does this until the service operational state matches the administrative state, the service enters a failed state, or the timeout is reached.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
timeout | int | maximum time to wait for the service operational state to match the administrative state, in seconds | 0 |
Returns:
Type | Description |
---|---|
bool | True if the service operational state matches the administrative state, False otherwise. |
Source code in zenml/services/service.py
def poll_service_status(self, timeout: int = 0) -> bool:
    """Polls the external service status.

    It does this until the service operational state matches the
    administrative state, the service enters a failed state, or the timeout
    is reached.

    Args:
        timeout: maximum time to wait for the service operational state
            to match the administrative state, in seconds

    Returns:
        True if the service operational state matches the administrative
        state, False otherwise.
    """
    time_remaining = timeout
    while True:
        if self.admin_state == ServiceState.ACTIVE and self.is_running:
            return True
        if self.admin_state == ServiceState.INACTIVE and self.is_stopped:
            return True
        if self.is_failed:
            return False
        if time_remaining <= 0:
            break
        time.sleep(1)
        time_remaining -= 1

    if timeout > 0:
        logger.error(
            f"Timed out waiting for service {self} to become "
            f"{self.admin_state.value}:\n"
            + self.get_service_status_message()
        )

    return False
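The polling loop above can be reduced to a standalone sketch that is easier to experiment with. The function name `poll_until_settled`, its callable arguments and the `interval` parameter are assumptions made for illustration; the ZenML method reads the state from the service itself and sleeps for a fixed second between checks.

```python
import time
from typing import Callable


def poll_until_settled(
    matches_admin_state: Callable[[], bool],
    is_failed: Callable[[], bool],
    timeout: float = 0,
    interval: float = 1.0,
) -> bool:
    """Poll until the desired state is reached, a failure occurs, or time runs out."""
    time_remaining = timeout
    while True:
        if matches_admin_state():
            return True
        if is_failed():
            return False
        if time_remaining <= 0:
            return False
        time.sleep(interval)
        time_remaining -= interval


# A fake service that reports the desired state on its third status check.
checks = {"count": 0}


def ready_on_third_check() -> bool:
    checks["count"] += 1
    return checks["count"] >= 3


result = poll_until_settled(
    ready_on_third_check, lambda: False, timeout=5, interval=0.01
)
```

With `timeout=0` the state is checked exactly once, which matches the documented behaviour of returning immediately after checking the service status.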
provision(self)
Provisions resources to run the service.
Exceptions:
Type | Description |
---|---|
NotImplementedError | if the service does not implement provisioning functionality |
Source code in zenml/services/service.py
def provision(self) -> None:
"""Provisions resources to run the service.
Raises:
NotImplementedError: if the service does not implement provisioning functionality
"""
raise NotImplementedError(
f"Provisioning resources not implemented for {self}."
)
start(self, timeout=0)
Start the service and optionally wait for it to become active.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
timeout | int | amount of time to wait for the service to become active. If set to 0, the method will return immediately after checking the service status. | 0 |
Exceptions:
Type | Description |
---|---|
RuntimeError | if the service cannot be started |
Source code in zenml/services/service.py
def start(self, timeout: int = 0) -> None:
"""Start the service and optionally wait for it to become active.
Args:
timeout: amount of time to wait for the service to become active.
If set to 0, the method will return immediately after checking
the service status.
Raises:
RuntimeError: if the service cannot be started
"""
with console.status(f"Starting service '{self}'.\n"):
self.admin_state = ServiceState.ACTIVE
self.provision()
if timeout > 0:
if not self.poll_service_status(timeout):
raise RuntimeError(
f"Failed to start service {self}\n"
+ self.get_service_status_message()
)
stop(self, timeout=0, force=False)
Stop the service and optionally wait for it to shutdown.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
timeout | int | amount of time to wait for the service to shutdown. If set to 0, the method will return immediately after checking the service status. | 0 |
force | bool | if True, the service will be stopped even if it is not currently running. | False |
Exceptions:
Type | Description |
---|---|
RuntimeError | if the service cannot be stopped |
Source code in zenml/services/service.py
def stop(self, timeout: int = 0, force: bool = False) -> None:
"""Stop the service and optionally wait for it to shutdown.
Args:
timeout: amount of time to wait for the service to shutdown.
If set to 0, the method will return immediately after checking
the service status.
force: if True, the service will be stopped even if it is not
currently running.
Raises:
RuntimeError: if the service cannot be stopped
"""
with console.status(f"Stopping service '{self}'.\n"):
self.admin_state = ServiceState.INACTIVE
self.deprovision(force)
if timeout > 0:
self.poll_service_status(timeout)
if not self.is_stopped:
raise RuntimeError(
f"Failed to stop service {self}. Last state: "
f"'{self.status.state.value}'. Last error: "
f"'{self.status.last_error}'"
)
update(self, config)
Update the service configuration.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
config | ServiceConfig | the new service configuration. | required |
Source code in zenml/services/service.py
def update(self, config: ServiceConfig) -> None:
"""Update the service configuration.
Args:
config: the new service configuration.
"""
self.config = config
update_status(self)
Update the status of the service.
Check the current operational state of the external service and update the local operational status information to reflect it.
This method should be overridden by subclasses that implement concrete service status tracking functionality.
Source code in zenml/services/service.py
def update_status(self) -> None:
    """Update the status of the service.

    Check the current operational state of the external service
    and update the local operational status information to reflect it.

    This method should be overridden by subclasses that implement
    concrete service status tracking functionality.
    """
    logger.debug(
        "Running status check for service '%s' ...",
        self,
    )
    state, err = self.check_status()
    logger.debug(
        "Status check results for service '%s': %s [%s]",
        self,
        state.name,
        err,
    )
    self.status.update_state(state, err)

    # don't bother checking the endpoint state if the service is not active
    if self.status.state == ServiceState.INACTIVE:
        return

    if self.endpoint:
        self.endpoint.update_status()
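The ordering in this method matters: the endpoint is only probed once the service itself is known not to be inactive. A minimal sketch of that control flow follows, using hypothetical stand-in classes (`State`, `CountingEndpoint`, `SketchService`) rather than the ZenML models.

```python
from enum import Enum
from typing import Optional, Tuple


class State(Enum):
    ACTIVE = "active"
    INACTIVE = "inactive"


class CountingEndpoint:
    """Hypothetical endpoint that only counts how often it is checked."""

    def __init__(self) -> None:
        self.checks = 0

    def update_status(self) -> None:
        self.checks += 1


class SketchService:
    """Hypothetical service whose external state is fixed at construction."""

    def __init__(
        self, external_state: State, endpoint: Optional[CountingEndpoint] = None
    ) -> None:
        self.external_state = external_state
        self.endpoint = endpoint
        self.state = State.INACTIVE
        self.last_error = ""

    def check_status(self) -> Tuple[State, str]:
        return self.external_state, ""

    def update_status(self) -> None:
        self.state, self.last_error = self.check_status()
        # An inactive service cannot have a responsive endpoint: skip the probe.
        if self.state == State.INACTIVE:
            return
        if self.endpoint:
            self.endpoint.update_status()


stopped = SketchService(State.INACTIVE, CountingEndpoint())
stopped.update_status()
running = SketchService(State.ACTIVE, CountingEndpoint())
running.update_status()
```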
BaseServiceMeta (BaseTypedModelMeta)
Metaclass responsible for registering different BaseService subclasses.
This metaclass has two main responsibilities: 1. register all BaseService types in the service registry. This is relevant when services are deserialized and instantiated from their JSON or dict representation, because their type needs to be known beforehand. 2. ensuring BaseService instance uniqueness by enforcing that no two service instances have the same UUID value. Implementing this at the constructor level guarantees that deserializing a service instance from a JSON representation multiple times always returns the same service object.
Source code in zenml/services/service.py
class BaseServiceMeta(BaseTypedModelMeta):
"""Metaclass responsible for registering different BaseService subclasses.
This metaclass has two main responsibilities:
1. register all BaseService types in the service registry. This is relevant
when services are deserialized and instantiated from their JSON or dict
representation, because their type needs to be known beforehand.
2. ensuring BaseService instance uniqueness by enforcing that no two
service instances have the same UUID value. Implementing this at the
constructor level guarantees that deserializing a service instance from
a JSON representation multiple times always returns the same service object.
"""
def __new__(
mcs, name: str, bases: Tuple[Type[Any], ...], dct: Dict[str, Any]
) -> "BaseServiceMeta":
"""Creates a BaseService class and registers it in the `ServiceRegistry`.
Args:
name: name of the class.
bases: tuple of base classes.
dct: dictionary of class attributes.
Returns:
the created BaseServiceMeta class.
Raises:
TypeError: if the 'service_type' reserved attribute name is used.
"""
service_type = dct.get("SERVICE_TYPE", None)
# register only classes of concrete service implementations
if service_type:
# add the service type class attribute to the class as a regular
# immutable attribute to include it in the JSON representation
if "service_type" in dct:
raise TypeError(
"`service_type` is a reserved attribute name for BaseService "
"subclasses"
)
dct.setdefault("__annotations__", dict())[
"service_type"
] = ServiceType
dct["service_type"] = Field(service_type, allow_mutation=False)
cls = cast(Type["BaseService"], super().__new__(mcs, name, bases, dct))
# register only classes of concrete service implementations
if service_type:
# register the service type in the service registry
ServiceRegistry().register_service_type(cls)
return cls
def __call__(cls, *args: Any, **kwargs: Any) -> "BaseServiceMeta":
"""Validate the creation of a service.
Args:
*args: positional arguments.
**kwargs: keyword arguments.
Returns:
the created service instance, or the existing instance registered
under the same UUID.
Raises:
AttributeError: if the service class is untyped (SERVICE_TYPE is not set).
ValueError: if the `uuid` argument is neither a UUID nor a UUID string.
"""
if not getattr(cls, "SERVICE_TYPE", None):
raise AttributeError(
f"Untyped service instances are not allowed. Please set the "
f"SERVICE_TYPE class attribute for {cls}."
)
uuid = kwargs.get("uuid", None)
if uuid:
if isinstance(uuid, str):
uuid = UUID(uuid)
if not isinstance(uuid, UUID):
raise ValueError(
f"The `uuid` argument for {cls} must be a UUID instance or a "
f"string representation of a UUID."
)
# if a service instance with the same UUID is already registered,
# return the existing instance rather than the newly created one
existing_service = ServiceRegistry().get_service(uuid)
if existing_service:
logger.debug(
f"Reusing existing service '{existing_service}' "
f"instead of creating a new service with the same UUID."
)
return cast("BaseServiceMeta", existing_service)
svc = cast("BaseService", super().__call__(*args, **kwargs))
ServiceRegistry().register_service(svc)
return cast("BaseServiceMeta", svc)
__call__(cls, *args, **kwargs)
special
Validate the creation of a service.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
*args | Any | positional arguments. | () |
**kwargs | Any | keyword arguments. | {} |
Returns:
Type | Description |
---|---|
BaseServiceMeta | the created service instance, or the existing instance registered under the same UUID. |
Exceptions:
Type | Description |
---|---|
AttributeError | if the service class is untyped (SERVICE_TYPE is not set). |
ValueError | if the `uuid` argument is neither a UUID nor a UUID string. |
Source code in zenml/services/service.py
def __call__(cls, *args: Any, **kwargs: Any) -> "BaseServiceMeta":
"""Validate the creation of a service.
Args:
*args: positional arguments.
**kwargs: keyword arguments.
Returns:
the created service instance, or the existing instance registered
under the same UUID.
Raises:
AttributeError: if the service class is untyped (SERVICE_TYPE is not set).
ValueError: if the `uuid` argument is neither a UUID nor a UUID string.
"""
if not getattr(cls, "SERVICE_TYPE", None):
raise AttributeError(
f"Untyped service instances are not allowed. Please set the "
f"SERVICE_TYPE class attribute for {cls}."
)
uuid = kwargs.get("uuid", None)
if uuid:
if isinstance(uuid, str):
uuid = UUID(uuid)
if not isinstance(uuid, UUID):
raise ValueError(
f"The `uuid` argument for {cls} must be a UUID instance or a "
f"string representation of a UUID."
)
# if a service instance with the same UUID is already registered,
# return the existing instance rather than the newly created one
existing_service = ServiceRegistry().get_service(uuid)
if existing_service:
logger.debug(
f"Reusing existing service '{existing_service}' "
f"instead of creating a new service with the same UUID."
)
return cast("BaseServiceMeta", existing_service)
svc = cast("BaseService", super().__call__(*args, **kwargs))
ServiceRegistry().register_service(svc)
return cast("BaseServiceMeta", svc)
__new__(mcs, name, bases, dct)
special
staticmethod
Creates a BaseService class and registers it in the ServiceRegistry.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
name | str | name of the class. | required |
bases | Tuple[Type[Any], ...] | tuple of base classes. | required |
dct | Dict[str, Any] | dictionary of class attributes. | required |
Returns:
Type | Description |
---|---|
BaseServiceMeta | the created BaseServiceMeta class. |
Exceptions:
Type | Description |
---|---|
TypeError | if the 'service_type' reserved attribute name is used. |
Source code in zenml/services/service.py
def __new__(
mcs, name: str, bases: Tuple[Type[Any], ...], dct: Dict[str, Any]
) -> "BaseServiceMeta":
"""Creates a BaseService class and registers it in the `ServiceRegistry`.
Args:
name: name of the class.
bases: tuple of base classes.
dct: dictionary of class attributes.
Returns:
the created BaseServiceMeta class.
Raises:
TypeError: if the 'service_type' reserved attribute name is used.
"""
service_type = dct.get("SERVICE_TYPE", None)
# register only classes of concrete service implementations
if service_type:
# add the service type class attribute to the class as a regular
# immutable attribute to include it in the JSON representation
if "service_type" in dct:
raise TypeError(
"`service_type` is a reserved attribute name for BaseService "
"subclasses"
)
dct.setdefault("__annotations__", dict())[
"service_type"
] = ServiceType
dct["service_type"] = Field(service_type, allow_mutation=False)
cls = cast(Type["BaseService"], super().__new__(mcs, name, bases, dct))
# register only classes of concrete service implementations
if service_type:
# register the service type in the service registry
ServiceRegistry().register_service_type(cls)
return cls
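The two responsibilities of the metaclass, registering typed classes and reusing instances that share a UUID, can be illustrated without pydantic or the ZenML registry. In the sketch below, `RegisteringMeta`, `Base` and `Sleeper` are hypothetical names, plain dictionaries stand in for `ServiceRegistry`, and the `ValueError` check on malformed UUID arguments is left out for brevity.

```python
from typing import Any, Dict
from uuid import UUID, uuid4


class RegisteringMeta(type):
    """Registers typed classes and reuses instances that share a UUID."""

    service_types: Dict[str, type] = {}
    instances: Dict[UUID, Any] = {}

    def __new__(mcs, name, bases, dct):
        cls = super().__new__(mcs, name, bases, dct)
        # Only concrete classes that declare SERVICE_TYPE are registered.
        if dct.get("SERVICE_TYPE"):
            mcs.service_types[dct["SERVICE_TYPE"]] = cls
        return cls

    def __call__(cls, *args, **kwargs):
        if not getattr(cls, "SERVICE_TYPE", None):
            raise AttributeError(f"SERVICE_TYPE is not set for {cls}.")
        uuid = kwargs.get("uuid")
        if isinstance(uuid, str):
            uuid = UUID(uuid)
        # Return the already registered instance instead of building a new one.
        if uuid is not None and uuid in cls.instances:
            return cls.instances[uuid]
        kwargs["uuid"] = uuid or uuid4()
        instance = super().__call__(*args, **kwargs)
        cls.instances[instance.uuid] = instance
        return instance


class Base(metaclass=RegisteringMeta):
    def __init__(self, uuid: UUID) -> None:
        self.uuid = uuid


class Sleeper(Base):
    SERVICE_TYPE = "sleeper"


first = Sleeper()
second = Sleeper(uuid=str(first.uuid))
```

Here `second` is the same object as `first`, which is the property that makes repeated deserialization of one service yield a single instance.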
ServiceConfig (BaseTypedModel)
pydantic-model
Generic service configuration.
Concrete service classes should extend this class and add additional attributes that they want to see reflected and used in the service configuration.
Attributes:
Name | Type | Description |
---|---|---|
name | str | name for the service instance |
description | str | description of the service |
pipeline_name | str | name of the pipeline that spun up the service |
pipeline_run_id | str | ID of the pipeline run that spun up the service |
pipeline_step_name | str | name of the pipeline step that spun up the service |
Source code in zenml/services/service.py
class ServiceConfig(BaseTypedModel):
    """Generic service configuration.

    Concrete service classes should extend this class and add additional
    attributes that they want to see reflected and used in the service
    configuration.

    Attributes:
        name: name for the service instance
        description: description of the service
        pipeline_name: name of the pipeline that spun up the service
        pipeline_run_id: ID of the pipeline run that spun up the service
        pipeline_step_name: name of the pipeline step that spun up the service
    """

    name: str = ""
    description: str = ""
    pipeline_name: str = ""
    pipeline_run_id: str = ""
    pipeline_step_name: str = ""
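Extending the configuration and the name fallback applied in the service `__init__` can be sketched with standard-library dataclasses. This is only an illustration under that assumption: the real classes are pydantic models, and `Config`, `SleeperConfig`, `Service` and `Sleeper` are hypothetical stand-ins.

```python
from dataclasses import dataclass


@dataclass
class Config:
    """Stand-in for the generic configuration with empty-string defaults."""

    name: str = ""
    description: str = ""


@dataclass
class SleeperConfig(Config):
    """Concrete configuration adding one service-specific attribute."""

    wake_up_after: int = 10


class Service:
    def __init__(self, config: Config) -> None:
        self.config = config
        # An empty name falls back to the class name of the service.
        self.config.name = self.config.name or self.__class__.__name__


class Sleeper(Service):
    pass


unnamed = Sleeper(SleeperConfig())
named = Sleeper(SleeperConfig(name="night-shift", wake_up_after=30))
```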
service_endpoint
Implementation of a ZenML service endpoint.
BaseServiceEndpoint (BaseTypedModel)
pydantic-model
Base service endpoint class.
This class implements generic functionality concerning the life-cycle management and tracking of an external service endpoint (e.g. a HTTP/HTTPS API or generic TCP endpoint exposed by a service).
Attributes:
Name | Type | Description |
---|---|---|
admin_state | ServiceState | the administrative state of the service endpoint |
config | ServiceEndpointConfig | service endpoint configuration |
status | ServiceEndpointStatus | service endpoint status |
monitor | Optional[BaseServiceEndpointHealthMonitor] | optional service endpoint health monitor |
Source code in zenml/services/service_endpoint.py
class BaseServiceEndpoint(BaseTypedModel):
"""Base service endpoint class.
This class implements generic functionality concerning the life-cycle
management and tracking of an external service endpoint (e.g. a HTTP/HTTPS
API or generic TCP endpoint exposed by a service).
Attributes:
admin_state: the administrative state of the service endpoint
config: service endpoint configuration
status: service endpoint status
monitor: optional service endpoint health monitor
"""
admin_state: ServiceState = ServiceState.INACTIVE
config: ServiceEndpointConfig
status: ServiceEndpointStatus
# TODO [ENG-701]: allow multiple monitors per endpoint
monitor: Optional[BaseServiceEndpointHealthMonitor] = None
def __init__(
self,
*args: Any,
**kwargs: Any,
) -> None:
"""Initialize the service endpoint.
Args:
*args: positional arguments.
**kwargs: keyword arguments.
"""
super().__init__(*args, **kwargs)
self.config.name = self.config.name or self.__class__.__name__
def check_status(self) -> Tuple[ServiceState, str]:
"""Check the current operational state of the external service endpoint.
Returns:
The operational state of the external service endpoint and a
message providing additional information about that state
(e.g. a description of the error, if one is encountered while
checking the service status).
"""
if not self.monitor:
# no health monitor configured; assume service operational state
# always matches the admin state
return self.admin_state, ""
return self.monitor.check_endpoint_status(self)
def update_status(self) -> None:
"""Check the current operational state of the external service endpoint.
It updates the local operational status information accordingly.
"""
logger.debug(
"Running health check for service endpoint '%s' ...",
self.config.name,
)
state, err = self.check_status()
logger.debug(
"Health check results for service endpoint '%s': %s [%s]",
self.config.name,
state.name,
err,
)
self.status.update_state(state, err)
def is_active(self) -> bool:
"""Check if the service endpoint is active.
This means that it is responsive and can receive requests. This method
will use the configured health monitor to actively check the endpoint
status and will return the result.
Returns:
True if the service endpoint is active, otherwise False.
"""
self.update_status()
return self.status.state == ServiceState.ACTIVE
def is_inactive(self) -> bool:
"""Check if the service endpoint is inactive.
This means that it is unresponsive and cannot receive requests. This
method will use the configured health monitor to actively check the
endpoint status and will return the result.
Returns:
True if the service endpoint is inactive, otherwise False.
"""
self.update_status()
return self.status.state == ServiceState.INACTIVE
__init__(self, *args, **kwargs)
special
Initialize the service endpoint.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
*args | Any | positional arguments. | () |
**kwargs | Any | keyword arguments. | {} |
Source code in zenml/services/service_endpoint.py
def __init__(
self,
*args: Any,
**kwargs: Any,
) -> None:
"""Initialize the service endpoint.
Args:
*args: positional arguments.
**kwargs: keyword arguments.
"""
super().__init__(*args, **kwargs)
self.config.name = self.config.name or self.__class__.__name__
check_status(self)
Check the current operational state of the external service endpoint.
Returns:
Type | Description |
---|---|
Tuple[zenml.services.service_status.ServiceState, str] | The operational state of the external service endpoint and a message providing additional information about that state (e.g. a description of the error, if one is encountered while checking the service status). |
Source code in zenml/services/service_endpoint.py
def check_status(self) -> Tuple[ServiceState, str]:
    """Check the current operational state of the external service endpoint.

    Returns:
        The operational state of the external service endpoint and a
        message providing additional information about that state
        (e.g. a description of the error, if one is encountered while
        checking the service status).
    """
    if not self.monitor:
        # no health monitor configured; assume service operational state
        # always matches the admin state
        return self.admin_state, ""
    return self.monitor.check_endpoint_status(self)
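The fallback is worth seeing in isolation: without a monitor the endpoint simply reports its administrative state, and with a monitor the probe result wins. In this sketch `State` and `Endpoint` are hypothetical stand-ins, and the monitor is modelled as a plain callable instead of a health monitor object.

```python
from enum import Enum
from typing import Callable, Optional, Tuple


class State(Enum):
    ACTIVE = "active"
    INACTIVE = "inactive"
    ERROR = "error"


class Endpoint:
    """Hypothetical endpoint with an optional health probe."""

    def __init__(
        self,
        monitor: Optional[Callable[["Endpoint"], Tuple[State, str]]] = None,
    ) -> None:
        self.admin_state = State.INACTIVE
        self.monitor = monitor

    def check_status(self) -> Tuple[State, str]:
        if self.monitor is None:
            # No probe configured: assume the endpoint does what it was told.
            return self.admin_state, ""
        return self.monitor(self)


unmonitored = Endpoint()
unmonitored.admin_state = State.ACTIVE

monitored = Endpoint(monitor=lambda ep: (State.ERROR, "connection refused"))
monitored.admin_state = State.ACTIVE
```

The practical consequence is that an endpoint without a monitor can never report a failure on its own.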
is_active(self)
Check if the service endpoint is active.
This means that it is responsive and can receive requests. This method will use the configured health monitor to actively check the endpoint status and will return the result.
Returns:
Type | Description |
---|---|
bool | True if the service endpoint is active, otherwise False. |
Source code in zenml/services/service_endpoint.py
def is_active(self) -> bool:
    """Check if the service endpoint is active.

    This means that it is responsive and can receive requests. This method
    will use the configured health monitor to actively check the endpoint
    status and will return the result.

    Returns:
        True if the service endpoint is active, otherwise False.
    """
    self.update_status()
    return self.status.state == ServiceState.ACTIVE
is_inactive(self)
Check if the service endpoint is inactive.
This means that it is unresponsive and cannot receive requests. This method will use the configured health monitor to actively check the endpoint status and will return the result.
Returns:
Type | Description |
---|---|
bool | True if the service endpoint is inactive, otherwise False. |
Source code in zenml/services/service_endpoint.py
def is_inactive(self) -> bool:
"""Check if the service endpoint is inactive.
This means that it is unresponsive and cannot receive requests. This
method will use the configured health monitor to actively check the
endpoint status and will return the result.
Returns:
True if the service endpoint is inactive, otherwise False.
"""
self.update_status()
return self.status.state == ServiceState.INACTIVE
update_status(self)
Check the current operational state of the external service endpoint.
It updates the local operational status information accordingly.
Source code in zenml/services/service_endpoint.py
def update_status(self) -> None:
    """Check the current operational state of the external service endpoint.

    It updates the local operational status information accordingly.
    """
    logger.debug(
        "Running health check for service endpoint '%s' ...",
        self.config.name,
    )
    state, err = self.check_status()
    logger.debug(
        "Health check results for service endpoint '%s': %s [%s]",
        self.config.name,
        state.name,
        err,
    )
    self.status.update_state(state, err)
ServiceEndpointConfig (BaseTypedModel)
pydantic-model
Generic service endpoint configuration.
Concrete service classes should extend this class and add additional attributes that they want to see reflected and used in the endpoint configuration.
Attributes:
Name | Type | Description |
---|---|---|
name | str | unique name for the service endpoint |
description | str | description of the service endpoint |
Source code in zenml/services/service_endpoint.py
class ServiceEndpointConfig(BaseTypedModel):
    """Generic service endpoint configuration.

    Concrete service classes should extend this class and add additional
    attributes that they want to see reflected and used in the endpoint
    configuration.

    Attributes:
        name: unique name for the service endpoint
        description: description of the service endpoint
    """

    name: str = ""
    description: str = ""
ServiceEndpointProtocol (StrEnum)
Possible endpoint protocol values.
Source code in zenml/services/service_endpoint.py
class ServiceEndpointProtocol(StrEnum):
    """Possible endpoint protocol values."""

    TCP = "tcp"
    HTTP = "http"
    HTTPS = "https"
ServiceEndpointStatus (ServiceStatus)
pydantic-model
Status information describing the operational state of a service endpoint.
For example, this could be a HTTP/HTTPS API or generic TCP endpoint exposed by a service. Concrete service classes should extend this class and add additional attributes that make up the operational state of the service endpoint.
Attributes:
Name | Type | Description |
---|---|---|
protocol | ServiceEndpointProtocol | the TCP protocol used by the service endpoint |
hostname | Optional[str] | the hostname where the service endpoint is accessible |
port | Optional[int] | the current TCP port where the service endpoint is accessible |
Source code in zenml/services/service_endpoint.py
class ServiceEndpointStatus(ServiceStatus):
    """Status information describing the operational state of a service endpoint.

    For example, this could be a HTTP/HTTPS API or generic TCP endpoint exposed
    by a service. Concrete service classes should extend this class and add
    additional attributes that make up the operational state of the service
    endpoint.

    Attributes:
        protocol: the TCP protocol used by the service endpoint
        hostname: the hostname where the service endpoint is accessible
        port: the current TCP port where the service endpoint is accessible
    """

    protocol: ServiceEndpointProtocol = ServiceEndpointProtocol.TCP
    hostname: Optional[str] = None
    port: Optional[int] = None

    @property
    def uri(self) -> Optional[str]:
        """Get the URI of the service endpoint.

        Returns:
            The URI of the service endpoint or None, if the service endpoint
            operational status doesn't have the required information.
        """
        if not self.hostname or not self.port or not self.protocol:
            # the service is not yet in a state in which the endpoint hostname
            # port and protocol are known
            return None
        return f"{self.protocol.value}://{self.hostname}:{self.port}/"
uri: Optional[str]
property
readonly
Get the URI of the service endpoint.
Returns:
Type | Description |
---|---|
Optional[str] | The URI of the service endpoint or None, if the service endpoint operational status doesn't have the required information. |
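The way the URI is assembled, and when it is withheld, can be reproduced with a small dataclass. `Protocol` and `EndpointStatus` below are hypothetical stand-ins for the ZenML models; only the `uri` logic mirrors the source shown above.

```python
from dataclasses import dataclass
from enum import Enum
from typing import Optional


class Protocol(str, Enum):
    TCP = "tcp"
    HTTP = "http"
    HTTPS = "https"


@dataclass
class EndpointStatus:
    protocol: Protocol = Protocol.TCP
    hostname: Optional[str] = None
    port: Optional[int] = None

    @property
    def uri(self) -> Optional[str]:
        # The URI is only defined once hostname, port and protocol are known.
        if not self.hostname or not self.port or not self.protocol:
            return None
        return f"{self.protocol.value}://{self.hostname}:{self.port}/"


pending = EndpointStatus()
ready = EndpointStatus(protocol=Protocol.HTTP, hostname="localhost", port=8000)
```

Callers should therefore treat a `None` URI as "not yet known" rather than as an error.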
service_monitor
Implementation of the service health monitor.
BaseServiceEndpointHealthMonitor (BaseTypedModel)
pydantic-model
Base class used for service endpoint health monitors.
Attributes:
Name | Type | Description |
---|---|---|
config | ServiceEndpointHealthMonitorConfig | health monitor configuration for endpoint |
Source code in zenml/services/service_monitor.py
class BaseServiceEndpointHealthMonitor(BaseTypedModel):
    """Base class used for service endpoint health monitors.

    Attributes:
        config: health monitor configuration for endpoint
    """

    config: ServiceEndpointHealthMonitorConfig = Field(
        default_factory=ServiceEndpointHealthMonitorConfig
    )

    @abstractmethod
    def check_endpoint_status(
        self, endpoint: "BaseServiceEndpoint"
    ) -> Tuple[ServiceState, str]:
        """Check the current operational state of the external service endpoint.

        Args:
            endpoint: service endpoint to check

        This method should be overridden by subclasses that implement
        concrete service endpoint tracking functionality.

        Returns:
            The operational state of the external service endpoint and an
            optional error message, if an error is encountered while checking
            the service endpoint status.
        """
check_endpoint_status(self, endpoint)
Check the current operational state of the external service endpoint.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
endpoint | BaseServiceEndpoint | service endpoint to check | required |
This method should be overridden by subclasses that implement concrete service endpoint tracking functionality.
Returns:
Type | Description |
---|---|
Tuple[zenml.services.service_status.ServiceState, str] | The operational state of the external service endpoint and an optional error message, if an error is encountered while checking the service endpoint status. |
Source code in zenml/services/service_monitor.py
@abstractmethod
def check_endpoint_status(
    self, endpoint: "BaseServiceEndpoint"
) -> Tuple[ServiceState, str]:
    """Check the current operational state of the external service endpoint.

    Args:
        endpoint: service endpoint to check

    This method should be overridden by subclasses that implement
    concrete service endpoint tracking functionality.

    Returns:
        The operational state of the external service endpoint and an
        optional error message, if an error is encountered while checking
        the service endpoint status.
    """
HTTPEndpointHealthMonitor (BaseServiceEndpointHealthMonitor)
pydantic-model
HTTP service endpoint health monitor.
Attributes:
Name | Type | Description |
---|---|---|
config | HTTPEndpointHealthMonitorConfig | health monitor configuration for HTTP endpoint |
Source code in zenml/services/service_monitor.py
class HTTPEndpointHealthMonitor(BaseServiceEndpointHealthMonitor):
    """HTTP service endpoint health monitor.

    Attributes:
        config: health monitor configuration for HTTP endpoint
    """

    config: HTTPEndpointHealthMonitorConfig = Field(
        default_factory=HTTPEndpointHealthMonitorConfig
    )

    def get_healthcheck_uri(
        self, endpoint: "BaseServiceEndpoint"
    ) -> Optional[str]:
        """Get the healthcheck URI for the given service endpoint.

        Args:
            endpoint: service endpoint to get the healthcheck URI for

        Returns:
            The healthcheck URI for the given service endpoint or None, if
            the service endpoint doesn't have a healthcheck URI.
        """
        uri = endpoint.status.uri
        if not uri:
            return None
        return f"{uri}{self.config.healthcheck_uri_path}"

    def check_endpoint_status(
        self, endpoint: "BaseServiceEndpoint"
    ) -> Tuple[ServiceState, str]:
        """Run an HTTP endpoint API healthcheck.

        Args:
            endpoint: service endpoint to check.

        Returns:
            The operational state of the external HTTP endpoint and an
            optional message describing that state (e.g. an error message,
            if an error is encountered while checking the HTTP endpoint
            status).
        """
        from zenml.services.service_endpoint import ServiceEndpointProtocol

        if endpoint.status.protocol not in [
            ServiceEndpointProtocol.HTTP,
            ServiceEndpointProtocol.HTTPS,
        ]:
            return (
                ServiceState.ERROR,
                "endpoint protocol is not HTTP nor HTTPS.",
            )
        check_uri = self.get_healthcheck_uri(endpoint)
        if not check_uri:
            return ServiceState.ERROR, "no HTTP healthcheck URI available"
        logger.debug("Running HTTP healthcheck for URI: %s", check_uri)
        try:
            if self.config.use_head_request:
                r = requests.head(
                    check_uri,
                    timeout=self.config.http_timeout,
                )
            else:
                r = requests.get(
                    check_uri,
                    timeout=self.config.http_timeout,
                )
            if r.status_code == self.config.http_status_code:
                # the endpoint is healthy
                return ServiceState.ACTIVE, ""
            error = f"HTTP endpoint healthcheck returned unexpected status code: {r.status_code}"
        except requests.ConnectionError as e:
            error = f"HTTP endpoint healthcheck connection error: {str(e)}"
        except requests.Timeout as e:
            error = f"HTTP endpoint healthcheck request timed out: {str(e)}"
        except requests.RequestException as e:
            error = (
                f"unexpected error encountered while running HTTP endpoint "
                f"healthcheck: {str(e)}"
            )
        return ServiceState.ERROR, error
check_endpoint_status(self, endpoint)
Run an HTTP endpoint API healthcheck.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
endpoint | BaseServiceEndpoint | service endpoint to check. | required |
Returns:
Type | Description |
---|---|
Tuple[zenml.services.service_status.ServiceState, str] | The operational state of the external HTTP endpoint and an optional message describing that state (e.g. an error message, if an error is encountered while checking the HTTP endpoint status). |
Source code in zenml/services/service_monitor.py
def check_endpoint_status(
    self, endpoint: "BaseServiceEndpoint"
) -> Tuple[ServiceState, str]:
    """Run an HTTP endpoint API healthcheck.

    Args:
        endpoint: service endpoint to check.

    Returns:
        The operational state of the external HTTP endpoint and an
        optional message describing that state (e.g. an error message,
        if an error is encountered while checking the HTTP endpoint
        status).
    """
    from zenml.services.service_endpoint import ServiceEndpointProtocol

    if endpoint.status.protocol not in [
        ServiceEndpointProtocol.HTTP,
        ServiceEndpointProtocol.HTTPS,
    ]:
        return (
            ServiceState.ERROR,
            "endpoint protocol is not HTTP nor HTTPS.",
        )
    check_uri = self.get_healthcheck_uri(endpoint)
    if not check_uri:
        return ServiceState.ERROR, "no HTTP healthcheck URI available"
    logger.debug("Running HTTP healthcheck for URI: %s", check_uri)
    try:
        if self.config.use_head_request:
            r = requests.head(
                check_uri,
                timeout=self.config.http_timeout,
            )
        else:
            r = requests.get(
                check_uri,
                timeout=self.config.http_timeout,
            )
        if r.status_code == self.config.http_status_code:
            # the endpoint is healthy
            return ServiceState.ACTIVE, ""
        error = f"HTTP endpoint healthcheck returned unexpected status code: {r.status_code}"
    except requests.ConnectionError as e:
        error = f"HTTP endpoint healthcheck connection error: {str(e)}"
    except requests.Timeout as e:
        error = f"HTTP endpoint healthcheck request timed out: {str(e)}"
    except requests.RequestException as e:
        error = (
            f"unexpected error encountered while running HTTP endpoint "
            f"healthcheck: {str(e)}"
        )
    return ServiceState.ERROR, error
get_healthcheck_uri(self, endpoint)
Get the healthcheck URI for the given service endpoint.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
endpoint | BaseServiceEndpoint | service endpoint to get the healthcheck URI for | required |
Returns:
Type | Description |
---|---|
Optional[str] | The healthcheck URI for the given service endpoint or None, if the service endpoint doesn't have a healthcheck URI. |
Source code in zenml/services/service_monitor.py
def get_healthcheck_uri(
    self, endpoint: "BaseServiceEndpoint"
) -> Optional[str]:
    """Get the healthcheck URI for the given service endpoint.

    Args:
        endpoint: service endpoint to get the healthcheck URI for

    Returns:
        The healthcheck URI for the given service endpoint or None, if
        the service endpoint doesn't have a healthcheck URI.
    """
    uri = endpoint.status.uri
    if not uri:
        return None
    return f"{uri}{self.config.healthcheck_uri_path}"
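Because the health check URI is a plain string concatenation, the form of `healthcheck_uri_path` affects the result. The sketch below reproduces only that concatenation in a hypothetical helper named `healthcheck_uri`; it is not part of ZenML and assumes the endpoint URI ends with a trailing slash, as produced by the `uri` property.

```python
from typing import Optional


def healthcheck_uri(
    endpoint_uri: Optional[str], healthcheck_uri_path: str = ""
) -> Optional[str]:
    """Hypothetical helper mirroring the concatenation rule of the method."""
    if not endpoint_uri:
        # No endpoint URI is known yet, so there is nothing to check.
        return None
    return f"{endpoint_uri}{healthcheck_uri_path}"


base = "http://localhost:8080/"
print(healthcheck_uri(base))             # empty path: the endpoint URI itself
print(healthcheck_uri(base, "health"))   # relative path: single slash
print(healthcheck_uri(base, "/health"))  # leading slash: doubled slash
```

Under that assumption, a path without a leading slash avoids a doubled `//` in the resulting URI.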
HTTPEndpointHealthMonitorConfig (ServiceEndpointHealthMonitorConfig)
pydantic-model
HTTP service endpoint health monitor configuration.
Attributes:
Name | Type | Description |
---|---|---|
healthcheck_uri_path | str | URI subpath to use to perform service endpoint healthchecks. If not set, the service endpoint URI will be used instead. |
use_head_request | bool | set to True to use a HEAD request instead of a GET when calling the healthcheck URI. |
http_status_code | int | HTTP status code to expect in the health check response. |
http_timeout | int | HTTP health check request timeout in seconds. |
Source code in zenml/services/service_monitor.py
class HTTPEndpointHealthMonitorConfig(ServiceEndpointHealthMonitorConfig):
    """HTTP service endpoint health monitor configuration.

    Attributes:
        healthcheck_uri_path: URI subpath to use to perform service endpoint
            healthchecks. If not set, the service endpoint URI will be used
            instead.
        use_head_request: set to True to use a HEAD request instead of a GET
            when calling the healthcheck URI.
        http_status_code: HTTP status code to expect in the health check
            response.
        http_timeout: HTTP health check request timeout in seconds.
    """

    healthcheck_uri_path: str = ""
    use_head_request: bool = False
    http_status_code: int = 200
    http_timeout: int = DEFAULT_HTTP_HEALTHCHECK_TIMEOUT
ServiceEndpointHealthMonitorConfig (BaseTypedModel)
pydantic-model
Generic service health monitor configuration.
Concrete service classes should extend this class and add additional attributes that they want to see reflected and use in the health monitor configuration.
Source code in zenml/services/service_monitor.py
class ServiceEndpointHealthMonitorConfig(BaseTypedModel):
    """Generic service health monitor configuration.

    Concrete service classes should extend this class and add additional
    attributes that they want to see reflected and use in the health monitor
    configuration.
    """
TCPEndpointHealthMonitor (BaseServiceEndpointHealthMonitor)
pydantic-model
TCP service endpoint health monitor.
Attributes:
Name | Type | Description |
---|---|---|
config | TCPEndpointHealthMonitorConfig | health monitor configuration for TCP endpoint |
Source code in zenml/services/service_monitor.py
class TCPEndpointHealthMonitor(BaseServiceEndpointHealthMonitor):
    """TCP service endpoint health monitor.

    Attributes:
        config: health monitor configuration for TCP endpoint
    """

    config: TCPEndpointHealthMonitorConfig

    def check_endpoint_status(
        self, endpoint: "BaseServiceEndpoint"
    ) -> Tuple[ServiceState, str]:
        """Run a TCP endpoint healthcheck.

        Args:
            endpoint: service endpoint to check.

        Returns:
            The operational state of the external TCP endpoint and an
            optional message describing that state (e.g. an error message,
            if an error is encountered while checking the TCP endpoint
            status).
        """
        if not endpoint.status.port or not endpoint.status.hostname:
            return (
                ServiceState.ERROR,
                "TCP port and hostname values are not known",
            )
        logger.debug(
            "Running TCP healthcheck for TCP port: %d", endpoint.status.port
        )
        if port_is_open(endpoint.status.hostname, endpoint.status.port):
            # the endpoint is healthy
            return ServiceState.ACTIVE, ""
        return (
            ServiceState.ERROR,
            "TCP endpoint healthcheck error: TCP port is not "
            "open or not accessible",
        )
check_endpoint_status(self, endpoint)
Run a TCP endpoint healthcheck.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
endpoint | BaseServiceEndpoint | service endpoint to check. | required |
Returns:
Type | Description |
---|---|
Tuple[zenml.services.service_status.ServiceState, str] | The operational state of the external TCP endpoint and an optional message describing that state (e.g. an error message, if an error is encountered while checking the TCP endpoint status). |
Source code in zenml/services/service_monitor.py
def check_endpoint_status(
    self, endpoint: "BaseServiceEndpoint"
) -> Tuple[ServiceState, str]:
    """Run a TCP endpoint healthcheck.

    Args:
        endpoint: service endpoint to check.

    Returns:
        The operational state of the external TCP endpoint and an
        optional message describing that state (e.g. an error message,
        if an error is encountered while checking the TCP endpoint
        status).
    """
    if not endpoint.status.port or not endpoint.status.hostname:
        return (
            ServiceState.ERROR,
            "TCP port and hostname values are not known",
        )
    logger.debug(
        "Running TCP healthcheck for TCP port: %d", endpoint.status.port
    )
    if port_is_open(endpoint.status.hostname, endpoint.status.port):
        # the endpoint is healthy
        return ServiceState.ACTIVE, ""
    return (
        ServiceState.ERROR,
        "TCP endpoint healthcheck error: TCP port is not "
        "open or not accessible",
    )
TCPEndpointHealthMonitorConfig (ServiceEndpointHealthMonitorConfig)
pydantic-model
TCP service endpoint health monitor configuration.
Source code in zenml/services/service_monitor.py
class TCPEndpointHealthMonitorConfig(ServiceEndpointHealthMonitorConfig):
    """TCP service endpoint health monitor configuration."""
service_registry
Implementation of the ZenML service registry.
ServiceRegistry
Registry of service types and service instances.
The service registry provides a central place to register service types as well as service instances.
Source code in zenml/services/service_registry.py
class ServiceRegistry(metaclass=SingletonMetaClass):
    """Registry of service types and service instances.

    The service registry provides a central place to register service types
    as well as service instances.
    """

    def __init__(self) -> None:
        """Initialize the service registry."""
        self.service_types: Dict[ServiceType, Type["BaseService"]] = {}
        self.services: Dict[UUID, "BaseService"] = {}

    def register_service_type(self, cls: Type["BaseService"]) -> None:
        """Registers a new service type.

        Args:
            cls: a BaseService subclass.

        Raises:
            TypeError: if the service type is already registered.
        """
        service_type = cls.SERVICE_TYPE
        if service_type not in self.service_types:
            self.service_types[service_type] = cls
            logger.debug(
                f"Registered service class {cls} for "
                f"service type `{service_type}`"
            )
        else:
            raise TypeError(
                f"Found existing service type for {service_type}: "
                f"{self.service_types[service_type]}. Skipping registration "
                f"of {cls}."
            )

    def get_service_type(
        self, service_type: ServiceType
    ) -> Optional[Type["BaseService"]]:
        """Get the service class registered for a service type.

        Args:
            service_type: service type.

        Returns:
            `BaseService` subclass that was registered for the service type or
            None, if no service class was registered for the service type.
        """
        return self.service_types.get(service_type)

    def get_service_types(
        self,
    ) -> Dict[ServiceType, Type["BaseService"]]:
        """Get all registered service types.

        Returns:
            Dictionary of service classes indexed by their service type.
        """
        return self.service_types.copy()

    def service_type_is_registered(self, service_type: ServiceType) -> bool:
        """Check if a service type is registered.

        Args:
            service_type: service type.

        Returns:
            True, if a service class is registered for the service type, False
            otherwise.
        """
        return service_type in self.service_types

    def register_service(self, service: "BaseService") -> None:
        """Registers a new service instance.

        Args:
            service: a BaseService instance.

        Raises:
            TypeError: if the service instance has a service type that is not
                registered.
            Exception: if a preexisting service is found for that UUID.
        """
        service_type = service.SERVICE_TYPE
        if service_type not in self.service_types:
            raise TypeError(f"Service type `{service_type}` is not registered.")
        if service.uuid not in self.services:
            self.services[service.uuid] = service
            logger.debug(f"Registered service {service}")
        else:
            existing_service = self.services[service.uuid]
            raise Exception(
                f"Found existing service {existing_service} for UUID: "
                f"{service.uuid}. Skipping registration for service "
                f"{service}."
            )

    def get_service(self, uuid: UUID) -> Optional["BaseService"]:
        """Get the service instance registered for a UUID.

        Args:
            uuid: service instance identifier.

        Returns:
            `BaseService` instance that was registered for the UUID or
            None, if no matching service instance was found.
        """
        return self.services.get(uuid)

    def get_services(self) -> Dict[UUID, "BaseService"]:
        """Get all service instances currently registered.

        Returns:
            Dictionary of `BaseService` instances indexed by their UUID with
            all services that are currently registered.
        """
        return self.services.copy()

    def service_is_registered(self, uuid: UUID) -> bool:
        """Check if a service instance is registered.

        Args:
            uuid: service instance identifier.

        Returns:
            True, if a service instance is registered for the UUID, False
            otherwise.
        """
        return uuid in self.services

    def load_service_from_dict(
        self, service_dict: Dict[str, Any]
    ) -> "BaseService":
        """Load a service instance from its dict representation.

        Creates, registers and returns a service instantiated from the dict
        representation of the service configuration and last known status
        information.

        If an existing service instance with the same UUID is already
        present in the service registry, it is returned instead.

        Args:
            service_dict: dict representation of the service configuration and
                last known status

        Returns:
            A new or existing ZenML service instance.

        Raises:
            TypeError: if the service type is not registered.
            ValueError: if the service type is not valid.
        """
        service_type = service_dict.get("service_type")
        if not service_type:
            raise ValueError(
                "Service type not present in the service dictionary"
            )
        service_type = ServiceType.parse_obj(service_type)
        service_class = self.get_service_type(service_type)
        if not service_class:
            raise TypeError(
                f"Cannot load service with unregistered service "
                f"type: {service_type}"
            )
        service = cast("BaseService", service_class.from_dict(service_dict))
        return service

    def load_service_from_json(self, json_str: str) -> "BaseService":
        """Load a service instance from its JSON representation.

        Creates and returns a service instantiated from the JSON serialized
        service configuration and last known status information.

        Args:
            json_str: JSON string representation of the service configuration
                and last known status

        Returns:
            A ZenML service instance.
        """
        service_dict = json.loads(json_str)
        return self.load_service_from_dict(service_dict)
__init__(self)
special
Initialize the service registry.
Source code in zenml/services/service_registry.py
def __init__(self) -> None:
    """Initialize the service registry."""
    self.service_types: Dict[ServiceType, Type["BaseService"]] = {}
    self.services: Dict[UUID, "BaseService"] = {}
get_service(self, uuid)
Get the service instance registered for a UUID.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
uuid | UUID | service instance identifier. | required |
Returns:
Type | Description |
---|---|
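Optional[BaseService] | `BaseService` instance that was registered for the UUID or None, if no matching service instance was found. |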
Source code in zenml/services/service_registry.py
def get_service(self, uuid: UUID) -> Optional["BaseService"]:
    """Get the service instance registered for a UUID.

    Args:
        uuid: service instance identifier.

    Returns:
        `BaseService` instance that was registered for the UUID or
        None, if no matching service instance was found.
    """
    return self.services.get(uuid)
get_service_type(self, service_type)
Get the service class registered for a service type.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
service_type | ServiceType | service type. | required |
Returns:
Type | Description |
---|---|
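Optional[Type[BaseService]] | `BaseService` subclass that was registered for the service type or None, if no service class was registered for the service type. |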
Source code in zenml/services/service_registry.py
def get_service_type(
    self, service_type: ServiceType
) -> Optional[Type["BaseService"]]:
    """Get the service class registered for a service type.

    Args:
        service_type: service type.

    Returns:
        `BaseService` subclass that was registered for the service type or
        None, if no service class was registered for the service type.
    """
    return self.service_types.get(service_type)
get_service_types(self)
Get all registered service types.
Returns:
Type | Description |
---|---|
Dict[zenml.services.service_type.ServiceType, Type[BaseService]] | Dictionary of service classes indexed by their service type. |
Source code in zenml/services/service_registry.py
def get_service_types(
    self,
) -> Dict[ServiceType, Type["BaseService"]]:
    """Get all registered service types.

    Returns:
        Dictionary of service classes indexed by their service type.
    """
    return self.service_types.copy()
get_services(self)
Get all service instances currently registered.
Returns:
Type | Description |
---|---|
Dict[uuid.UUID, BaseService] | Dictionary of `BaseService` instances indexed by their UUID with all services that are currently registered. |
Source code in zenml/services/service_registry.py
def get_services(self) -> Dict[UUID, "BaseService"]:
    """Get all service instances currently registered.

    Returns:
        Dictionary of `BaseService` instances indexed by their UUID with
        all services that are currently registered.
    """
    return self.services.copy()
load_service_from_dict(self, service_dict)
Load a service instance from its dict representation.
Creates, registers and returns a service instantiated from the dict representation of the service configuration and last known status information.
If an existing service instance with the same UUID is already present in the service registry, it is returned instead.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
service_dict | Dict[str, Any] | dict representation of the service configuration and last known status | required |
Returns:
Type | Description |
---|---|
BaseService | A new or existing ZenML service instance. |
Exceptions:
Type | Description |
---|---|
TypeError | if the service type is not registered. |
ValueError | if the service type is not valid. |
Source code in zenml/services/service_registry.py
def load_service_from_dict(
    self, service_dict: Dict[str, Any]
) -> "BaseService":
    """Load a service instance from its dict representation.

    Creates, registers and returns a service instantiated from the dict
    representation of the service configuration and last known status
    information.

    If an existing service instance with the same UUID is already
    present in the service registry, it is returned instead.

    Args:
        service_dict: dict representation of the service configuration and
            last known status

    Returns:
        A new or existing ZenML service instance.

    Raises:
        TypeError: if the service type is not registered.
        ValueError: if the service type is not valid.
    """
    service_type = service_dict.get("service_type")
    if not service_type:
        raise ValueError(
            "Service type not present in the service dictionary"
        )
    service_type = ServiceType.parse_obj(service_type)
    service_class = self.get_service_type(service_type)
    if not service_class:
        raise TypeError(
            f"Cannot load service with unregistered service "
            f"type: {service_type}"
        )
    service = cast("BaseService", service_class.from_dict(service_dict))
    return service
load_service_from_json(self, json_str)
Load a service instance from its JSON representation.
Creates and returns a service instantiated from the JSON serialized service configuration and last known status information.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
json_str | str | JSON string representation of the service configuration and last known status | required |
Returns:
Type | Description |
---|---|
BaseService | A ZenML service instance. |
Source code in zenml/services/service_registry.py
def load_service_from_json(self, json_str: str) -> "BaseService":
    """Load a service instance from its JSON representation.

    Creates and returns a service instantiated from the JSON serialized
    service configuration and last known status information.

    Args:
        json_str: JSON string representation of the service configuration
            and last known status

    Returns:
        A ZenML service instance.
    """
    service_dict = json.loads(json_str)
    return self.load_service_from_dict(service_dict)
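The loading path is JSON string, then dict, then a lookup of the service class by its `service_type` entry. The sketch below walks through that flow without importing ZenML. Everything in it is hypothetical: `FACTORIES` stands in for the registered service classes, the key is simplified to a `(type, flavor)` tuple, and the loaded "service" is just a dict.

```python
import json
from typing import Any, Callable, Dict, Optional, Tuple

TypeKey = Tuple[str, str]  # (type, flavor): a simplification of ServiceType

# Hypothetical stand-in for the registered service classes.
FACTORIES: Dict[TypeKey, Callable[[Dict[str, Any]], Any]] = {
    ("daemon", "sleeping"): lambda d: {"kind": "sleeper", "config": d["config"]},
}


def load_from_dict(service_dict: Dict[str, Any]) -> Any:
    service_type = service_dict.get("service_type")
    if not service_type:
        raise ValueError("Service type not present in the service dictionary")
    key = (service_type["type"], service_type["flavor"])
    factory = FACTORIES.get(key)
    if factory is None:
        raise TypeError(f"Cannot load service with unregistered service type: {key}")
    return factory(service_dict)


def load_from_json(json_str: str) -> Any:
    return load_from_dict(json.loads(json_str))


def error_name(json_str: str) -> Optional[str]:
    """Return the name of the exception raised while loading, if any."""
    try:
        load_from_json(json_str)
    except (TypeError, ValueError) as e:
        return type(e).__name__
    return None


loaded = load_from_json(
    '{"service_type": {"type": "daemon", "flavor": "sleeping"},'
    ' "config": {"wake_up_after": 10}}'
)
print(loaded)
print(error_name("{}"))
print(error_name('{"service_type": {"type": "daemon", "flavor": "unknown"}}'))
```

The two failure modes match the documented exceptions: a missing `service_type` entry is reported as a `ValueError`, an unregistered one as a `TypeError`.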
register_service(self, service)
Registers a new service instance.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
service | BaseService | a BaseService instance. | required |
Exceptions:
Type | Description |
---|---|
TypeError | if the service instance has a service type that is not registered. |
Exception | if a preexisting service is found for that UUID. |
Source code in zenml/services/service_registry.py
def register_service(self, service: "BaseService") -> None:
    """Registers a new service instance.

    Args:
        service: a BaseService instance.

    Raises:
        TypeError: if the service instance has a service type that is not
            registered.
        Exception: if a preexisting service is found for that UUID.
    """
    service_type = service.SERVICE_TYPE
    if service_type not in self.service_types:
        raise TypeError(f"Service type `{service_type}` is not registered.")
    if service.uuid not in self.services:
        self.services[service.uuid] = service
        logger.debug(f"Registered service {service}")
    else:
        existing_service = self.services[service.uuid]
        raise Exception(
            f"Found existing service {existing_service} for UUID: "
            f"{service.uuid}. Skipping registration for service "
            f"{service}."
        )
register_service_type(self, cls)
Registers a new service type.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
cls | Type[BaseService] | a BaseService subclass. | required |
Exceptions:
Type | Description |
---|---|
TypeError | if the service type is already registered. |
Source code in zenml/services/service_registry.py
def register_service_type(self, cls: Type["BaseService"]) -> None:
    """Registers a new service type.

    Args:
        cls: a BaseService subclass.

    Raises:
        TypeError: if the service type is already registered.
    """
    service_type = cls.SERVICE_TYPE
    if service_type not in self.service_types:
        self.service_types[service_type] = cls
        logger.debug(
            f"Registered service class {cls} for "
            f"service type `{service_type}`"
        )
    else:
        raise TypeError(
            f"Found existing service type for {service_type}: "
            f"{self.service_types[service_type]}. Skipping registration "
            f"of {cls}."
        )
service_is_registered(self, uuid)
Check if a service instance is registered.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
uuid | UUID | service instance identifier. | required |
Returns:
Type | Description |
---|---|
bool | True, if a service instance is registered for the UUID, False otherwise. |
Source code in zenml/services/service_registry.py
def service_is_registered(self, uuid: UUID) -> bool:
    """Check if a service instance is registered.

    Args:
        uuid: service instance identifier.

    Returns:
        True, if a service instance is registered for the UUID, False
        otherwise.
    """
    return uuid in self.services
service_type_is_registered(self, service_type)
Check if a service type is registered.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
service_type | ServiceType | service type. | required |
Returns:
Type | Description |
---|---|
bool | True, if a service class is registered for the service type, False otherwise. |
Source code in zenml/services/service_registry.py
def service_type_is_registered(self, service_type: ServiceType) -> bool:
    """Check if a service type is registered.

    Args:
        service_type: service type.

    Returns:
        True, if a service class is registered for the service type, False
        otherwise.
    """
    return service_type in self.service_types
service_status
Implementation of the ServiceStatus class.
ServiceState (StrEnum)
Possible states for the service and service endpoint.
Source code in zenml/services/service_status.py
class ServiceState(StrEnum):
    """Possible states for the service and service endpoint."""

    ACTIVE = "active"
    PENDING_STARTUP = "pending_startup"
    INACTIVE = "inactive"
    PENDING_SHUTDOWN = "pending_shutdown"
    ERROR = "error"
ServiceStatus (BaseTypedModel)
pydantic-model
Information about the status of a service or process.
This information describes the operational status of an external process or service tracked by ZenML. This could be a process, container, Kubernetes deployment etc.
Concrete service classes should extend this class and add additional attributes that make up the operational state of the service.
Attributes:
Name | Type | Description |
---|---|---|
state | ServiceState | the current operational state |
last_state | ServiceState | the operational state prior to the last status update |
last_error | str | the error encountered during the last status update |
Source code in zenml/services/service_status.py
class ServiceStatus(BaseTypedModel):
    """Information about the status of a service or process.

    This information describes the operational status of an external process or
    service tracked by ZenML. This could be a process, container, Kubernetes
    deployment etc.

    Concrete service classes should extend this class and add additional
    attributes that make up the operational state of the service.

    Attributes:
        state: the current operational state
        last_state: the operational state prior to the last status update
        last_error: the error encountered during the last status update
    """

    state: ServiceState = ServiceState.INACTIVE
    last_state: ServiceState = ServiceState.INACTIVE
    last_error: str = ""

    def update_state(
        self,
        new_state: Optional[ServiceState] = None,
        error: str = "",
    ) -> None:
        """Update the current operational state to reflect a new state value and/or error.

        Args:
            new_state: new operational state discovered by the last service
                status update
            error: error message describing an operational failure encountered
                during the last service status update
        """
        if new_state and self.state != new_state:
            self.last_state = self.state
            self.state = new_state
        if error:
            self.last_error = error

    def clear_error(self) -> None:
        """Clear the last error message."""
        self.last_error = ""
clear_error(self)
Clear the last error message.
Source code in zenml/services/service_status.py
def clear_error(self) -> None:
    """Clear the last error message."""
    self.last_error = ""
update_state(self, new_state=None, error='')
Update the current operational state to reflect a new state value and/or error.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
new_state | Optional[zenml.services.service_status.ServiceState] | new operational state discovered by the last service status update | None |
error | str | error message describing an operational failure encountered during the last service status update | '' |
Source code in zenml/services/service_status.py
def update_state(
    self,
    new_state: Optional[ServiceState] = None,
    error: str = "",
) -> None:
    """Update the current operational state to reflect a new state value and/or error.

    Args:
        new_state: new operational state discovered by the last service
            status update
        error: error message describing an operational failure encountered
            during the last service status update
    """
    if new_state and self.state != new_state:
        self.last_state = self.state
        self.state = new_state
    if error:
        self.last_error = error
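Two details of `update_state` are easy to miss: `last_state` only changes when the state actually changes, and `last_error` is kept until `clear_error` is called, even after a successful update. The standalone sketch below replays the same logic on a plain dataclass; `State` and `StatusSketch` are stand-ins for `ServiceState` and `ServiceStatus`, not the ZenML classes themselves.

```python
from dataclasses import dataclass
from enum import Enum
from typing import Optional


class State(str, Enum):
    ACTIVE = "active"
    PENDING_STARTUP = "pending_startup"
    INACTIVE = "inactive"
    PENDING_SHUTDOWN = "pending_shutdown"
    ERROR = "error"


@dataclass
class StatusSketch:
    state: State = State.INACTIVE
    last_state: State = State.INACTIVE
    last_error: str = ""

    def update_state(
        self, new_state: Optional[State] = None, error: str = ""
    ) -> None:
        if new_state and self.state != new_state:
            self.last_state = self.state
            self.state = new_state
        if error:
            self.last_error = error

    def clear_error(self) -> None:
        self.last_error = ""


status = StatusSketch()
status.update_state(State.PENDING_STARTUP)
status.update_state(State.ERROR, "port not open")
status.update_state(State.ERROR)   # same state: last_state is not overwritten
status.update_state(State.ACTIVE)  # no error passed: last_error is kept
print(status)
```

A caller that wants the error message to reflect only the current state therefore has to call `clear_error` explicitly once the service recovers.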
service_type
Implementation of a ZenML ServiceType class.
ServiceType (BaseModel)
pydantic-model
Service type descriptor.
Attributes:
Name | Type | Description |
---|---|---|
type | str | service type |
flavor | str | service flavor |
name | str | name of the service type |
description | str | description of the service type |
Source code in zenml/services/service_type.py
class ServiceType(BaseModel):
    """Service type descriptor.

    Attributes:
        type: service type
        flavor: service flavor
        name: name of the service type
        description: description of the service type
    """

    type: str
    flavor: str
    name: str = ""
    description: str = ""

    class Config:
        """Pydantic configuration class."""

        # make the service type immutable and hashable
        frozen = True
Config
Pydantic configuration class.
Source code in zenml/services/service_type.py
class Config:
    """Pydantic configuration class."""

    # make the service type immutable and hashable
    frozen = True
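Making the service type immutable and hashable is what allows it to serve as a dictionary key in the service registry. The sketch below shows the same idea with a frozen standard-library dataclass instead of a pydantic model; `ServiceTypeSketch` is a hypothetical stand-in, so details of pydantic's own behaviour are not covered here.

```python
from dataclasses import FrozenInstanceError, dataclass


@dataclass(frozen=True)
class ServiceTypeSketch:
    """Frozen stand-in with the same four fields as ServiceType."""

    type: str
    flavor: str
    name: str = ""
    description: str = ""


a = ServiceTypeSketch(type="daemon", flavor="sleeping")
b = ServiceTypeSketch(type="daemon", flavor="sleeping")

# Equal field values give equal hashes, so `b` finds the entry stored under `a`.
registry = {a: "SleepingDaemon"}
looked_up = registry[b]

try:
    a.flavor = "other"  # frozen instances reject attribute assignment
    mutated = True
except FrozenInstanceError:
    mutated = False
print(looked_up, mutated)
```

In this sketch all four fields take part in equality, so two descriptors that differ only in `name` or `description` are distinct keys.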
utils
Utils for the ZenML service module.
load_last_service_from_step(pipeline_name, step_name, step_context=None, running=False)
Get the last service created by the pipeline and step with the given names.
This function searches backwards through the execution history for a named pipeline step and returns the first service instance that it finds logged as a step output.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
pipeline_name | str | the name of the pipeline | required |
step_name | str | pipeline step name | required |
step_context | Optional[zenml.steps.step_context.StepContext] | step context required only when called from within a step | None |
running | bool | when this flag is set, the search only returns a running service | False |
Returns:
Type | Description |
---|---|
Optional[zenml.services.service.BaseService] | A BaseService instance that represents the service or None if no service was created during the last execution of the pipeline step. |
Exceptions:
Type | Description |
---|---|
KeyError | if the pipeline or step name is not found in the execution. |
RuntimeError | if the artifact is not a service. |
Source code in zenml/services/utils.py
def load_last_service_from_step(
    pipeline_name: str,
    step_name: str,
    step_context: Optional[StepContext] = None,
    running: bool = False,
) -> Optional[BaseService]:
    """Get the last service created by the pipeline and step with the given names.

    This function searches backwards through the execution history for a
    named pipeline step and returns the first service instance that it finds
    logged as a step output.

    Args:
        pipeline_name: the name of the pipeline
        step_name: pipeline step name
        step_context: step context required only when called from within a step
        running: when this flag is set, the search only returns a running
            service

    Returns:
        A BaseService instance that represents the service or None if no service
        was created during the last execution of the pipeline step.

    Raises:
        KeyError: if the pipeline or step name is not found in the execution.
        RuntimeError: if the artifact is not a service.
    """
    if step_context is None:
        repo = Repository()
        pipeline = repo.get_pipeline(pipeline_name)
    else:
        pipeline = step_context.metadata_store.get_pipeline(
            pipeline_name=pipeline_name
        )
    if pipeline is None:
        raise KeyError(f"No pipeline with name `{pipeline_name}` was found")

    for run in reversed(pipeline.runs):
        step = run.get_step(name=step_name)
        for artifact_view in step.outputs.values():
            # filter out anything but service artifacts
            if artifact_view.type == "ServiceArtifact":
                service = artifact_view.read()
                if not isinstance(service, BaseService):
                    raise RuntimeError(
                        f"Artifact `{artifact_view.name}` of type "
                        f"`{artifact_view.type}` is not a service"
                    )
                if not running or service.is_running:
                    return service
    return None
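The effect of the `running` flag on the backward search is easiest to see with a reduced model. The sketch below is not ZenML code: `FakeService` and `last_service` are hypothetical stand-ins, each list entry represents the service (if any) that one run produced, and the list is assumed to be ordered oldest first, which is what the use of `reversed` in the listing suggests.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class FakeService:
    """Hypothetical stand-in for a service read from a step output."""

    name: str
    is_running: bool


def last_service(
    runs: List[Optional[FakeService]], running: bool = False
) -> Optional[FakeService]:
    """Scan runs newest-first and return the first acceptable service."""
    for service in reversed(runs):
        if service is None:
            # This run produced no service artifact for the step.
            continue
        if not running or service.is_running:
            return service
    return None


# Oldest run first (assumed ordering).
history = [
    FakeService("v1", is_running=True),
    None,
    FakeService("v2", is_running=False),
]
newest = last_service(history)
newest_running = last_service(history, running=True)
nothing = last_service([None, None], running=True)
```

In this model, the default call returns the most recent service even though it is stopped, while `running=True` skips it and falls back to an older run whose service is still up.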