Seldon
zenml.integrations.seldon
special
Initialization of the Seldon integration.
The Seldon Core integration allows you to use the Seldon Core model serving platform to implement continuous model deployment.
SeldonIntegration (Integration)
Definition of Seldon Core integration for ZenML.
Source code in zenml/integrations/seldon/__init__.py
class SeldonIntegration(Integration):
    """Seldon Core integration definition for ZenML.

    Declares the integration name, the pinned packages it requires and the
    stack component flavors it contributes.
    """

    NAME = SELDON
    REQUIREMENTS = [
        "kubernetes==18.20.0",
        "seldon-core==1.14.1",
    ]

    @classmethod
    def activate(cls) -> None:
        """Activate the Seldon Core integration.

        The submodules are imported only for their import-time effects; the
        imported names themselves are not used here.
        """
        from zenml.integrations.seldon import secret_schemas, services  # noqa

    @classmethod
    def flavors(cls) -> List[Type[Flavor]]:
        """Declare the stack component flavors for the Seldon Core.

        Returns:
            List of stack component flavors for this integration.
        """
        # imported lazily, inside the method, rather than at module import
        from zenml.integrations.seldon.flavors import SeldonModelDeployerFlavor

        integration_flavors: List[Type[Flavor]] = [SeldonModelDeployerFlavor]
        return integration_flavors
activate()
classmethod
Activate the Seldon Core integration.
Source code in zenml/integrations/seldon/__init__.py
@classmethod
def activate(cls) -> None:
    """Activate the Seldon Core integration.

    The submodules are imported only for their import-time effects; the
    imported names themselves are not used here.
    """
    from zenml.integrations.seldon import secret_schemas, services  # noqa
flavors()
classmethod
Declare the stack component flavors for the Seldon Core.
Returns:
Type | Description |
---|---|
List[Type[zenml.stack.flavor.Flavor]] |
List of stack component flavors for this integration. |
Source code in zenml/integrations/seldon/__init__.py
@classmethod
def flavors(cls) -> List[Type[Flavor]]:
    """Declare the stack component flavors for the Seldon Core.

    Returns:
        List of stack component flavors for this integration.
    """
    # imported lazily, inside the method, rather than at module import
    from zenml.integrations.seldon.flavors import SeldonModelDeployerFlavor

    integration_flavors: List[Type[Flavor]] = [SeldonModelDeployerFlavor]
    return integration_flavors
constants
Seldon constants.
custom_deployer
special
Initialization of ZenML custom deployer.
zenml_custom_model
Implements a custom model for the Seldon integration.
ZenMLCustomModel
Custom model class for ZenML and Seldon.
This class is used to implement a custom model for the Seldon Core integration, which is used as the main entry point for custom code execution.
Attributes:
Name | Type | Description |
---|---|---|
name |
The name of the model. |
|
model_uri |
The URI of the model. |
|
predict_func |
The predict function of the model. |
Source code in zenml/integrations/seldon/custom_deployer/zenml_custom_model.py
class ZenMLCustomModel:
    """Custom model class for ZenML and Seldon.

    This class is used to implement a custom model for the Seldon Core
    integration, which is used as the main entry point for custom code
    execution.

    Attributes:
        name: The name of the model.
        model_uri: The URI of the model.
        predict_func: The predict function of the model, resolved from the
            import path given to the constructor.
        model: The loaded model object; None until `load` succeeds.
        ready: True once `load` has completed successfully.
    """

    def __init__(
        self,
        model_name: str,
        model_uri: str,
        predict_func: str,
    ):
        """Initializes a ZenMLCustomModel object.

        Args:
            model_name: The name of the model.
            model_uri: The URI of the model.
            predict_func: Import path of the predict function of the model.
        """
        self.name = model_name
        self.model_uri = model_uri
        # the path string is resolved to the actual callable once, up front
        self.predict_func = import_class_by_path(predict_func)
        self.model = None
        self.ready = False

    def load(self) -> bool:
        """Load the model.

        This function loads the model into memory and sets the ready flag to
        True. The model is loaded using the materializer, by saving the
        information of the artifact to a file at the preparing time and
        loading it again at the prediction time by the materializer.

        Returns:
            True if the model was loaded successfully, False otherwise.
        """
        try:
            from zenml.utils.materializer_utils import load_model_from_metadata

            self.model = load_model_from_metadata(self.model_uri)
        except Exception as e:
            # deliberate best effort: report the failure through the return
            # value instead of raising
            logger.error("Failed to load model: {}".format(e))
            return False
        self.ready = True
        return self.ready

    def predict(
        self,
        X: Array_Like,
        features_names: Optional[List[str]],
        **kwargs: Any,
    ) -> Array_Like:
        """Predict the given request.

        The main predict function of the model. This function is called by
        the Seldon Core server when a request is received. Then inside this
        function, the user-defined predict function is called with the loaded
        model and the request.

        Args:
            X: The request to predict.
            features_names: The names of the features (not used here).
            **kwargs: Additional arguments (not used here).

        Returns:
            A dictionary holding the user function's result under the
            `predictions` key.

        Raises:
            Exception: If the user-defined predict function fails; the
                original error is attached as the cause.
            NotImplementedError: If no predict function is available.
        """
        if self.predict_func is None:
            raise NotImplementedError("Predict function is not implemented")
        try:
            # the result is always wrapped in a dict here, so no further type
            # check of the wrapper is needed
            return {"predictions": self.predict_func(self.model, X)}
        except Exception as e:
            # chain explicitly so the user function's traceback is preserved
            raise Exception("Failed to predict: {}".format(e)) from e
__init__(self, model_name, model_uri, predict_func)
special
Initializes a ZenMLCustomModel object.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
model_name |
str |
The name of the model. |
required |
model_uri |
str |
The URI of the model. |
required |
predict_func |
str |
The predict function of the model. |
required |
Source code in zenml/integrations/seldon/custom_deployer/zenml_custom_model.py
def __init__(self, model_name: str, model_uri: str, predict_func: str):
    """Set up a ZenMLCustomModel in its not-yet-loaded state.

    Args:
        model_name: The name of the model.
        model_uri: The URI of the model.
        predict_func: The predict function of the model, given as a path
            that is resolved to the actual object here.
    """
    self.name, self.model_uri = model_name, model_uri
    # resolve the path string to the callable once, up front
    resolved_predict_func = import_class_by_path(predict_func)
    self.predict_func = resolved_predict_func
    # nothing is loaded yet
    self.model, self.ready = None, False
load(self)
Load the model.
This function loads the model into memory and sets the ready flag to True. The model is loaded using the materializer, by saving the information of the artifact to a file at the preparing time and loading it again at the prediction time by the materializer.
Returns:
Type | Description |
---|---|
bool |
True if the model was loaded successfully, False otherwise. |
Source code in zenml/integrations/seldon/custom_deployer/zenml_custom_model.py
def load(self) -> bool:
    """Load the model into memory and mark the instance as ready.

    The model is loaded using the materializer, by saving the information of
    the artifact to a file at the preparing time and loading it again at the
    prediction time by the materializer.

    Returns:
        True if the model was loaded successfully, False otherwise.
    """
    try:
        from zenml.utils.materializer_utils import load_model_from_metadata

        self.model = load_model_from_metadata(self.model_uri)
    except Exception as e:
        # best effort: failures are logged and reported via the return value
        logger.error("Failed to load model: {}".format(e))
        return False
    else:
        self.ready = True
        return self.ready
predict(self, X, features_names, **kwargs)
Predict the given request.
The main predict function of the model. This function is called by the Seldon Core server when a request is received. Then inside this function, the user-defined predict function is called.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
X |
Union[numpy.ndarray, List[Any], str, bytes, Dict[str, Any]] |
The request to predict in a dictionary. |
required |
features_names |
Optional[List[str]] |
The names of the features. |
required |
**kwargs |
Any |
Additional arguments. |
{} |
Returns:
Type | Description |
---|---|
Union[numpy.ndarray, List[Any], str, bytes, Dict[str, Any]] |
The prediction dictionary. |
Exceptions:
Type | Description |
---|---|
Exception |
If function could not be called. |
NotImplementedError |
If no predict function is available. |
TypeError |
If the request is not a dictionary. |
Source code in zenml/integrations/seldon/custom_deployer/zenml_custom_model.py
def predict(
    self,
    X: Array_Like,
    features_names: Optional[List[str]],
    **kwargs: Any,
) -> Array_Like:
    """Predict the given request.

    The main predict function of the model. This function is called by the
    Seldon Core server when a request is received. Then inside this function,
    the user-defined predict function is called with the loaded model and the
    request.

    Args:
        X: The request to predict.
        features_names: The names of the features (not used here).
        **kwargs: Additional arguments (not used here).

    Returns:
        A dictionary holding the user function's result under the
        `predictions` key.

    Raises:
        Exception: If the user-defined predict function fails; the original
            error is attached as the cause.
        NotImplementedError: If no predict function is available.
    """
    if self.predict_func is None:
        raise NotImplementedError("Predict function is not implemented")
    try:
        # the result is always wrapped in a dict here, so no further type
        # check of the wrapper is needed
        return {"predictions": self.predict_func(self.model, X)}
    except Exception as e:
        # chain explicitly so the user function's traceback is preserved
        raise Exception("Failed to predict: {}".format(e)) from e
flavors
special
Seldon integration flavors.
seldon_model_deployer_flavor
Seldon model deployer flavor.
SeldonModelDeployerConfig (BaseModelDeployerConfig)
pydantic-model
Config for the Seldon Model Deployer.
Attributes:
Name | Type | Description |
---|---|---|
kubernetes_context |
Optional[str] |
the Kubernetes context to use to contact the remote Seldon Core installation. If not specified, the current configuration is used. Depending on where the Seldon model deployer is being used, this can be either a locally active context or an in-cluster Kubernetes configuration (if running inside a pod). |
kubernetes_namespace |
Optional[str] |
the Kubernetes namespace where the Seldon Core deployment servers are provisioned and managed by ZenML. If not specified, the namespace set in the current configuration is used. Depending on where the Seldon model deployer is being used, this can be either the current namespace configured in the locally active context or the namespace in the context of which the pod is running (if running inside a pod). |
base_url |
str |
the base URL of the Kubernetes ingress used to expose the Seldon Core deployment servers. |
secret |
Optional[str] |
the name of a ZenML secret containing the credentials used by Seldon Core storage initializers to authenticate to the Artifact Store (i.e. the storage backend where models are stored - see https://docs.seldon.io/projects/seldon-core/en/latest/servers/overview.html#handling-credentials). |
Source code in zenml/integrations/seldon/flavors/seldon_model_deployer_flavor.py
class SeldonModelDeployerConfig(BaseModelDeployerConfig):
    """Config for the Seldon Model Deployer.

    Attributes:
        kubernetes_context: the Kubernetes context to use to contact the remote
            Seldon Core installation. If not specified, the current
            configuration is used. Depending on where the Seldon model deployer
            is being used, this can be either a locally active context or an
            in-cluster Kubernetes configuration (if running inside a pod).
        kubernetes_namespace: the Kubernetes namespace where the Seldon Core
            deployment servers are provisioned and managed by ZenML. If not
            specified, the namespace set in the current configuration is used.
            Depending on where the Seldon model deployer is being used, this can
            be either the current namespace configured in the locally active
            context or the namespace in the context of which the pod is running
            (if running inside a pod).
        base_url: the base URL of the Kubernetes ingress used to expose the
            Seldon Core deployment servers.
        secret: the name of a ZenML secret containing the credentials used by
            Seldon Core storage initializers to authenticate to the Artifact
            Store (i.e. the storage backend where models are stored - see
            https://docs.seldon.io/projects/seldon-core/en/latest/servers/overview.html#handling-credentials).
    """

    # Optional fields carry an explicit None default so that they stay
    # optional without relying on implicit defaulting by the model base class.
    kubernetes_context: Optional[str] = None  # TODO: Potential setting
    kubernetes_namespace: Optional[str] = None
    base_url: str  # TODO: unused?
    secret: Optional[str] = None
SeldonModelDeployerFlavor (BaseModelDeployerFlavor)
Seldon Core model deployer flavor.
Source code in zenml/integrations/seldon/flavors/seldon_model_deployer_flavor.py
class SeldonModelDeployerFlavor(BaseModelDeployerFlavor):
    """Flavor describing the Seldon Core model deployer.

    Ties together the flavor name, its configuration class and the class that
    implements the stack component.
    """

    @property
    def name(self) -> str:
        """Name of the flavor.

        Returns:
            The name of the flavor.
        """
        flavor_name: str = SELDON_MODEL_DEPLOYER_FLAVOR
        return flavor_name

    @property
    def config_class(self) -> Type[SeldonModelDeployerConfig]:
        """Configuration class used by this flavor.

        Returns:
            The `SeldonModelDeployerConfig` class.
        """
        return SeldonModelDeployerConfig

    @property
    def implementation_class(self) -> Type["SeldonModelDeployer"]:
        """Implementation class for this flavor.

        Returns:
            The implementation class.
        """
        # imported inside the property rather than at module level
        from zenml.integrations.seldon.model_deployers import SeldonModelDeployer

        return SeldonModelDeployer
config_class: Type[zenml.integrations.seldon.flavors.seldon_model_deployer_flavor.SeldonModelDeployerConfig]
property
readonly
Returns the `SeldonModelDeployerConfig` config class.
Returns:
Type | Description |
---|---|
Type[zenml.integrations.seldon.flavors.seldon_model_deployer_flavor.SeldonModelDeployerConfig] |
The config class. |
implementation_class: Type[SeldonModelDeployer]
property
readonly
Implementation class for this flavor.
Returns:
Type | Description |
---|---|
Type[SeldonModelDeployer] |
The implementation class. |
name: str
property
readonly
Name of the flavor.
Returns:
Type | Description |
---|---|
str |
The name of the flavor. |
model_deployers
special
Initialization of the Seldon Model Deployer.
seldon_model_deployer
Implementation of the Seldon Model Deployer.
SeldonModelDeployer (BaseModelDeployer)
Seldon Core model deployer stack component implementation.
Source code in zenml/integrations/seldon/model_deployers/seldon_model_deployer.py
class SeldonModelDeployer(BaseModelDeployer):
    """Seldon Core model deployer stack component implementation."""

    NAME: ClassVar[str] = "Seldon Core"
    FLAVOR: ClassVar[Type[BaseModelDeployerFlavor]] = SeldonModelDeployerFlavor

    # created on first access by the `seldon_client` property
    _client: Optional[SeldonClient] = None

    @property
    def config(self) -> SeldonModelDeployerConfig:
        """Returns the `SeldonModelDeployerConfig` config.

        Returns:
            The configuration.
        """
        return cast(SeldonModelDeployerConfig, self._config)

    @staticmethod
    def get_model_server_info(  # type: ignore[override]
        service_instance: "SeldonDeploymentService",
    ) -> Dict[str, Optional[str]]:
        """Return implementation specific information that might be relevant to the user.

        Args:
            service_instance: Instance of a SeldonDeploymentService

        Returns:
            Model server information.
        """
        return {
            "PREDICTION_URL": service_instance.prediction_url,
            "MODEL_URI": service_instance.config.model_uri,
            "MODEL_NAME": service_instance.config.model_name,
            "SELDON_DEPLOYMENT": service_instance.seldon_deployment_name,
        }

    @property
    def seldon_client(self) -> SeldonClient:
        """Get the Seldon Core client associated with this model deployer.

        The client is created on first access from the configured Kubernetes
        context and namespace and then reused.

        Returns:
            The Seldon Core client.
        """
        if not self._client:
            self._client = SeldonClient(
                context=self.config.kubernetes_context,
                namespace=self.config.kubernetes_namespace,
            )
        return self._client

    @property
    def kubernetes_secret_name(self) -> Optional[str]:
        """Get the Kubernetes secret name associated with this model deployer.

        If a secret is configured for this model deployer, a corresponding
        Kubernetes secret is created in the remote cluster to be used
        by Seldon Core storage initializers to authenticate to the Artifact
        Store. This method returns the unique name that is used for this secret.

        Returns:
            The Seldon Core Kubernetes secret name, or None if no secret is
            configured.
        """
        if not self.config.secret:
            return None
        # collapse every run of characters other than letters, digits and
        # dashes into a single dash, then trim dashes and lowercase
        return (
            re.sub(
                r"[^0-9a-zA-Z-]+",
                "-",
                f"zenml-seldon-core-{self.config.secret}",
            )
            .strip("-")
            .lower()
        )

    def prepare_pipeline_deployment(
        self,
        deployment: "PipelineDeployment",
        stack: "Stack",
    ) -> None:
        """Build a Docker image and push it to the container registry.

        The image is only built when at least one step is flagged as a Seldon
        custom deployment in its extra configuration.

        Args:
            deployment: The pipeline deployment configuration.
            stack: The stack on which the pipeline will be deployed.
        """
        needs_docker_image = any(
            step.config.extra.get(SELDON_CUSTOM_DEPLOYMENT, False) is True
            for step in deployment.steps.values()
        )
        if not needs_docker_image:
            return

        docker_image_builder = PipelineDockerImageBuilder()
        repo_digest = docker_image_builder.build_and_push_docker_image(
            deployment=deployment, stack=stack
        )
        # record the image reference on the deployment under a dedicated key
        deployment.add_extra(SELDON_DOCKER_IMAGE_KEY, repo_digest)

    def _create_or_update_kubernetes_secret(self) -> Optional[str]:
        """Create or update a Kubernetes secret.

        Uses the information stored in the ZenML secret configured for the model deployer.

        Returns:
            The name of the Kubernetes secret that was created or updated, or
            None if no secret was configured.

        Raises:
            RuntimeError: if the secret cannot be created or updated.
        """
        # if a ZenML secret was configured in the model deployer,
        # create a Kubernetes secret as a means to pass this information
        # to the Seldon Core deployment
        if self.config.secret:
            secret_manager = Client().active_stack.secrets_manager

            if not secret_manager or not isinstance(
                secret_manager, BaseSecretsManager
            ):
                raise RuntimeError(
                    f"The active stack doesn't have a secret manager component. "
                    f"The ZenML secret specified in the Seldon Core Model "
                    f"Deployer configuration cannot be fetched: {self.config.secret}."
                )

            try:
                zenml_secret = secret_manager.get_secret(self.config.secret)
            except KeyError as e:
                # keep the lookup failure attached as the cause
                raise RuntimeError(
                    f"The ZenML secret '{self.config.secret}' specified in the "
                    f"Seldon Core Model Deployer configuration was not found "
                    f"in the active stack's secret manager."
                ) from e

            # should never happen, just making mypy happy
            assert self.kubernetes_secret_name is not None
            self.seldon_client.create_or_update_secret(
                self.kubernetes_secret_name, zenml_secret
            )

        return self.kubernetes_secret_name

    def _delete_kubernetes_secret(self) -> None:
        """Delete the Kubernetes secret associated with this model deployer.

        Do this if no Seldon Core deployments are using it.
        """
        if self.kubernetes_secret_name:
            # fetch all the Seldon Core deployments that are currently
            # configured to use this secret
            services = self.find_model_server()
            for service in services:
                config = cast(SeldonDeploymentConfig, service.config)
                if config.secret_name == self.kubernetes_secret_name:
                    # still in use by at least one deployment: keep it
                    return
            self.seldon_client.delete_secret(self.kubernetes_secret_name)

    def deploy_model(
        self,
        config: ServiceConfig,
        replace: bool = False,
        timeout: int = DEFAULT_SELDON_DEPLOYMENT_START_STOP_TIMEOUT,
    ) -> BaseService:
        """Create a new Seldon Core deployment or update an existing one.

        # noqa: DAR402

        This should serve the supplied model and deployment configuration.

        This method has two modes of operation, depending on the `replace`
        argument value:

          * if `replace` is False, calling this method will create a new Seldon
            Core deployment server to reflect the model and other configuration
            parameters specified in the supplied Seldon deployment `config`.

          * if `replace` is True, this method will first attempt to find an
            existing Seldon Core deployment that is *equivalent* to the supplied
            configuration parameters. Two or more Seldon Core deployments are
            considered equivalent if they have the same `pipeline_name`,
            `pipeline_step_name` and `model_name` configuration parameters. To
            put it differently, two Seldon Core deployments are equivalent if
            they serve versions of the same model deployed by the same pipeline
            step. If an equivalent Seldon Core deployment is found, it will be
            updated in place to reflect the new configuration parameters. This
            allows an existing Seldon Core deployment to retain its prediction
            URL while performing a rolling update to serve a new model version.

        Callers should set `replace` to True if they want a continuous model
        deployment workflow that doesn't spin up a new Seldon Core deployment
        server for each new model version. If multiple equivalent Seldon Core
        deployments are found, the most recently created deployment is selected
        to be updated and the others are deleted.

        Args:
            config: the configuration of the model to be deployed with Seldon
                Core.
            replace: set this flag to True to find and update an equivalent
                Seldon Core deployment server with the new model instead of
                starting a new deployment server.
            timeout: the timeout in seconds to wait for the Seldon Core server
                to be provisioned and successfully started or updated. If set
                to 0, the method will return immediately after the Seldon Core
                server is provisioned, without waiting for it to fully start.

        Returns:
            The ZenML Seldon Core deployment service object that can be used to
            interact with the remote Seldon Core server.

        Raises:
            SeldonClientError: if a Seldon Core client error is encountered
                while provisioning the Seldon Core deployment server.
            RuntimeError: if `timeout` is set to a positive value that is
                exceeded while waiting for the Seldon Core deployment server
                to start, or if an operational failure is encountered before
                it reaches a ready state.
        """
        config = cast(SeldonDeploymentConfig, config)
        service = None

        # if a custom Kubernetes secret is not explicitly specified in the
        # SeldonDeploymentConfig, try to create one from the ZenML secret
        # configured for the model deployer
        config.secret_name = (
            config.secret_name or self._create_or_update_kubernetes_secret()
        )

        # if replace is True, find equivalent Seldon Core deployments
        if replace is True:
            equivalent_services = self.find_model_server(
                running=False,
                pipeline_name=config.pipeline_name,
                pipeline_step_name=config.pipeline_step_name,
                model_name=config.model_name,
            )

            # results come back most recent first
            for equivalent_service in equivalent_services:
                if service is None:
                    # keep the most recently created service
                    service = equivalent_service
                else:
                    try:
                        # delete the older services and don't wait for them to
                        # be deprovisioned; the service being kept must not be
                        # the one that is stopped here
                        equivalent_service.stop()
                    except RuntimeError:
                        # ignore errors encountered while stopping old services
                        pass

        if service:
            # update an equivalent service in place
            service.update(config)
            logger.info(
                f"Updating an existing Seldon deployment service: {service}"
            )
        else:
            # create a new service
            service = SeldonDeploymentService(config=config)
            logger.info(f"Creating a new Seldon deployment service: {service}")

        # start the service which in turn provisions the Seldon Core
        # deployment server and waits for it to reach a ready state
        service.start(timeout=timeout)

        # Add telemetry with metadata that gets the stack metadata and
        # differentiates between pure model and custom code deployments
        stack = Client().active_stack
        stack_metadata = {
            component_type.value: component.flavor
            for component_type, component in stack.components.items()
        }
        metadata = {
            "store_type": Client().zen_store.type.value,
            **stack_metadata,
            "is_custom_code_deployment": config.is_custom_deployment,
        }
        track_event(AnalyticsEvent.MODEL_DEPLOYED, metadata=metadata)

        return service

    def find_model_server(
        self,
        running: bool = False,
        service_uuid: Optional[UUID] = None,
        pipeline_name: Optional[str] = None,
        pipeline_run_id: Optional[str] = None,
        pipeline_step_name: Optional[str] = None,
        model_name: Optional[str] = None,
        model_uri: Optional[str] = None,
        model_type: Optional[str] = None,
    ) -> List[BaseService]:
        """Find one or more Seldon Core model services that match the given criteria.

        The Seldon Core deployment services that meet the search criteria are
        returned sorted in descending order of their creation time (i.e. more
        recent deployments first).

        Args:
            running: if true, only running services will be returned.
            service_uuid: the UUID of the Seldon Core service that was originally used
                to create the Seldon Core deployment resource.
            pipeline_name: name of the pipeline that the deployed model was part
                of.
            pipeline_run_id: ID of the pipeline run which the deployed model was
                part of.
            pipeline_step_name: the name of the pipeline model deployment step
                that deployed the model.
            model_name: the name of the deployed model.
            model_uri: URI of the deployed model.
            model_type: the Seldon Core server implementation used to serve
                the model

        Returns:
            One or more Seldon Core service objects representing Seldon Core
            model servers that match the input search criteria.
        """
        # Use a Seldon deployment service configuration to compute the labels
        config = SeldonDeploymentConfig(
            pipeline_name=pipeline_name or "",
            pipeline_run_id=pipeline_run_id or "",
            pipeline_step_name=pipeline_step_name or "",
            model_name=model_name or "",
            model_uri=model_uri or "",
            implementation=model_type or "",
        )
        labels = config.get_seldon_deployment_labels()
        if service_uuid:
            # the service UUID is not a label covered by the Seldon
            # deployment service configuration, so we need to add it
            # separately
            labels["zenml.service_uuid"] = str(service_uuid)

        deployments = self.seldon_client.find_deployments(labels=labels)
        # sort the deployments in descending order of their creation time;
        # deployments without a timestamp sort last
        deployments.sort(
            key=lambda deployment: datetime.strptime(
                deployment.metadata.creationTimestamp,
                "%Y-%m-%dT%H:%M:%SZ",
            )
            if deployment.metadata.creationTimestamp
            else datetime.min,
            reverse=True,
        )

        services: List[BaseService] = []
        for deployment in deployments:
            # recreate the Seldon deployment service object from the Seldon
            # deployment resource
            service = SeldonDeploymentService.create_from_deployment(
                deployment=deployment
            )
            if running and not service.is_running:
                # skip non-running services
                continue
            services.append(service)

        return services

    def stop_model_server(
        self,
        uuid: UUID,
        timeout: int = DEFAULT_SELDON_DEPLOYMENT_START_STOP_TIMEOUT,
        force: bool = False,
    ) -> None:
        """Stop a Seldon Core model server.

        Args:
            uuid: UUID of the model server to stop.
            timeout: timeout in seconds to wait for the service to stop.
            force: if True, force the service to stop.

        Raises:
            NotImplementedError: stopping Seldon Core model servers is not
                supported.
        """
        raise NotImplementedError(
            "Stopping Seldon Core model servers is not implemented. Try "
            "deleting the Seldon Core model server instead."
        )

    def start_model_server(
        self,
        uuid: UUID,
        timeout: int = DEFAULT_SELDON_DEPLOYMENT_START_STOP_TIMEOUT,
    ) -> None:
        """Start a Seldon Core model deployment server.

        Args:
            uuid: UUID of the model server to start.
            timeout: timeout in seconds to wait for the service to become
                active. If set to 0, the method will return immediately after
                provisioning the service, without waiting for it to become
                active.

        Raises:
            NotImplementedError: since we don't support starting Seldon Core
                model servers
        """
        raise NotImplementedError(
            "Starting Seldon Core model servers is not implemented"
        )

    def delete_model_server(
        self,
        uuid: UUID,
        timeout: int = DEFAULT_SELDON_DEPLOYMENT_START_STOP_TIMEOUT,
        force: bool = False,
    ) -> None:
        """Delete a Seldon Core model deployment server.

        Does nothing if no model server matches the given UUID.

        Args:
            uuid: UUID of the model server to delete.
            timeout: timeout in seconds to wait for the service to stop. If
                set to 0, the method will return immediately after
                deprovisioning the service, without waiting for it to stop.
            force: if True, force the service to stop.
        """
        services = self.find_model_server(service_uuid=uuid)
        if not services:
            return
        services[0].stop(timeout=timeout, force=force)

        # if this is the last Seldon Core model server, delete the Kubernetes
        # secret used to store the authentication information for the Seldon
        # Core model server storage initializer
        self._delete_kubernetes_secret()
config: SeldonModelDeployerConfig
property
readonly
Returns the `SeldonModelDeployerConfig` config.
Returns:
Type | Description |
---|---|
SeldonModelDeployerConfig |
The configuration. |
kubernetes_secret_name: Optional[str]
property
readonly
Get the Kubernetes secret name associated with this model deployer.
If a secret is configured for this model deployer, a corresponding Kubernetes secret is created in the remote cluster to be used by Seldon Core storage initializers to authenticate to the Artifact Store. This method returns the unique name that is used for this secret.
Returns:
Type | Description |
---|---|
Optional[str] |
The Seldon Core Kubernetes secret name, or None if no secret is configured. |
seldon_client: SeldonClient
property
readonly
Get the Seldon Core client associated with this model deployer.
Returns:
Type | Description |
---|---|
SeldonClient |
The Seldon Core client. |
FLAVOR (BaseModelDeployerFlavor)
Seldon Core model deployer flavor.
Source code in zenml/integrations/seldon/model_deployers/seldon_model_deployer.py
class SeldonModelDeployerFlavor(BaseModelDeployerFlavor):
    """Flavor describing the Seldon Core model deployer.

    Ties together the flavor name, its configuration class and the class that
    implements the stack component.
    """

    @property
    def name(self) -> str:
        """Name of the flavor.

        Returns:
            The name of the flavor.
        """
        flavor_name: str = SELDON_MODEL_DEPLOYER_FLAVOR
        return flavor_name

    @property
    def config_class(self) -> Type[SeldonModelDeployerConfig]:
        """Configuration class used by this flavor.

        Returns:
            The `SeldonModelDeployerConfig` class.
        """
        return SeldonModelDeployerConfig

    @property
    def implementation_class(self) -> Type["SeldonModelDeployer"]:
        """Implementation class for this flavor.

        Returns:
            The implementation class.
        """
        # imported inside the property rather than at module level
        from zenml.integrations.seldon.model_deployers import SeldonModelDeployer

        return SeldonModelDeployer
config_class: Type[zenml.integrations.seldon.flavors.seldon_model_deployer_flavor.SeldonModelDeployerConfig]
property
readonly
Returns the `SeldonModelDeployerConfig` config class.
Returns:
Type | Description |
---|---|
Type[zenml.integrations.seldon.flavors.seldon_model_deployer_flavor.SeldonModelDeployerConfig] |
The config class. |
implementation_class: Type[SeldonModelDeployer]
property
readonly
Implementation class for this flavor.
Returns:
Type | Description |
---|---|
Type[SeldonModelDeployer] |
The implementation class. |
name: str
property
readonly
Name of the flavor.
Returns:
Type | Description |
---|---|
str |
The name of the flavor. |
delete_model_server(self, uuid, timeout=300, force=False)
Delete a Seldon Core model deployment server.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
uuid |
UUID |
UUID of the model server to delete. |
required |
timeout |
int |
timeout in seconds to wait for the service to stop. If set to 0, the method will return immediately after deprovisioning the service, without waiting for it to stop. |
300 |
force |
bool |
if True, force the service to stop. |
False |
Source code in zenml/integrations/seldon/model_deployers/seldon_model_deployer.py
def delete_model_server(
    self,
    uuid: UUID,
    timeout: int = DEFAULT_SELDON_DEPLOYMENT_START_STOP_TIMEOUT,
    force: bool = False,
) -> None:
    """Delete a Seldon Core model deployment server.

    Nothing happens when no model server matches the given UUID.

    Args:
        uuid: UUID of the model server to delete.
        timeout: timeout in seconds to wait for the service to stop. If
            set to 0, the method will return immediately after
            deprovisioning the service, without waiting for it to stop.
        force: if True, force the service to stop.
    """
    matching_services = self.find_model_server(service_uuid=uuid)
    if matching_services:
        # only the first match is stopped
        target = matching_services[0]
        target.stop(timeout=timeout, force=force)

        # if this is the last Seldon Core model server, delete the Kubernetes
        # secret used to store the authentication information for the Seldon
        # Core model server storage initializer
        self._delete_kubernetes_secret()
deploy_model(self, config, replace=False, timeout=300)
Create a new Seldon Core deployment or update an existing one.
noqa: DAR402
This should serve the supplied model and deployment configuration.
This method has two modes of operation, depending on the replace
argument value:
- if `replace` is False, calling this method will create a new Seldon Core deployment server to reflect the model and other configuration parameters specified in the supplied Seldon deployment `config`.
- if `replace` is True, this method will first attempt to find an existing Seldon Core deployment that is equivalent to the supplied configuration parameters. Two or more Seldon Core deployments are considered equivalent if they have the same `pipeline_name`, `pipeline_step_name` and `model_name` configuration parameters. To put it differently, two Seldon Core deployments are equivalent if they serve versions of the same model deployed by the same pipeline step. If an equivalent Seldon Core deployment is found, it will be updated in place to reflect the new configuration parameters. This allows an existing Seldon Core deployment to retain its prediction URL while performing a rolling update to serve a new model version.
Callers should set replace
to True if they want a continuous model
deployment workflow that doesn't spin up a new Seldon Core deployment
server for each new model version. If multiple equivalent Seldon Core
deployments are found, the most recently created deployment is selected
to be updated and the others are deleted.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
config |
ServiceConfig |
the configuration of the model to be deployed with Seldon Core. |
required |
replace |
bool |
set this flag to True to find and update an equivalent Seldon Core deployment server with the new model instead of starting a new deployment server. |
False |
timeout |
int |
the timeout in seconds to wait for the Seldon Core server to be provisioned and successfully started or updated. If set to 0, the method will return immediately after the Seldon Core server is provisioned, without waiting for it to fully start. |
300 |
Returns:
Type | Description |
---|---|
BaseService |
The ZenML Seldon Core deployment service object that can be used to interact with the remote Seldon Core server. |
Exceptions:
Type | Description |
---|---|
SeldonClientError |
if a Seldon Core client error is encountered while provisioning the Seldon Core deployment server. |
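RuntimeError |
if `timeout` is set to a positive value that is exceeded while waiting for the Seldon Core deployment server to start, or if an operational failure is encountered before it reaches a ready state. |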
Source code in zenml/integrations/seldon/model_deployers/seldon_model_deployer.py
def deploy_model(
    self,
    config: ServiceConfig,
    replace: bool = False,
    timeout: int = DEFAULT_SELDON_DEPLOYMENT_START_STOP_TIMEOUT,
) -> BaseService:
    """Create a new Seldon Core deployment or update an existing one.

    # noqa: DAR402

    This should serve the supplied model and deployment configuration.

    This method has two modes of operation, depending on the `replace`
    argument value:

      * if `replace` is False, calling this method will create a new Seldon
      Core deployment server to reflect the model and other configuration
      parameters specified in the supplied Seldon deployment `config`.

      * if `replace` is True, this method will first attempt to find an
      existing Seldon Core deployment that is *equivalent* to the supplied
      configuration parameters. Two or more Seldon Core deployments are
      considered equivalent if they have the same `pipeline_name`,
      `pipeline_step_name` and `model_name` configuration parameters. To
      put it differently, two Seldon Core deployments are equivalent if
      they serve versions of the same model deployed by the same pipeline
      step. If an equivalent Seldon Core deployment is found, it will be
      updated in place to reflect the new configuration parameters. This
      allows an existing Seldon Core deployment to retain its prediction
      URL while performing a rolling update to serve a new model version.

    Callers should set `replace` to True if they want a continuous model
    deployment workflow that doesn't spin up a new Seldon Core deployment
    server for each new model version. If multiple equivalent Seldon Core
    deployments are found, the most recently created deployment is selected
    to be updated and the others are deleted.

    Args:
        config: the configuration of the model to be deployed with Seldon
            Core.
        replace: set this flag to True to find and update an equivalent
            Seldon Core deployment server with the new model instead of
            starting a new deployment server.
        timeout: the timeout in seconds to wait for the Seldon Core server
            to be provisioned and successfully started or updated. If set
            to 0, the method will return immediately after the Seldon Core
            server is provisioned, without waiting for it to fully start.

    Returns:
        The ZenML Seldon Core deployment service object that can be used to
        interact with the remote Seldon Core server.

    Raises:
        SeldonClientError: if a Seldon Core client error is encountered
            while provisioning the Seldon Core deployment server.
        RuntimeError: if `timeout` is set to a positive value that is
            exceeded while waiting for the Seldon Core deployment server
            to start, or if an operational failure is encountered before
            it reaches a ready state.
    """
    config = cast(SeldonDeploymentConfig, config)
    service = None

    # if a custom Kubernetes secret is not explicitly specified in the
    # SeldonDeploymentConfig, try to create one from the ZenML secret
    # configured for the model deployer
    config.secret_name = (
        config.secret_name or self._create_or_update_kubernetes_secret()
    )

    # if replace is True, find equivalent Seldon Core deployments
    if replace is True:
        # find_model_server returns the most recent deployments first
        equivalent_services = self.find_model_server(
            running=False,
            pipeline_name=config.pipeline_name,
            pipeline_step_name=config.pipeline_step_name,
            model_name=config.model_name,
        )

        for equivalent_service in equivalent_services:
            if service is None:
                # keep the most recently created service
                service = equivalent_service
            else:
                try:
                    # delete the older service (NOT the one we are keeping)
                    # and don't wait for it to be deprovisioned
                    equivalent_service.stop()
                except RuntimeError:
                    # best effort: ignore errors encountered while
                    # stopping old services
                    pass

    if service:
        # update an equivalent service in place
        service.update(config)
        logger.info(
            f"Updating an existing Seldon deployment service: {service}"
        )
    else:
        # create a new service
        service = SeldonDeploymentService(config=config)
        logger.info(f"Creating a new Seldon deployment service: {service}")

    # start the service which in turn provisions the Seldon Core
    # deployment server and waits for it to reach a ready state
    service.start(timeout=timeout)

    # Add telemetry with metadata that gets the stack metadata and
    # differentiates between pure model and custom code deployments
    stack = Client().active_stack
    stack_metadata = {
        component_type.value: component.flavor
        for component_type, component in stack.components.items()
    }
    metadata = {
        "store_type": Client().zen_store.type.value,
        **stack_metadata,
        "is_custom_code_deployment": config.is_custom_deployment,
    }
    track_event(AnalyticsEvent.MODEL_DEPLOYED, metadata=metadata)

    return service
find_model_server(self, running=False, service_uuid=None, pipeline_name=None, pipeline_run_id=None, pipeline_step_name=None, model_name=None, model_uri=None, model_type=None)
Find one or more Seldon Core model services that match the given criteria.
The Seldon Core deployment services that meet the search criteria are returned sorted in descending order of their creation time (i.e. more recent deployments first).
Parameters:
Name | Type | Description | Default |
---|---|---|---|
running |
bool |
if true, only running services will be returned. |
False |
service_uuid |
Optional[uuid.UUID] |
the UUID of the Seldon Core service that was originally used to create the Seldon Core deployment resource. |
None |
pipeline_name |
Optional[str] |
name of the pipeline that the deployed model was part of. |
None |
pipeline_run_id |
Optional[str] |
ID of the pipeline run which the deployed model was part of. |
None |
pipeline_step_name |
Optional[str] |
the name of the pipeline model deployment step that deployed the model. |
None |
model_name |
Optional[str] |
the name of the deployed model. |
None |
model_uri |
Optional[str] |
URI of the deployed model. |
None |
model_type |
Optional[str] |
the Seldon Core server implementation used to serve the model |
None |
Returns:
Type | Description |
---|---|
List[zenml.services.service.BaseService] |
One or more Seldon Core service objects representing Seldon Core model servers that match the input search criteria. |
Source code in zenml/integrations/seldon/model_deployers/seldon_model_deployer.py
def find_model_server(
    self,
    running: bool = False,
    service_uuid: Optional[UUID] = None,
    pipeline_name: Optional[str] = None,
    pipeline_run_id: Optional[str] = None,
    pipeline_step_name: Optional[str] = None,
    model_name: Optional[str] = None,
    model_uri: Optional[str] = None,
    model_type: Optional[str] = None,
) -> List[BaseService]:
    """Look up the Seldon Core model services matching the given criteria.

    Matching services are returned newest first, i.e. sorted in descending
    order of the creation timestamp of their Seldon Core deployment.

    Args:
        running: if true, only running services will be returned.
        service_uuid: the UUID of the Seldon Core service that was
            originally used to create the Seldon Core deployment resource.
        pipeline_name: name of the pipeline that the deployed model was
            part of.
        pipeline_run_id: ID of the pipeline run which the deployed model
            was part of.
        pipeline_step_name: the name of the pipeline model deployment step
            that deployed the model.
        model_name: the name of the deployed model.
        model_uri: URI of the deployed model.
        model_type: the Seldon Core server implementation used to serve
            the model.

    Returns:
        One or more Seldon Core service objects representing Seldon Core
        model servers that match the input search criteria.
    """

    def _creation_time(deployment):  # type: ignore[no-untyped-def]
        # deployments without a creation timestamp sort as the oldest
        timestamp = deployment.metadata.creationTimestamp
        if not timestamp:
            return datetime.min
        return datetime.strptime(
            deployment.metadata.creationTimestamp,
            "%Y-%m-%dT%H:%M:%SZ",
        )

    # A throw-away deployment configuration is used solely to derive the
    # label selector from the search criteria
    search_config = SeldonDeploymentConfig(
        pipeline_name=pipeline_name or "",
        pipeline_run_id=pipeline_run_id or "",
        pipeline_step_name=pipeline_step_name or "",
        model_name=model_name or "",
        model_uri=model_uri or "",
        implementation=model_type or "",
    )
    labels = search_config.get_seldon_deployment_labels()
    if service_uuid:
        # the service UUID is not derived from the deployment
        # configuration, so it has to be added to the selector by hand
        labels["zenml.service_uuid"] = str(service_uuid)

    deployments = self.seldon_client.find_deployments(labels=labels)
    # most recently created deployments come first
    deployments.sort(key=_creation_time, reverse=True)

    services: List[BaseService] = []
    for deployment in deployments:
        # rebuild the service object from the Seldon deployment resource
        candidate = SeldonDeploymentService.create_from_deployment(
            deployment=deployment
        )
        # when `running` is requested, keep only the running services
        if not running or candidate.is_running:
            services.append(candidate)

    return services
get_model_server_info(service_instance)
staticmethod
Return implementation specific information that might be relevant to the user.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
service_instance |
SeldonDeploymentService |
Instance of a SeldonDeploymentService |
required |
Returns:
Type | Description |
---|---|
Dict[str, Optional[str]] |
Model server information. |
Source code in zenml/integrations/seldon/model_deployers/seldon_model_deployer.py
@staticmethod
def get_model_server_info(  # type: ignore[override]
    service_instance: "SeldonDeploymentService",
) -> Dict[str, Optional[str]]:
    """Collect the user-facing details of a Seldon deployment service.

    Args:
        service_instance: Instance of a SeldonDeploymentService

    Returns:
        Model server information: the prediction URL, the model URI, the
        model name and the Seldon deployment name.
    """
    info: Dict[str, Optional[str]] = {}
    info["PREDICTION_URL"] = service_instance.prediction_url
    deployment_config = service_instance.config
    info["MODEL_URI"] = deployment_config.model_uri
    info["MODEL_NAME"] = deployment_config.model_name
    info["SELDON_DEPLOYMENT"] = service_instance.seldon_deployment_name
    return info
prepare_pipeline_deployment(self, deployment, stack)
Build a Docker image and push it to the container registry.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
deployment |
PipelineDeployment |
The pipeline deployment configuration. |
required |
stack |
Stack |
The stack on which the pipeline will be deployed. |
required |
Source code in zenml/integrations/seldon/model_deployers/seldon_model_deployer.py
def prepare_pipeline_deployment(
    self,
    deployment: "PipelineDeployment",
    stack: "Stack",
) -> None:
    """Build and push a Docker image if any step needs a custom deployment.

    An image is only built when at least one step of the pipeline has the
    custom deployment flag set to True in its extra configuration. The
    resulting repo digest is stored as an extra on the deployment.

    Args:
        deployment: The pipeline deployment configuration.
        stack: The stack on which the pipeline will be deployed.
    """
    needs_docker_image = any(
        step.config.extra.get(SELDON_CUSTOM_DEPLOYMENT, False) is True
        for step in deployment.steps.values()
    )
    if not needs_docker_image:
        # no custom code deployment step: nothing to build
        return

    image_builder = PipelineDockerImageBuilder()
    digest = image_builder.build_and_push_docker_image(
        deployment=deployment, stack=stack
    )
    deployment.add_extra(SELDON_DOCKER_IMAGE_KEY, digest)
start_model_server(self, uuid, timeout=300)
Start a Seldon Core model deployment server.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
uuid |
UUID |
UUID of the model server to start. |
required |
timeout |
int |
timeout in seconds to wait for the service to become active. If set to 0, the method will return immediately after provisioning the service, without waiting for it to become active. |
300 |
Exceptions:
Type | Description |
---|---|
NotImplementedError |
since we don't support starting Seldon Core model servers |
Source code in zenml/integrations/seldon/model_deployers/seldon_model_deployer.py
def start_model_server(
    self,
    uuid: UUID,
    timeout: int = DEFAULT_SELDON_DEPLOYMENT_START_STOP_TIMEOUT,
) -> None:
    """Start a Seldon Core model deployment server (unsupported).

    Args:
        uuid: UUID of the model server to start.
        timeout: timeout in seconds to wait for the service to become
            active. If set to 0, the method will return immediately after
            provisioning the service, without waiting for it to become
            active.

    Raises:
        NotImplementedError: always, since starting Seldon Core model
            servers is not supported.
    """
    message = "Starting Seldon Core model servers is not implemented"
    raise NotImplementedError(message)
stop_model_server(self, uuid, timeout=300, force=False)
Stop a Seldon Core model server.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
uuid |
UUID |
UUID of the model server to stop. |
required |
timeout |
int |
timeout in seconds to wait for the service to stop. |
300 |
force |
bool |
if True, force the service to stop. |
False |
Exceptions:
Type | Description |
---|---|
NotImplementedError |
stopping Seldon Core model servers is not supported. |
Source code in zenml/integrations/seldon/model_deployers/seldon_model_deployer.py
def stop_model_server(
    self,
    uuid: UUID,
    timeout: int = DEFAULT_SELDON_DEPLOYMENT_START_STOP_TIMEOUT,
    force: bool = False,
) -> None:
    """Stop a Seldon Core model server (unsupported).

    Args:
        uuid: UUID of the model server to stop.
        timeout: timeout in seconds to wait for the service to stop.
        force: if True, force the service to stop.

    Raises:
        NotImplementedError: always, since stopping Seldon Core model
            servers is not supported.
    """
    message = (
        "Stopping Seldon Core model servers is not implemented. Try "
        "deleting the Seldon Core model server instead."
    )
    raise NotImplementedError(message)
secret_schemas
special
Initialization for the Seldon secret schemas.
These are secret schemas that can be used to authenticate Seldon to the Artifact Store used to store served ML models.
secret_schemas
Implementation for Seldon secret schemas.
SeldonAzureSecretSchema (BaseSecretSchema)
pydantic-model
Seldon Azure Blob Storage credentials.
Based on: https://rclone.org/azureblob/
Attributes:
Name | Type | Description |
---|---|---|
rclone_config_azureblob_type |
Literal['azureblob'] |
the rclone config type. Must be set to "azureblob" for this schema. |
rclone_config_azureblob_account |
Optional[str] |
storage Account Name. Leave blank to use SAS URL or MSI. |
rclone_config_azureblob_key |
Optional[str] |
storage Account Key. Leave blank to use SAS URL or MSI. |
rclone_config_azureblob_sas_url |
Optional[str] |
SAS URL for container level access only. Leave blank if using account/key or MSI. |
rclone_config_azureblob_use_msi |
bool |
use a managed service identity to authenticate (only works in Azure). |
Source code in zenml/integrations/seldon/secret_schemas/secret_schemas.py
class SeldonAzureSecretSchema(BaseSecretSchema):
    """Seldon Azure Blob Storage credentials.

    Based on: https://rclone.org/azureblob/

    All optional credentials default to None explicitly, so that they can
    be left blank without relying on implicit-default behavior of the
    validation library.

    Attributes:
        rclone_config_azureblob_type: the rclone config type. Must be set to
            "azureblob" for this schema.
        rclone_config_azureblob_account: storage Account Name. Leave blank to
            use SAS URL or MSI.
        rclone_config_azureblob_key: storage Account Key. Leave blank to
            use SAS URL or MSI.
        rclone_config_azureblob_sas_url: SAS URL for container level access
            only. Leave blank if using account/key or MSI.
        rclone_config_azureblob_use_msi: use a managed service identity to
            authenticate (only works in Azure).
    """

    TYPE: ClassVar[str] = SELDON_AZUREBLOB_SECRET_SCHEMA_TYPE

    rclone_config_azureblob_type: Literal["azureblob"] = "azureblob"
    rclone_config_azureblob_account: Optional[str] = None
    rclone_config_azureblob_key: Optional[str] = None
    rclone_config_azureblob_sas_url: Optional[str] = None
    rclone_config_azureblob_use_msi: bool = False
SeldonGSSecretSchema (BaseSecretSchema)
pydantic-model
Seldon GCS credentials.
Based on: https://rclone.org/googlecloudstorage/
Attributes:
Name | Type | Description |
---|---|---|
rclone_config_gs_type |
Literal['google cloud storage'] |
the rclone config type. Must be set to "google cloud storage" for this schema. |
rclone_config_gs_client_id |
Optional[str] |
OAuth client id. |
rclone_config_gs_client_secret |
Optional[str] |
OAuth client secret. |
rclone_config_gs_token |
Optional[str] |
OAuth Access Token as a JSON blob. |
rclone_config_gs_project_number |
Optional[str] |
project number. |
rclone_config_gs_service_account_credentials |
Optional[str] |
service account credentials JSON blob. |
rclone_config_gs_anonymous |
bool |
access public buckets and objects without credentials. Set to True if you just want to download files and don't configure credentials. |
rclone_config_gs_auth_url |
Optional[str] |
auth server URL. |
Source code in zenml/integrations/seldon/secret_schemas/secret_schemas.py
class SeldonGSSecretSchema(BaseSecretSchema):
    """Seldon GCS credentials.

    Based on: https://rclone.org/googlecloudstorage/

    All optional credentials default to None explicitly, so that they can
    be left blank without relying on implicit-default behavior of the
    validation library.

    Attributes:
        rclone_config_gs_type: the rclone config type. Must be set to "google
            cloud storage" for this schema.
        rclone_config_gs_client_id: OAuth client id.
        rclone_config_gs_client_secret: OAuth client secret.
        rclone_config_gs_token: OAuth Access Token as a JSON blob.
        rclone_config_gs_project_number: project number.
        rclone_config_gs_service_account_credentials: service account
            credentials JSON blob.
        rclone_config_gs_anonymous: access public buckets and objects without
            credentials. Set to True if you just want to download files and
            don't configure credentials.
        rclone_config_gs_auth_url: auth server URL.
        rclone_config_gs_token_url: token server URL.
    """

    TYPE: ClassVar[str] = SELDON_GS_SECRET_SCHEMA_TYPE

    rclone_config_gs_type: Literal[
        "google cloud storage"
    ] = "google cloud storage"
    rclone_config_gs_client_id: Optional[str] = None
    rclone_config_gs_client_secret: Optional[str] = None
    rclone_config_gs_project_number: Optional[str] = None
    rclone_config_gs_service_account_credentials: Optional[str] = None
    rclone_config_gs_anonymous: bool = False
    rclone_config_gs_token: Optional[str] = None
    rclone_config_gs_auth_url: Optional[str] = None
    rclone_config_gs_token_url: Optional[str] = None
SeldonS3SecretSchema (BaseSecretSchema)
pydantic-model
Seldon S3 credentials.
Based on: https://rclone.org/s3/#amazon-s3
Attributes:
Name | Type | Description |
---|---|---|
rclone_config_s3_type |
Literal['s3'] |
the rclone config type. Must be set to "s3" for this schema. |
rclone_config_s3_provider |
str |
the S3 provider (e.g. aws, ceph, minio). |
rclone_config_s3_env_auth |
bool |
get AWS credentials from EC2/ECS meta data (i.e. with IAM roles configuration). Only applies if access_key_id and secret_access_key are blank. |
rclone_config_s3_access_key_id |
Optional[str] |
AWS Access Key ID. |
rclone_config_s3_secret_access_key |
Optional[str] |
AWS Secret Access Key. |
rclone_config_s3_session_token |
Optional[str] |
AWS Session Token. |
rclone_config_s3_region |
Optional[str] |
region to connect to. |
rclone_config_s3_endpoint |
Optional[str] |
S3 API endpoint. |
Source code in zenml/integrations/seldon/secret_schemas/secret_schemas.py
class SeldonS3SecretSchema(BaseSecretSchema):
    """Seldon S3 credentials.

    Based on: https://rclone.org/s3/#amazon-s3

    All optional credentials default to None explicitly, so that they can
    be left blank without relying on implicit-default behavior of the
    validation library.

    Attributes:
        rclone_config_s3_type: the rclone config type. Must be set to "s3" for
            this schema.
        rclone_config_s3_provider: the S3 provider (e.g. aws, ceph, minio).
        rclone_config_s3_env_auth: get AWS credentials from EC2/ECS meta data
            (i.e. with IAM roles configuration). Only applies if access_key_id
            and secret_access_key are blank.
        rclone_config_s3_access_key_id: AWS Access Key ID.
        rclone_config_s3_secret_access_key: AWS Secret Access Key.
        rclone_config_s3_session_token: AWS Session Token.
        rclone_config_s3_region: region to connect to.
        rclone_config_s3_endpoint: S3 API endpoint.
    """

    TYPE: ClassVar[str] = SELDON_S3_SECRET_SCHEMA_TYPE

    rclone_config_s3_type: Literal["s3"] = "s3"
    rclone_config_s3_provider: str = "aws"
    rclone_config_s3_env_auth: bool = False
    rclone_config_s3_access_key_id: Optional[str] = None
    rclone_config_s3_secret_access_key: Optional[str] = None
    rclone_config_s3_session_token: Optional[str] = None
    rclone_config_s3_region: Optional[str] = None
    rclone_config_s3_endpoint: Optional[str] = None
seldon_client
Implementation of the Seldon client for ZenML.
SeldonClient
A client for interacting with Seldon Deployments.
Source code in zenml/integrations/seldon/seldon_client.py
class SeldonClient:
"""A client for interacting with Seldon Deployments."""
def __init__(self, context: Optional[str], namespace: Optional[str]):
    """Set up a Seldon Core client and its Kubernetes API clients.

    Args:
        context: the Kubernetes context to use.
        namespace: the Kubernetes namespace to use.
    """
    # the settings must be stored before the Kubernetes clients are
    # initialized, because the initialization reads both of them
    self._context, self._namespace = context, namespace
    self._initialize_k8s_clients()
def _initialize_k8s_clients(self) -> None:
    """Initialize the Kubernetes clients.

    The in-cluster configuration is tried first. If it cannot be loaded,
    the local kube config is loaded for the configured context instead, in
    which case the namespace must have been configured explicitly.

    Raises:
        SeldonClientError: if Kubernetes configuration could not be loaded
    """
    try:
        k8s_config.load_incluster_config()
        if not self._namespace:
            # load the namespace in the context of which the
            # current pod is running; the context manager guarantees the
            # file handle is closed, and surrounding whitespace is
            # stripped since it can never be part of a namespace name
            with open(
                "/var/run/secrets/kubernetes.io/serviceaccount/namespace"
            ) as namespace_file:
                self._namespace = namespace_file.read().strip()
    except k8s_config.config_exception.ConfigException:
        # not running inside a cluster: fall back to the local kube config
        if not self._namespace:
            raise SeldonClientError(
                "The Kubernetes namespace must be explicitly "
                "configured when running outside of a cluster."
            )
        try:
            k8s_config.load_kube_config(
                context=self._context, persist_config=False
            )
        except k8s_config.config_exception.ConfigException as e:
            raise SeldonClientError(
                "Could not load the Kubernetes configuration"
            ) from e
    self._core_api = k8s_client.CoreV1Api()
    self._custom_objects_api = k8s_client.CustomObjectsApi()
@staticmethod
def sanitize_labels(labels: Dict[str, str]) -> None:
    """Rewrite the label values in place so they are valid Kubernetes labels.

    See:
    https://kubernetes.io/docs/concepts/overview/working-with-objects/labels/#syntax-and-character-set

    Args:
        labels: the labels to sanitize. The dictionary is modified in
            place; its keys are left untouched.
    """
    # Kubernetes labels must be alphanumeric, no longer than
    # 63 characters, and must begin and end with an alphanumeric
    # character ([a-z0-9A-Z])
    sanitized = {}
    for label_name, raw_value in labels.items():
        # replace each run of invalid characters, then truncate, then trim
        # non-alphanumeric characters from both ends
        replaced = re.sub(r"[^0-9a-zA-Z-_\.]+", "_", raw_value)
        truncated = replaced[:63]
        sanitized[label_name] = truncated.strip("-_.")
    labels.update(sanitized)
@property
def namespace(self) -> str:
    """The Kubernetes namespace in use by the client.

    Returns:
        The Kubernetes namespace in use by the client.

    Raises:
        RuntimeError: if the namespace has not been configured.
    """
    if self._namespace:
        return self._namespace
    # shouldn't happen if the client is initialized, but we need to
    # appease the mypy type checker
    raise RuntimeError("The Kubernetes namespace is not configured")
def create_deployment(
    self,
    deployment: SeldonDeployment,
    poll_timeout: int = 0,
) -> SeldonDeployment:
    """Create a Seldon Core deployment resource.

    Args:
        deployment: the Seldon Core deployment resource to create
        poll_timeout: the maximum time to wait for the deployment to become
            available or to fail. If set to 0, the function will return
            immediately without checking the deployment status. If a timeout
            occurs and the deployment is still pending creation, it will
            be returned anyway and no exception will be raised.

    Returns:
        the created Seldon Core deployment resource with updated status.

    Raises:
        SeldonDeploymentExistsError: if a deployment with the same name
            already exists.
        SeldonClientError: if an unknown error occurs during the creation of
            the deployment.
    """
    try:
        logger.debug(f"Creating SeldonDeployment resource: {deployment}")

        # flag the deployment as ZenML-managed so that it can be told
        # apart from deployments that were not created by ZenML
        deployment.mark_as_managed_by_zenml()

        body_deploy = deployment.dict(exclude_none=True)
        api_response = self._custom_objects_api.create_namespaced_custom_object(
            group="machinelearning.seldon.io",
            version="v1",
            namespace=self._namespace,
            plural="seldondeployments",
            body=body_deploy,
            _request_timeout=poll_timeout if poll_timeout else None,
        )
        logger.debug("Seldon Core API response: %s", api_response)
    except k8s_client.rest.ApiException as e:
        logger.error(
            "Exception when creating SeldonDeployment resource: %s", str(e)
        )
        if e.status != 409:
            raise SeldonClientError(
                "Exception when creating SeldonDeployment resource"
            ) from e
        # HTTP 409 is reported as a name conflict
        raise SeldonDeploymentExistsError(
            f"A deployment with the name {deployment.name} "
            f"already exists in namespace {self._namespace}"
        )

    # poll in 5 second steps until the deployment is no longer pending or
    # the timeout budget is used up
    remaining = poll_timeout
    current = self.get_deployment(name=deployment.name)
    while remaining > 0 and current.is_pending():
        time.sleep(5)
        remaining -= 5
        current = self.get_deployment(name=deployment.name)
    return current
def delete_deployment(
    self,
    name: str,
    force: bool = False,
    poll_timeout: int = 0,
) -> None:
    """Delete a Seldon Core deployment resource managed by ZenML.

    Args:
        name: the name of the Seldon Core deployment resource to delete.
        force: if True, the deployment deletion will be forced (the graceful
            period will be set to zero).
        poll_timeout: the maximum time to wait for the deployment to be
            deleted. If set to 0, the function will return immediately
            without checking the deployment status. If a timeout
            occurs and the deployment still exists, this method will
            return and no exception will be raised.

    Raises:
        SeldonClientError: if an unknown error occurs during the deployment
            removal.
    """
    grace_period = 0 if force else None
    try:
        logger.debug(f"Deleting SeldonDeployment resource: {name}")

        # `get_deployment` verifies that the deployment exists and is
        # managed by ZenML; it raises a SeldonDeploymentNotFoundError
        # otherwise, which is deliberately left to propagate
        self.get_deployment(name=name)

        api_response = self._custom_objects_api.delete_namespaced_custom_object(
            group="machinelearning.seldon.io",
            version="v1",
            namespace=self._namespace,
            plural="seldondeployments",
            name=name,
            _request_timeout=poll_timeout if poll_timeout else None,
            grace_period_seconds=grace_period,
        )
        logger.debug("Seldon Core API response: %s", api_response)
    except k8s_client.rest.ApiException as e:
        logger.error(
            "Exception when deleting SeldonDeployment resource %s: %s",
            name,
            str(e),
        )
        raise SeldonClientError(
            f"Exception when deleting SeldonDeployment resource {name}"
        ) from e

    # poll in 5 second steps until the deployment can no longer be found or
    # the timeout budget is used up
    remaining = poll_timeout
    while remaining > 0:
        try:
            self.get_deployment(name=name)
        except SeldonDeploymentNotFoundError:
            # the deployment is gone: deletion completed
            return
        time.sleep(5)
        remaining -= 5
def update_deployment(
    self,
    deployment: SeldonDeployment,
    poll_timeout: int = 0,
) -> SeldonDeployment:
    """Update a Seldon Core deployment resource.

    Args:
        deployment: the Seldon Core deployment resource to update
        poll_timeout: the maximum time to wait for the deployment to become
            available or to fail. If set to 0, the function will return
            immediately without checking the deployment status. If a timeout
            occurs and the deployment is still pending, it will
            be returned anyway and no exception will be raised.

    Returns:
        the updated Seldon Core deployment resource with updated status.

    Raises:
        SeldonClientError: if an unknown error occurs while updating the
            deployment.
    """
    try:
        logger.debug(
            f"Updating SeldonDeployment resource: {deployment.name}"
        )

        # mark the deployment as managed by ZenML, to differentiate
        # between deployments that are created by ZenML and those that
        # are not
        deployment.mark_as_managed_by_zenml()

        # call `get_deployment` to check that the deployment exists
        # and is managed by ZenML. It will raise
        # a SeldonDeploymentNotFoundError otherwise
        self.get_deployment(name=deployment.name)

        response = self._custom_objects_api.patch_namespaced_custom_object(
            group="machinelearning.seldon.io",
            version="v1",
            namespace=self._namespace,
            plural="seldondeployments",
            name=deployment.name,
            body=deployment.dict(exclude_none=True),
            _request_timeout=poll_timeout or None,
        )
        logger.debug("Seldon Core API response: %s", response)
    except k8s_client.rest.ApiException as e:
        logger.error(
            "Exception when updating SeldonDeployment resource: %s", str(e)
        )
        # the message must name the operation that actually failed
        raise SeldonClientError(
            "Exception when updating SeldonDeployment resource"
        ) from e

    # poll in 5 second steps until the deployment is no longer pending or
    # the timeout budget is used up
    updated_deployment = self.get_deployment(name=deployment.name)
    while poll_timeout > 0 and updated_deployment.is_pending():
        time.sleep(5)
        poll_timeout -= 5
        updated_deployment = self.get_deployment(name=deployment.name)
    return updated_deployment
def get_deployment(self, name: str) -> SeldonDeployment:
    """Fetch a ZenML managed Seldon Core deployment resource by name.

    Args:
        name: the name of the Seldon Core deployment resource to fetch.

    Returns:
        The Seldon Core deployment resource.

    Raises:
        SeldonDeploymentNotFoundError: if the deployment resource cannot
            be found, cannot be parsed or is not managed by ZenML.
        SeldonClientError: if an unknown error occurs while fetching
            the deployment.
    """
    # Step 1: fetch the raw resource from the Kubernetes API
    try:
        logger.debug(f"Retrieving SeldonDeployment resource: {name}")
        api_response = self._custom_objects_api.get_namespaced_custom_object(
            group="machinelearning.seldon.io",
            version="v1",
            namespace=self._namespace,
            plural="seldondeployments",
            name=name,
        )
        logger.debug("Seldon Core API response: %s", api_response)
    except k8s_client.rest.ApiException as e:
        if e.status == 404:
            raise SeldonDeploymentNotFoundError(
                f"SeldonDeployment resource not found: {name}"
            ) from e
        logger.error(
            "Exception when fetching SeldonDeployment resource %s: %s",
            name,
            str(e),
        )
        raise SeldonClientError(
            f"Unexpected exception when fetching SeldonDeployment "
            f"resource: {name}"
        ) from e

    # Step 2: parse the raw resource; unparsable resources are reported
    # as not found
    try:
        parsed = SeldonDeployment(**api_response)
    except ValidationError as e:
        logger.error(
            "Invalid Seldon Core deployment resource: %s\n%s",
            str(e),
            str(api_response),
        )
        raise SeldonDeploymentNotFoundError(
            f"SeldonDeployment resource {name} could not be parsed"
        )

    # Step 3: only Seldon deployments managed by ZenML are returned
    if parsed.is_managed_by_zenml():
        return parsed
    raise SeldonDeploymentNotFoundError(
        f"Seldon Deployment {name} is not managed by ZenML"
    )
def find_deployments(
    self,
    name: Optional[str] = None,
    labels: Optional[Dict[str, str]] = None,
    fields: Optional[Dict[str, str]] = None,
) -> List[SeldonDeployment]:
    """Find all ZenML-managed Seldon Core deployment resources matching the given criteria.

    The `labels` and `fields` dictionaries passed in by the caller are
    never modified. Resources returned by the API that cannot be parsed
    are logged and left out of the result.

    Args:
        name: optional name of the deployment resource to find. It is
            added to the field selector as `metadata.name`, alongside any
            other `fields` that were supplied.
        fields: optional selector to restrict the list of returned
            Seldon deployments by their fields. Defaults to everything.
        labels: optional selector to restrict the list of returned
            Seldon deployments by their labels. Defaults to everything.

    Returns:
        List of Seldon Core deployments that match the given criteria.

    Raises:
        SeldonClientError: if an unknown error occurs while fetching
            the deployments.
    """
    # work on copies so that the caller's dictionaries are left untouched
    fields = dict(fields or {})
    labels = dict(labels or {})
    # always filter results to only include Seldon deployments managed
    # by ZenML
    labels["app"] = "zenml"
    if name:
        fields["metadata.name"] = name
    field_selector = (
        ",".join(f"{k}={v}" for k, v in fields.items()) if fields else None
    )
    # `labels` always holds at least the "app" label at this point
    label_selector = ",".join(f"{k}={v}" for k, v in labels.items())
    try:
        logger.debug(
            f"Searching SeldonDeployment resources with label selector "
            f"'{labels or ''}' and field selector '{fields or ''}'"
        )
        response = self._custom_objects_api.list_namespaced_custom_object(
            group="machinelearning.seldon.io",
            version="v1",
            namespace=self._namespace,
            plural="seldondeployments",
            field_selector=field_selector,
            label_selector=label_selector,
        )
        # tolerate a missing or null "items" entry in the response
        items = response.get("items") or []
        logger.debug("Seldon Core API returned %s items", len(items))
        deployments = []
        for item in items:
            try:
                deployments.append(SeldonDeployment(**item))
            except ValidationError as e:
                # skip resources that cannot be parsed instead of failing
                # the whole search
                logger.error(
                    "Invalid Seldon Core deployment resource: %s\n%s",
                    str(e),
                    str(item),
                )
        return deployments
    except k8s_client.rest.ApiException as e:
        logger.error(
            "Exception when searching SeldonDeployment resources with "
            "label selector '%s' and field selector '%s': %s",
            label_selector or "",
            field_selector or "",
            str(e),
        )
        raise SeldonClientError(
            f"Unexpected exception when searching SeldonDeployment "
            f"with labels '{labels or ''}' and field '{fields or ''}'"
        ) from e
def get_deployment_logs(
    self,
    name: str,
    follow: bool = False,
    tail: Optional[int] = None,
) -> Generator[str, bool, None]:
    """Get the logs of a Seldon Core deployment resource.

    The logs are read from the first pod labelled with
    `seldon-deployment-id=<name>`. The `default` container is preferred,
    falling back to the first container of the pod, and then to the first
    init container if the selected container has not started yet.

    Args:
        name: the name of the Seldon Core deployment to get logs for.
        follow: if True, the logs will be streamed as they are written
        tail: only retrieve the last NUM lines of log output.

    Returns:
        A generator that can be accessed to get the service logs.

    Yields:
        The next log line, without its trailing newline. Blank log lines
        are yielded as empty strings.

    Raises:
        SeldonClientError: if an unknown error occurs while fetching
            the logs, or if no pods are found for the deployment.
    """
    logger.debug(f"Retrieving logs for SeldonDeployment resource: {name}")
    try:
        response = self._core_api.list_namespaced_pod(
            namespace=self._namespace,
            label_selector=f"seldon-deployment-id={name}",
        )
        logger.debug("Kubernetes API response: %s", response)
        pods = response.items
        if not pods:
            raise SeldonClientError(
                f"The Seldon Core deployment {name} is not currently "
                f"running: no Kubernetes pods associated with it were found"
            )
        pod = pods[0]
        pod_name = pod.metadata.name
        containers = [c.name for c in pod.spec.containers]
        # optional list attributes may be None (e.g. a pod without init
        # containers, or a pod that has not reported any status yet)
        init_containers = [c.name for c in pod.spec.init_containers or []]
        container_statuses = {
            c.name: c.started or c.restart_count
            for c in pod.status.container_statuses or []
        }
        container = "default"
        if container not in containers:
            container = containers[0]
        # some containers might not be running yet and have no logs to show,
        # so fall back to an init container when one is available
        if not container_statuses.get(container) and init_containers:
            container = init_containers[0]
        logger.info(
            f"Retrieving logs for pod: `{pod_name}` and container "
            f"`{container}` in namespace `{self._namespace}`"
        )
        response = self._core_api.read_namespaced_pod_log(
            name=pod_name,
            namespace=self._namespace,
            container=container,
            follow=follow,
            tail_lines=tail,
            _preload_content=False,
        )
    except k8s_client.rest.ApiException as e:
        logger.error(
            "Exception when fetching logs for SeldonDeployment resource "
            "%s: %s",
            name,
            str(e),
        )
        raise SeldonClientError(
            f"Unexpected exception when fetching logs for SeldonDeployment "
            f"resource: {name}"
        ) from e
    try:
        while True:
            raw_line = response.readline()
            # only an empty read marks the end of the stream; a blank log
            # line still carries its newline and must not stop the loop
            if not raw_line:
                return
            stop = yield raw_line.decode("utf-8").rstrip("\n")
            if stop:
                return
    finally:
        response.release_conn()
def create_or_update_secret(
    self,
    name: str,
    secret: BaseSecretSchema,
) -> None:
    """Create or update a Kubernetes Secret resource.

    Uses the information contained in a ZenML secret. Keys are upper-cased
    and values are base64 encoded; entries whose value is None are skipped.

    Args:
        name: the name of the Secret resource to create.
        secret: a ZenML secret with key-values that should be
            stored in the Secret resource.

    Raises:
        SeldonClientError: if an unknown error occurs during the creation of
            the secret.
    """
    try:
        logger.debug(f"Creating Secret resource: {name}")
        secret_data = {
            k.upper(): base64.b64encode(str(v).encode("utf-8")).decode(
                "ascii"
            )
            for k, v in secret.content.items()
            if v is not None
        }
        # use a dedicated name instead of rebinding the `secret` argument,
        # which has a different type
        k8s_secret = k8s_client.V1Secret(
            metadata=k8s_client.V1ObjectMeta(
                name=name,
                labels={"app": "zenml"},
            ),
            type="Opaque",
            data=secret_data,
        )
        try:
            # check if the secret is already present
            self._core_api.read_namespaced_secret(
                name=name,
                namespace=self._namespace,
            )
            # if we got this far, the secret is already present, update it
            # in place
            response = self._core_api.replace_namespaced_secret(
                name=name,
                namespace=self._namespace,
                body=k8s_secret,
            )
        except k8s_client.rest.ApiException as e:
            if e.status != 404:
                # if an error other than 404 is raised here, treat it
                # as an unexpected error; the outer handler wraps it in a
                # SeldonClientError
                raise
            response = self._core_api.create_namespaced_secret(
                namespace=self._namespace,
                body=k8s_secret,
            )
        logger.debug("Kubernetes API response: %s", response)
    except k8s_client.rest.ApiException as e:
        logger.error("Exception when creating Secret resource: %s", str(e))
        raise SeldonClientError(
            "Exception when creating Secret resource"
        ) from e
def delete_secret(
    self,
    name: str,
) -> None:
    """Delete a Kubernetes Secret resource by name.

    A secret that is already gone (HTTP 404) is silently ignored.

    Args:
        name: the name of the Kubernetes Secret resource to delete.

    Raises:
        SeldonClientError: if an unknown error occurs during the removal
            of the secret.
    """
    logger.debug(f"Deleting Secret resource: {name}")
    try:
        api_response = self._core_api.delete_namespaced_secret(
            name=name,
            namespace=self._namespace,
        )
    except k8s_client.rest.ApiException as e:
        if e.status != 404:
            logger.error(
                "Exception when deleting Secret resource %s: %s",
                name,
                str(e),
            )
            raise SeldonClientError(
                f"Exception when deleting Secret resource {name}"
            ) from e
        # the secret is no longer present, nothing to do
    else:
        logger.debug("Kubernetes API response: %s", api_response)
namespace: str
property
readonly
Returns the Kubernetes namespace in use by the client.
Returns:
Type | Description |
---|---|
str |
The Kubernetes namespace in use by the client. |
Exceptions:
Type | Description |
---|---|
RuntimeError |
if the namespace has not been configured. |
__init__(self, context, namespace)
special
Initialize a Seldon Core client.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
context |
Optional[str] |
the Kubernetes context to use. |
required |
namespace |
Optional[str] |
the Kubernetes namespace to use. |
required |
Source code in zenml/integrations/seldon/seldon_client.py
def __init__(self, context: Optional[str], namespace: Optional[str]):
    """Set up a Seldon Core client.

    Stores the given settings and then initializes the Kubernetes API
    clients.

    Args:
        context: the Kubernetes context to use.
        namespace: the Kubernetes namespace to use.
    """
    self._context, self._namespace = context, namespace
    self._initialize_k8s_clients()
create_deployment(self, deployment, poll_timeout=0)
Create a Seldon Core deployment resource.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
deployment |
SeldonDeployment |
the Seldon Core deployment resource to create |
required |
poll_timeout |
int |
the maximum time to wait for the deployment to become available or to fail. If set to 0, the function will return immediately without checking the deployment status. If a timeout occurs and the deployment is still pending creation, it will be returned anyway and no exception will be raised. |
0 |
Returns:
Type | Description |
---|---|
SeldonDeployment |
the created Seldon Core deployment resource with updated status. |
Exceptions:
Type | Description |
---|---|
SeldonDeploymentExistsError |
if a deployment with the same name already exists. |
SeldonClientError |
if an unknown error occurs during the creation of the deployment. |
Source code in zenml/integrations/seldon/seldon_client.py
def create_deployment(
    self,
    deployment: SeldonDeployment,
    poll_timeout: int = 0,
) -> SeldonDeployment:
    """Create a Seldon Core deployment resource.

    Args:
        deployment: the Seldon Core deployment resource to create
        poll_timeout: the maximum time to wait for the deployment to become
            available or to fail. If set to 0, the function will return
            immediately without checking the deployment status. If a timeout
            occurs and the deployment is still pending creation, it will
            be returned anyway and no exception will be raised.

    Returns:
        the created Seldon Core deployment resource with updated status.

    Raises:
        SeldonDeploymentExistsError: if a deployment with the same name
            already exists.
        SeldonClientError: if an unknown error occurs during the creation of
            the deployment.
    """
    try:
        logger.debug(f"Creating SeldonDeployment resource: {deployment}")
        # mark the deployment as managed by ZenML, to differentiate
        # between deployments that are created by ZenML and those that
        # are not
        deployment.mark_as_managed_by_zenml()
        body_deploy = deployment.dict(exclude_none=True)
        response = self._custom_objects_api.create_namespaced_custom_object(
            group="machinelearning.seldon.io",
            version="v1",
            namespace=self._namespace,
            plural="seldondeployments",
            body=body_deploy,
            _request_timeout=poll_timeout or None,
        )
        logger.debug("Seldon Core API response: %s", response)
    except k8s_client.rest.ApiException as e:
        logger.error(
            "Exception when creating SeldonDeployment resource: %s", str(e)
        )
        if e.status == 409:
            # chain the API error like every other raise in this client
            raise SeldonDeploymentExistsError(
                f"A deployment with the name {deployment.name} "
                f"already exists in namespace {self._namespace}"
            ) from e
        raise SeldonClientError(
            "Exception when creating SeldonDeployment resource"
        ) from e
    created_deployment = self.get_deployment(name=deployment.name)
    while poll_timeout > 0 and created_deployment.is_pending():
        # never sleep longer than the time that is left, so the caller's
        # timeout is not exceeded by the polling interval
        time.sleep(min(5, poll_timeout))
        poll_timeout -= 5
        created_deployment = self.get_deployment(name=deployment.name)
    return created_deployment
create_or_update_secret(self, name, secret)
Create or update a Kubernetes Secret resource.
Uses the information contained in a ZenML secret.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
name |
str |
the name of the Secret resource to create. |
required |
secret |
BaseSecretSchema |
a ZenML secret with key-values that should be stored in the Secret resource. |
required |
Exceptions:
Type | Description |
---|---|
SeldonClientError |
if an unknown error occurs during the creation of the secret. |
k8s_client.rest.ApiException |
unexpected error. |
Source code in zenml/integrations/seldon/seldon_client.py
def create_or_update_secret(
    self,
    name: str,
    secret: BaseSecretSchema,
) -> None:
    """Create or update a Kubernetes Secret resource.

    Uses the information contained in a ZenML secret. Keys are upper-cased
    and values are base64 encoded; entries whose value is None are skipped.

    Args:
        name: the name of the Secret resource to create.
        secret: a ZenML secret with key-values that should be
            stored in the Secret resource.

    Raises:
        SeldonClientError: if an unknown error occurs during the creation of
            the secret.
    """
    try:
        logger.debug(f"Creating Secret resource: {name}")
        secret_data = {
            k.upper(): base64.b64encode(str(v).encode("utf-8")).decode(
                "ascii"
            )
            for k, v in secret.content.items()
            if v is not None
        }
        # use a dedicated name instead of rebinding the `secret` argument,
        # which has a different type
        k8s_secret = k8s_client.V1Secret(
            metadata=k8s_client.V1ObjectMeta(
                name=name,
                labels={"app": "zenml"},
            ),
            type="Opaque",
            data=secret_data,
        )
        try:
            # check if the secret is already present
            self._core_api.read_namespaced_secret(
                name=name,
                namespace=self._namespace,
            )
            # if we got this far, the secret is already present, update it
            # in place
            response = self._core_api.replace_namespaced_secret(
                name=name,
                namespace=self._namespace,
                body=k8s_secret,
            )
        except k8s_client.rest.ApiException as e:
            if e.status != 404:
                # if an error other than 404 is raised here, treat it
                # as an unexpected error; the outer handler wraps it in a
                # SeldonClientError
                raise
            response = self._core_api.create_namespaced_secret(
                namespace=self._namespace,
                body=k8s_secret,
            )
        logger.debug("Kubernetes API response: %s", response)
    except k8s_client.rest.ApiException as e:
        logger.error("Exception when creating Secret resource: %s", str(e))
        raise SeldonClientError(
            "Exception when creating Secret resource"
        ) from e
delete_deployment(self, name, force=False, poll_timeout=0)
Delete a Seldon Core deployment resource managed by ZenML.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
name |
str |
the name of the Seldon Core deployment resource to delete. |
required |
force |
bool |
if True, the deployment deletion will be forced (the graceful period will be set to zero). |
False |
poll_timeout |
int |
the maximum time to wait for the deployment to be deleted. If set to 0, the function will return immediately without checking the deployment status. If a timeout occurs and the deployment still exists, this method will return and no exception will be raised. |
0 |
Exceptions:
Type | Description |
---|---|
SeldonClientError |
if an unknown error occurs during the deployment removal. |
Source code in zenml/integrations/seldon/seldon_client.py
def delete_deployment(
    self,
    name: str,
    force: bool = False,
    poll_timeout: int = 0,
) -> None:
    """Remove a ZenML-managed Seldon Core deployment resource.

    Args:
        name: the name of the Seldon Core deployment resource to delete.
        force: if True, the deployment deletion will be forced (the graceful
            period will be set to zero).
        poll_timeout: the maximum time to wait for the deployment to be
            deleted. If set to 0, the function will return immediately
            without checking the deployment status. If a timeout
            occurs and the deployment still exists, this method will
            return and no exception will be raised.

    Raises:
        SeldonClientError: if an unknown error occurs during the deployment
            removal.
    """
    # a forced deletion uses a zero grace period; otherwise the argument is
    # passed as None
    grace_period = 0 if force else None
    logger.debug(f"Deleting SeldonDeployment resource: {name}")
    try:
        # `get_deployment` verifies that the deployment exists and is
        # managed by ZenML; it raises a SeldonDeploymentNotFoundError
        # otherwise
        self.get_deployment(name=name)
        api_response = (
            self._custom_objects_api.delete_namespaced_custom_object(
                group="machinelearning.seldon.io",
                version="v1",
                namespace=self._namespace,
                plural="seldondeployments",
                name=name,
                _request_timeout=poll_timeout or None,
                grace_period_seconds=grace_period,
            )
        )
        logger.debug("Seldon Core API response: %s", api_response)
    except k8s_client.rest.ApiException as e:
        logger.error(
            "Exception when deleting SeldonDeployment resource %s: %s",
            name,
            str(e),
        )
        raise SeldonClientError(
            f"Exception when deleting SeldonDeployment resource {name}"
        ) from e

    # poll until the resource can no longer be fetched or the time runs out
    remaining = poll_timeout
    while remaining > 0:
        try:
            self.get_deployment(name=name)
        except SeldonDeploymentNotFoundError:
            break
        time.sleep(5)
        remaining -= 5
delete_secret(self, name)
Delete a Kubernetes Secret resource managed by ZenML.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
name |
str |
the name of the Kubernetes Secret resource to delete. |
required |
Exceptions:
Type | Description |
---|---|
SeldonClientError |
if an unknown error occurs during the removal of the secret. |
Source code in zenml/integrations/seldon/seldon_client.py
def delete_secret(
    self,
    name: str,
) -> None:
    """Delete a Kubernetes Secret resource by name.

    A secret that is already gone (HTTP 404) is silently ignored.

    Args:
        name: the name of the Kubernetes Secret resource to delete.

    Raises:
        SeldonClientError: if an unknown error occurs during the removal
            of the secret.
    """
    logger.debug(f"Deleting Secret resource: {name}")
    try:
        api_response = self._core_api.delete_namespaced_secret(
            name=name,
            namespace=self._namespace,
        )
    except k8s_client.rest.ApiException as e:
        if e.status != 404:
            logger.error(
                "Exception when deleting Secret resource %s: %s",
                name,
                str(e),
            )
            raise SeldonClientError(
                f"Exception when deleting Secret resource {name}"
            ) from e
        # the secret is no longer present, nothing to do
    else:
        logger.debug("Kubernetes API response: %s", api_response)
find_deployments(self, name=None, labels=None, fields=None)
Find all ZenML-managed Seldon Core deployment resources matching the given criteria.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
name |
Optional[str] |
optional name of the deployment resource to find. |
None |
fields |
Optional[Dict[str, str]] |
optional selector to restrict the list of returned Seldon deployments by their fields. Defaults to everything. |
None |
labels |
Optional[Dict[str, str]] |
optional selector to restrict the list of returned Seldon deployments by their labels. Defaults to everything. |
None |
Returns:
Type | Description |
---|---|
List[zenml.integrations.seldon.seldon_client.SeldonDeployment] |
List of Seldon Core deployments that match the given criteria. |
Exceptions:
Type | Description |
---|---|
SeldonClientError |
if an unknown error occurs while fetching the deployments. |
Source code in zenml/integrations/seldon/seldon_client.py
def find_deployments(
    self,
    name: Optional[str] = None,
    labels: Optional[Dict[str, str]] = None,
    fields: Optional[Dict[str, str]] = None,
) -> List[SeldonDeployment]:
    """Find all ZenML-managed Seldon Core deployment resources matching the given criteria.

    Items returned by the API that cannot be parsed as a SeldonDeployment
    are logged and left out of the result.

    Args:
        name: optional name of the deployment resource to find. When set,
            it replaces any selector passed in `fields`.
        fields: optional selector to restrict the list of returned
            Seldon deployments by their fields. Defaults to everything.
        labels: optional selector to restrict the list of returned
            Seldon deployments by their labels. Defaults to everything.

    Returns:
        List of Seldon Core deployments that match the given criteria.

    Raises:
        SeldonClientError: if an unknown error occurs while fetching
            the deployments.
    """
    fields = fields or {}
    # work on a copy so that the dictionary passed by the caller is not
    # modified when the `app` label is added below
    labels = dict(labels or {})
    # always filter results to only include Seldon deployments managed
    # by ZenML
    labels["app"] = "zenml"
    if name:
        fields = {"metadata.name": name}
    field_selector = (
        ",".join(f"{k}={v}" for k, v in fields.items()) if fields else None
    )
    label_selector = (
        ",".join(f"{k}={v}" for k, v in labels.items()) if labels else None
    )
    try:
        logger.debug(
            f"Searching SeldonDeployment resources with label selector "
            f"'{labels or ''}' and field selector '{fields or ''}'"
        )
        response = self._custom_objects_api.list_namespaced_custom_object(
            group="machinelearning.seldon.io",
            version="v1",
            namespace=self._namespace,
            plural="seldondeployments",
            field_selector=field_selector,
            label_selector=label_selector,
        )
        # tolerate a missing or empty `items` entry for both the debug
        # message and the parsing loop
        items = response.get("items") or []
        logger.debug("Seldon Core API returned %s items", len(items))
        deployments = []
        for item in items:
            try:
                deployments.append(SeldonDeployment(**item))
            except ValidationError as e:
                logger.error(
                    "Invalid Seldon Core deployment resource: %s\n%s",
                    str(e),
                    str(item),
                )
        return deployments
    except k8s_client.rest.ApiException as e:
        # the message has three placeholders, so the exception text must
        # be passed as the third argument
        logger.error(
            "Exception when searching SeldonDeployment resources with "
            "label selector '%s' and field selector '%s': %s",
            label_selector or "",
            field_selector or "",
            str(e),
        )
        raise SeldonClientError(
            f"Unexpected exception when searching SeldonDeployment "
            f"with labels '{labels or ''}' and field '{fields or ''}'"
        ) from e
get_deployment(self, name)
Get a ZenML managed Seldon Core deployment resource by name.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
name |
str |
the name of the Seldon Core deployment resource to fetch. |
required |
Returns:
Type | Description |
---|---|
SeldonDeployment |
The Seldon Core deployment resource. |
Exceptions:
Type | Description |
---|---|
SeldonDeploymentNotFoundError |
if the deployment resource cannot be found or is not managed by ZenML. |
SeldonClientError |
if an unknown error occurs while fetching the deployment. |
Source code in zenml/integrations/seldon/seldon_client.py
def get_deployment(self, name: str) -> SeldonDeployment:
    """Get a ZenML managed Seldon Core deployment resource by name.

    Args:
        name: the name of the Seldon Core deployment resource to fetch.

    Returns:
        The Seldon Core deployment resource.

    Raises:
        SeldonDeploymentNotFoundError: if the deployment resource cannot
            be found, cannot be parsed or is not managed by ZenML.
        SeldonClientError: if an unknown error occurs while fetching
            the deployment.
    """
    try:
        logger.debug(f"Retrieving SeldonDeployment resource: {name}")
        response = self._custom_objects_api.get_namespaced_custom_object(
            group="machinelearning.seldon.io",
            version="v1",
            namespace=self._namespace,
            plural="seldondeployments",
            name=name,
        )
        logger.debug("Seldon Core API response: %s", response)
        try:
            deployment = SeldonDeployment(**response)
        except ValidationError as e:
            logger.error(
                "Invalid Seldon Core deployment resource: %s\n%s",
                str(e),
                str(response),
            )
            # chain the validation error like the other raises in this
            # method do with the API error
            raise SeldonDeploymentNotFoundError(
                f"SeldonDeployment resource {name} could not be parsed"
            ) from e
        # Only Seldon deployments managed by ZenML are returned
        if not deployment.is_managed_by_zenml():
            raise SeldonDeploymentNotFoundError(
                f"Seldon Deployment {name} is not managed by ZenML"
            )
        return deployment
    except k8s_client.rest.ApiException as e:
        if e.status == 404:
            raise SeldonDeploymentNotFoundError(
                f"SeldonDeployment resource not found: {name}"
            ) from e
        logger.error(
            "Exception when fetching SeldonDeployment resource %s: %s",
            name,
            str(e),
        )
        raise SeldonClientError(
            f"Unexpected exception when fetching SeldonDeployment "
            f"resource: {name}"
        ) from e
get_deployment_logs(self, name, follow=False, tail=None)
Get the logs of a Seldon Core deployment resource.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
name |
str |
the name of the Seldon Core deployment to get logs for. |
required |
follow |
bool |
if True, the logs will be streamed as they are written |
False |
tail |
Optional[int] |
only retrieve the last NUM lines of log output. |
None |
Returns:
Type | Description |
---|---|
Generator[str, bool, NoneType] |
A generator that can be accessed to get the service logs. |
Yields:
Type | Description |
---|---|
Generator[str, bool, NoneType] |
The next log line. |
Exceptions:
Type | Description |
---|---|
SeldonClientError |
if an unknown error occurs while fetching the logs. |
Source code in zenml/integrations/seldon/seldon_client.py
def get_deployment_logs(
    self,
    name: str,
    follow: bool = False,
    tail: Optional[int] = None,
) -> Generator[str, bool, None]:
    """Get the logs of a Seldon Core deployment resource.

    The logs are read from the first pod labelled with
    `seldon-deployment-id=<name>`. The `default` container is preferred,
    falling back to the first container of the pod, and then to the first
    init container if the selected container has not started yet.

    Args:
        name: the name of the Seldon Core deployment to get logs for.
        follow: if True, the logs will be streamed as they are written
        tail: only retrieve the last NUM lines of log output.

    Returns:
        A generator that can be accessed to get the service logs.

    Yields:
        The next log line, without its trailing newline. Blank log lines
        are yielded as empty strings.

    Raises:
        SeldonClientError: if an unknown error occurs while fetching
            the logs, or if no pods are found for the deployment.
    """
    logger.debug(f"Retrieving logs for SeldonDeployment resource: {name}")
    try:
        response = self._core_api.list_namespaced_pod(
            namespace=self._namespace,
            label_selector=f"seldon-deployment-id={name}",
        )
        logger.debug("Kubernetes API response: %s", response)
        pods = response.items
        if not pods:
            raise SeldonClientError(
                f"The Seldon Core deployment {name} is not currently "
                f"running: no Kubernetes pods associated with it were found"
            )
        pod = pods[0]
        pod_name = pod.metadata.name
        containers = [c.name for c in pod.spec.containers]
        # optional list attributes may be None (e.g. a pod without init
        # containers, or a pod that has not reported any status yet)
        init_containers = [c.name for c in pod.spec.init_containers or []]
        container_statuses = {
            c.name: c.started or c.restart_count
            for c in pod.status.container_statuses or []
        }
        container = "default"
        if container not in containers:
            container = containers[0]
        # some containers might not be running yet and have no logs to show,
        # so fall back to an init container when one is available
        if not container_statuses.get(container) and init_containers:
            container = init_containers[0]
        logger.info(
            f"Retrieving logs for pod: `{pod_name}` and container "
            f"`{container}` in namespace `{self._namespace}`"
        )
        response = self._core_api.read_namespaced_pod_log(
            name=pod_name,
            namespace=self._namespace,
            container=container,
            follow=follow,
            tail_lines=tail,
            _preload_content=False,
        )
    except k8s_client.rest.ApiException as e:
        logger.error(
            "Exception when fetching logs for SeldonDeployment resource "
            "%s: %s",
            name,
            str(e),
        )
        raise SeldonClientError(
            f"Unexpected exception when fetching logs for SeldonDeployment "
            f"resource: {name}"
        ) from e
    try:
        while True:
            raw_line = response.readline()
            # only an empty read marks the end of the stream; a blank log
            # line still carries its newline and must not stop the loop
            if not raw_line:
                return
            stop = yield raw_line.decode("utf-8").rstrip("\n")
            if stop:
                return
    finally:
        response.release_conn()
sanitize_labels(labels)
staticmethod
Update the label values to be valid Kubernetes labels.
See: https://kubernetes.io/docs/concepts/overview/working-with-objects/labels/#syntax-and-character-set
Parameters:
Name | Type | Description | Default |
---|---|---|---|
labels |
Dict[str, str] |
the labels to sanitize. |
required |
Source code in zenml/integrations/seldon/seldon_client.py
@staticmethod
def sanitize_labels(labels: Dict[str, str]) -> None:
"""Update the label values to be valid Kubernetes labels.
See:
https://kubernetes.io/docs/concepts/overview/working-with-objects/labels/#syntax-and-character-set
Args:
labels: the labels to sanitize.
"""
for key, value in labels.items():
# Kubernetes labels must be alphanumeric, no longer than
# 63 characters, and must begin and end with an alphanumeric
# character ([a-z0-9A-Z])
labels[key] = re.sub(r"[^0-9a-zA-Z-_\.]+", "_", value)[:63].strip(
"-_."
)
update_deployment(self, deployment, poll_timeout=0)
Update a Seldon Core deployment resource.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
deployment |
SeldonDeployment |
the Seldon Core deployment resource to update |
required |
poll_timeout |
int |
the maximum time to wait for the deployment to become available or to fail. If set to 0, the function will return immediately without checking the deployment status. If a timeout occurs and the deployment is still pending creation, it will be returned anyway and no exception will be raised. |
0 |
Returns:
Type | Description |
---|---|
SeldonDeployment |
the updated Seldon Core deployment resource with updated status. |
Exceptions:
Type | Description |
---|---|
SeldonClientError |
if an unknown error occurs while updating the deployment. |
Source code in zenml/integrations/seldon/seldon_client.py
def update_deployment(
    self,
    deployment: SeldonDeployment,
    poll_timeout: int = 0,
) -> SeldonDeployment:
    """Update a Seldon Core deployment resource.

    Args:
        deployment: the Seldon Core deployment resource to update
        poll_timeout: the maximum time to wait for the deployment to become
            available or to fail. If set to 0, the function will return
            immediately without checking the deployment status. If a timeout
            occurs and the deployment is still pending creation, it will
            be returned anyway and no exception will be raised.

    Returns:
        the updated Seldon Core deployment resource with updated status.

    Raises:
        SeldonClientError: if an unknown error occurs while updating the
            deployment.
    """
    try:
        logger.debug(
            f"Updating SeldonDeployment resource: {deployment.name}"
        )
        # mark the deployment as managed by ZenML, to differentiate
        # between deployments that are created by ZenML and those that
        # are not
        deployment.mark_as_managed_by_zenml()
        # call `get_deployment` to check that the deployment exists
        # and is managed by ZenML. It will raise
        # a SeldonDeploymentNotFoundError otherwise
        self.get_deployment(name=deployment.name)
        response = self._custom_objects_api.patch_namespaced_custom_object(
            group="machinelearning.seldon.io",
            version="v1",
            namespace=self._namespace,
            plural="seldondeployments",
            name=deployment.name,
            body=deployment.dict(exclude_none=True),
            _request_timeout=poll_timeout or None,
        )
        logger.debug("Seldon Core API response: %s", response)
    except k8s_client.rest.ApiException as e:
        logger.error(
            "Exception when updating SeldonDeployment resource: %s", str(e)
        )
        # report the operation that actually failed (update, not create)
        raise SeldonClientError(
            "Exception when updating SeldonDeployment resource"
        ) from e
    updated_deployment = self.get_deployment(name=deployment.name)
    while poll_timeout > 0 and updated_deployment.is_pending():
        # never sleep longer than the time that is left, so the caller's
        # timeout is not exceeded by the polling interval
        time.sleep(min(5, poll_timeout))
        poll_timeout -= 5
        updated_deployment = self.get_deployment(name=deployment.name)
    return updated_deployment
SeldonClientError (Exception)
Base exception class for all exceptions raised by the SeldonClient.
Source code in zenml/integrations/seldon/seldon_client.py
class SeldonClientError(Exception):
    """Common base class of every exception raised by the SeldonClient."""
SeldonClientTimeout (SeldonClientError)
Raised when the Seldon client timed out while waiting for a resource to reach the expected status.
Source code in zenml/integrations/seldon/seldon_client.py
class SeldonClientTimeout(SeldonClientError):
    """Timeout error of the Seldon client.

    Raised when the client timed out while waiting for a resource to reach
    the expected status.
    """
SeldonDeployment (BaseModel)
pydantic-model
A Seldon Core deployment CRD.
This is a Pydantic representation of some of the fields in the Seldon Core CRD (documented here: https://docs.seldon.io/projects/seldon-core/en/latest/reference/seldon-deployment.html).
Note that not all fields are represented, only those that are relevant to the ZenML integration. The fields that are not represented are silently ignored when the Seldon Deployment is created or updated from an external SeldonDeployment CRD representation.
Attributes:
Name | Type | Description |
---|---|---|
kind |
str |
Kubernetes kind field. |
apiVersion |
str |
Kubernetes apiVersion field. |
metadata |
SeldonDeploymentMetadata |
Kubernetes metadata field. |
spec |
SeldonDeploymentSpec |
Seldon Deployment spec entry. |
status |
Optional[zenml.integrations.seldon.seldon_client.SeldonDeploymentStatus] |
Seldon Deployment status. |
Source code in zenml/integrations/seldon/seldon_client.py
class SeldonDeployment(BaseModel):
"""A Seldon Core deployment CRD.
This is a Pydantic representation of some of the fields in the Seldon Core
CRD (documented here:
https://docs.seldon.io/projects/seldon-core/en/latest/reference/seldon-deployment.html).
Note that not all fields are represented, only those that are relevant to
the ZenML integration. The fields that are not represented are silently
ignored when the Seldon Deployment is created or updated from an external
SeldonDeployment CRD representation.
Attributes:
kind: Kubernetes kind field.
apiVersion: Kubernetes apiVersion field.
metadata: Kubernetes metadata field.
spec: Seldon Deployment spec entry.
status: Seldon Deployment status.
"""
kind: str = Field(SELDON_DEPLOYMENT_KIND, const=True)
apiVersion: str = Field(SELDON_DEPLOYMENT_API_VERSION, const=True)
metadata: SeldonDeploymentMetadata = Field(
default_factory=SeldonDeploymentMetadata
)
spec: SeldonDeploymentSpec = Field(default_factory=SeldonDeploymentSpec)
status: Optional[SeldonDeploymentStatus]
def __str__(self) -> str:
"""Returns a string representation of the Seldon Deployment.
Returns:
A string representation of the Seldon Deployment.
"""
return json.dumps(self.dict(exclude_none=True), indent=4)
@classmethod
def build(
cls,
name: Optional[str] = None,
model_uri: Optional[str] = None,
model_name: Optional[str] = None,
implementation: Optional[str] = None,
secret_name: Optional[str] = None,
labels: Optional[Dict[str, str]] = None,
annotations: Optional[Dict[str, str]] = None,
is_custom_deployment: Optional[bool] = False,
spec: Optional[Dict[Any, Any]] = None,
) -> "SeldonDeployment":
"""Build a basic Seldon Deployment object.
Args:
name: The name of the Seldon Deployment. If not explicitly passed,
a unique name is autogenerated.
model_uri: The URI of the model.
model_name: The name of the model.
implementation: The implementation of the model.
secret_name: The name of the Kubernetes secret containing
environment variable values (e.g. with credentials for the
artifact store) to use with the deployment service.
labels: A dictionary of labels to apply to the Seldon Deployment.
annotations: A dictionary of annotations to apply to the Seldon
Deployment.
spec: A Kubernetes pod spec to use for the Seldon Deployment.
is_custom_deployment: Whether the Seldon Deployment is a custom or a built-in one.
Returns:
A minimal SeldonDeployment object built from the provided
parameters.
"""
if not name:
name = f"zenml-{time.time()}"
if labels is None:
labels = {}
if annotations is None:
annotations = {}
if is_custom_deployment:
predictors = [
SeldonDeploymentPredictor(
name=model_name or "",
graph=SeldonDeploymentPredictiveUnit(
name="classifier",
type=SeldonDeploymentPredictiveUnitType.MODEL,
),
componentSpecs=[
SeldonDeploymentComponentSpecs(
spec=spec
# TODO [HIGH]: Add support for other component types (e.g. graph)
)
],
)
]
else:
predictors = [
SeldonDeploymentPredictor(
name=model_name or "",
graph=SeldonDeploymentPredictiveUnit(
name="classifier",
type=SeldonDeploymentPredictiveUnitType.MODEL,
modelUri=model_uri or "",
implementation=implementation or "",
envSecretRefName=secret_name,
),
)
]
return SeldonDeployment(
metadata=SeldonDeploymentMetadata(
name=name, labels=labels, annotations=annotations
),
spec=SeldonDeploymentSpec(name=name, predictors=predictors),
)
def is_managed_by_zenml(self) -> bool:
"""Checks if this Seldon Deployment is managed by ZenML.
The convention used to differentiate between SeldonDeployment instances
that are managed by ZenML and those that are not is to set the `app`
label value to `zenml`.
Returns:
True if the Seldon Deployment is managed by ZenML, False
otherwise.
"""
return self.metadata.labels.get("app") == "zenml"
def mark_as_managed_by_zenml(self) -> None:
"""Marks this Seldon Deployment as managed by ZenML.
The convention used to differentiate between SeldonDeployment instances
that are managed by ZenML and those that are not is to set the `app`
label value to `zenml`.
"""
self.metadata.labels["app"] = "zenml"
@property
def name(self) -> str:
"""Returns the name of this Seldon Deployment.
This is just a shortcut for `self.metadata.name`.
Returns:
The name of this Seldon Deployment.
"""
return self.metadata.name
@property
def state(self) -> SeldonDeploymentStatusState:
    """The state of the Seldon Deployment.

    Returns:
        The state recorded in the deployment status, or
        `SeldonDeploymentStatusState.UNKNOWN` when no status is set.
    """
    status = self.status
    if status:
        return status.state
    return SeldonDeploymentStatusState.UNKNOWN
def is_pending(self) -> bool:
    """Tell whether the Seldon Deployment is in a pending state.

    Returns:
        True if the deployment state is `CREATING`, False otherwise.
    """
    current_state = self.state
    return current_state == SeldonDeploymentStatusState.CREATING
def is_available(self) -> bool:
    """Tell whether the Seldon Deployment is in an available state.

    Returns:
        True if the deployment state is `AVAILABLE`, False otherwise.
    """
    current_state = self.state
    return current_state == SeldonDeploymentStatusState.AVAILABLE
def is_failed(self) -> bool:
    """Tell whether the Seldon Deployment is in a failed state.

    Returns:
        True if the deployment state is `FAILED`, False otherwise.
    """
    current_state = self.state
    return current_state == SeldonDeploymentStatusState.FAILED
def get_error(self) -> Optional[str]:
    """Describe the error of a failed Seldon Deployment.

    Returns:
        The status description if a status is present and the deployment
        is in a failed state, otherwise None.
    """
    # Guard clause: without a status, or when not failed, there is no error.
    if not self.status or not self.is_failed():
        return None
    return self.status.description
def get_pending_message(self) -> Optional[str]:
    """Describe the pending condition of the Seldon Deployment.

    Returns:
        The message of the first `Ready` condition whose status is falsy,
        or None if there is no status, no conditions, or no such
        condition.
    """
    status = self.status
    if not status or not status.conditions:
        return None
    for condition in status.conditions:
        if condition.type == "Ready" and not condition.status:
            return condition.message
    return None
class Config:
    """Pydantic model configuration."""

    # Attributes present in the CRD but not declared on the model are dropped
    extra = "ignore"
    # Values assigned to attributes after creation are validated as well
    validate_assignment = True
name: str
property
readonly
Returns the name of this Seldon Deployment.
This is just a shortcut for `self.metadata.name`.
Returns:
Type | Description |
---|---|
str |
The name of this Seldon Deployment. |
state: SeldonDeploymentStatusState
property
readonly
The state of the Seldon Deployment.
Returns:
Type | Description |
---|---|
SeldonDeploymentStatusState |
The state of the Seldon Deployment. |
Config
Pydantic configuration class.
Source code in zenml/integrations/seldon/seldon_client.py
class Config:
    """Pydantic model configuration."""

    # Attributes present in the CRD but not declared on the model are dropped
    extra = "ignore"
    # Values assigned to attributes after creation are validated as well
    validate_assignment = True
__str__(self)
special
Returns a string representation of the Seldon Deployment.
Returns:
Type | Description |
---|---|
str |
A string representation of the Seldon Deployment. |
Source code in zenml/integrations/seldon/seldon_client.py
def __str__(self) -> str:
"""Returns a string representation of the Seldon Deployment.
Returns:
A string representation of the Seldon Deployment.
"""
return json.dumps(self.dict(exclude_none=True), indent=4)
build(name=None, model_uri=None, model_name=None, implementation=None, secret_name=None, labels=None, annotations=None, is_custom_deployment=False, spec=None)
classmethod
Build a basic Seldon Deployment object.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
name |
Optional[str] |
The name of the Seldon Deployment. If not explicitly passed, a unique name is autogenerated. |
None |
model_uri |
Optional[str] |
The URI of the model. |
None |
model_name |
Optional[str] |
The name of the model. |
None |
implementation |
Optional[str] |
The implementation of the model. |
None |
secret_name |
Optional[str] |
The name of the Kubernetes secret containing environment variable values (e.g. with credentials for the artifact store) to use with the deployment service. |
None |
labels |
Optional[Dict[str, str]] |
A dictionary of labels to apply to the Seldon Deployment. |
None |
annotations |
Optional[Dict[str, str]] |
A dictionary of annotations to apply to the Seldon Deployment. |
None |
spec |
Optional[Dict[Any, Any]] |
A Kubernetes pod spec to use for the Seldon Deployment. |
None |
is_custom_deployment |
Optional[bool] |
Whether the Seldon Deployment is a custom or a built-in one. |
False |
Returns:
Type | Description |
---|---|
SeldonDeployment |
A minimal SeldonDeployment object built from the provided parameters. |
Source code in zenml/integrations/seldon/seldon_client.py
@classmethod
def build(
    cls,
    name: Optional[str] = None,
    model_uri: Optional[str] = None,
    model_name: Optional[str] = None,
    implementation: Optional[str] = None,
    secret_name: Optional[str] = None,
    labels: Optional[Dict[str, str]] = None,
    annotations: Optional[Dict[str, str]] = None,
    is_custom_deployment: Optional[bool] = False,
    spec: Optional[Dict[Any, Any]] = None,
) -> "SeldonDeployment":
    """Build a basic Seldon Deployment object.

    The deployment always contains a single predictor whose graph is a
    `MODEL` predictive unit named `classifier`. For a custom deployment the
    pod spec is attached through `componentSpecs`; otherwise the model URI,
    implementation and secret are set on the predictive unit itself.

    Args:
        name: The name of the Seldon Deployment. If empty or not passed, a
            name is generated from the current timestamp.
        model_uri: The URI of the model.
        model_name: The name of the model, used as the predictor name.
        implementation: The implementation of the model.
        secret_name: The name of the Kubernetes secret containing
            environment variable values (e.g. with credentials for the
            artifact store) to use with the deployment service. Only used
            for built-in (non-custom) deployments.
        labels: A dictionary of labels to apply to the Seldon Deployment.
        annotations: A dictionary of annotations to apply to the Seldon
            Deployment.
        is_custom_deployment: Whether the Seldon Deployment is a custom or
            a built-in one.
        spec: A Kubernetes pod spec to use for the Seldon Deployment. Only
            used for custom deployments.

    Returns:
        A minimal SeldonDeployment object built from the provided
        parameters.
    """
    deployment_name = name or f"zenml-{time.time()}"
    predictor_name = model_name or ""

    if is_custom_deployment:
        predictor = SeldonDeploymentPredictor(
            name=predictor_name,
            graph=SeldonDeploymentPredictiveUnit(
                name="classifier",
                type=SeldonDeploymentPredictiveUnitType.MODEL,
            ),
            # TODO [HIGH]: Add support for other component types (e.g. graph)
            componentSpecs=[SeldonDeploymentComponentSpecs(spec=spec)],
        )
    else:
        predictor = SeldonDeploymentPredictor(
            name=predictor_name,
            graph=SeldonDeploymentPredictiveUnit(
                name="classifier",
                type=SeldonDeploymentPredictiveUnitType.MODEL,
                modelUri=model_uri or "",
                implementation=implementation or "",
                envSecretRefName=secret_name,
            ),
        )

    metadata = SeldonDeploymentMetadata(
        name=deployment_name,
        labels={} if labels is None else labels,
        annotations={} if annotations is None else annotations,
    )
    return SeldonDeployment(
        metadata=metadata,
        spec=SeldonDeploymentSpec(
            name=deployment_name, predictors=[predictor]
        ),
    )
get_error(self)
Get a message describing the error, if in an error state.
Returns:
Type | Description |
---|---|
Optional[str] |
A message describing the error, if in an error state, otherwise None. |
Source code in zenml/integrations/seldon/seldon_client.py
def get_error(self) -> Optional[str]:
    """Describe the error of a failed Seldon Deployment.

    Returns:
        The status description if a status is present and the deployment
        is in a failed state, otherwise None.
    """
    # Guard clause: without a status, or when not failed, there is no error.
    if not self.status or not self.is_failed():
        return None
    return self.status.description
get_pending_message(self)
Get a message describing the pending conditions of the Seldon Deployment.
Returns:
Type | Description |
---|---|
Optional[str] |
A message describing the pending condition of the Seldon Deployment, or None, if no conditions are pending. |
Source code in zenml/integrations/seldon/seldon_client.py
def get_pending_message(self) -> Optional[str]:
    """Describe the pending condition of the Seldon Deployment.

    Returns:
        The message of the first `Ready` condition whose status is falsy,
        or None if there is no status, no conditions, or no such
        condition.
    """
    status = self.status
    if not status or not status.conditions:
        return None
    for condition in status.conditions:
        if condition.type == "Ready" and not condition.status:
            return condition.message
    return None
is_available(self)
Checks if the Seldon Deployment is in an available state.
Returns:
Type | Description |
---|---|
bool |
True if the Seldon Deployment is available, False otherwise. |
Source code in zenml/integrations/seldon/seldon_client.py
def is_available(self) -> bool:
    """Tell whether the Seldon Deployment is in an available state.

    Returns:
        True if the deployment state is `AVAILABLE`, False otherwise.
    """
    current_state = self.state
    return current_state == SeldonDeploymentStatusState.AVAILABLE
is_failed(self)
Checks if the Seldon Deployment is in a failed state.
Returns:
Type | Description |
---|---|
bool |
True if the Seldon Deployment is failed, False otherwise. |
Source code in zenml/integrations/seldon/seldon_client.py
def is_failed(self) -> bool:
    """Tell whether the Seldon Deployment is in a failed state.

    Returns:
        True if the deployment state is `FAILED`, False otherwise.
    """
    current_state = self.state
    return current_state == SeldonDeploymentStatusState.FAILED
is_managed_by_zenml(self)
Checks if this Seldon Deployment is managed by ZenML.
The convention used to differentiate between SeldonDeployment instances
that are managed by ZenML and those that are not is to set the `app` label value to `zenml`.
Returns:
Type | Description |
---|---|
bool |
True if the Seldon Deployment is managed by ZenML, False otherwise. |
Source code in zenml/integrations/seldon/seldon_client.py
def is_managed_by_zenml(self) -> bool:
    """Tell whether this Seldon Deployment is managed by ZenML.

    By convention, a SeldonDeployment is considered managed by ZenML when
    its `app` label is set to `zenml`.

    Returns:
        True if the `app` label equals `zenml`, False otherwise.
    """
    app_label = self.metadata.labels.get("app")
    return app_label == "zenml"
is_pending(self)
Checks if the Seldon Deployment is in a pending state.
Returns:
Type | Description |
---|---|
bool |
True if the Seldon Deployment is pending, False otherwise. |
Source code in zenml/integrations/seldon/seldon_client.py
def is_pending(self) -> bool:
    """Tell whether the Seldon Deployment is in a pending state.

    Returns:
        True if the deployment state is `CREATING`, False otherwise.
    """
    current_state = self.state
    return current_state == SeldonDeploymentStatusState.CREATING
mark_as_managed_by_zenml(self)
Marks this Seldon Deployment as managed by ZenML.
The convention used to differentiate between SeldonDeployment instances
that are managed by ZenML and those that are not is to set the `app` label value to `zenml`.
Source code in zenml/integrations/seldon/seldon_client.py
def mark_as_managed_by_zenml(self) -> None:
    """Flag this Seldon Deployment as managed by ZenML.

    By convention, a SeldonDeployment is considered managed by ZenML when
    its `app` label is set to `zenml`; this sets that label in place.
    """
    deployment_labels = self.metadata.labels
    deployment_labels["app"] = "zenml"
SeldonDeploymentComponentSpecs (BaseModel)
pydantic-model
Component specs for a Seldon Deployment.
Attributes:
Name | Type | Description |
---|---|---|
spec |
Optional[Dict[str, Any]] |
the component spec. |
Source code in zenml/integrations/seldon/seldon_client.py
class SeldonDeploymentComponentSpecs(BaseModel):
    """Component specs for a Seldon Deployment.

    Attributes:
        spec: the component spec, as a plain dictionary.
    """

    # Explicit None default; pydantic v1 already applies it implicitly to
    # Optional fields declared without a default.
    spec: Optional[Dict[str, Any]] = None
    # TODO [HIGH]: Add graph field to ComponentSpecs. graph: Optional[SeldonDeploymentPredictiveUnit]

    class Config:
        """Pydantic model configuration."""

        # Values assigned to attributes after creation are validated as well
        validate_assignment = True
        # Attributes present in the CRD but not declared here are dropped
        extra = "ignore"
Config
Pydantic configuration class.
Source code in zenml/integrations/seldon/seldon_client.py
class Config:
    """Pydantic model configuration."""

    # Attributes present in the CRD but not declared on the model are dropped
    extra = "ignore"
    # Values assigned to attributes after creation are validated as well
    validate_assignment = True
SeldonDeploymentExistsError (SeldonClientError)
Raised when a SeldonDeployment resource cannot be created because a resource with the same name already exists.
Source code in zenml/integrations/seldon/seldon_client.py
class SeldonDeploymentExistsError(SeldonClientError):
    """Raised when a SeldonDeployment resource cannot be created.

    This error signals that a resource with the same name already exists.
    """
SeldonDeploymentMetadata (BaseModel)
pydantic-model
Metadata for a Seldon Deployment.
Attributes:
Name | Type | Description |
---|---|---|
name |
str |
the name of the Seldon Deployment. |
labels |
Dict[str, str] |
Kubernetes labels for the Seldon Deployment. |
annotations |
Dict[str, str] |
Kubernetes annotations for the Seldon Deployment. |
creationTimestamp |
Optional[str] |
the creation timestamp of the Seldon Deployment. |
Source code in zenml/integrations/seldon/seldon_client.py
class SeldonDeploymentMetadata(BaseModel):
    """Metadata for a Seldon Deployment.

    Attributes:
        name: the name of the Seldon Deployment.
        labels: Kubernetes labels for the Seldon Deployment. Defaults to an
            empty dictionary.
        annotations: Kubernetes annotations for the Seldon Deployment.
            Defaults to an empty dictionary.
        creationTimestamp: the creation timestamp of the Seldon Deployment.
    """

    name: str
    labels: Dict[str, str] = Field(default_factory=dict)
    annotations: Dict[str, str] = Field(default_factory=dict)
    # Explicit None default; pydantic v1 already applies it implicitly to
    # Optional fields declared without a default.
    creationTimestamp: Optional[str] = None

    class Config:
        """Pydantic model configuration."""

        # Values assigned to attributes after creation are validated as well
        validate_assignment = True
        # Attributes present in the CRD but not declared here are dropped
        extra = "ignore"
Config
Pydantic configuration class.
Source code in zenml/integrations/seldon/seldon_client.py
class Config:
    """Pydantic model configuration."""

    # Attributes present in the CRD but not declared on the model are dropped
    extra = "ignore"
    # Values assigned to attributes after creation are validated as well
    validate_assignment = True
SeldonDeploymentNotFoundError (SeldonClientError)
Raised when a particular SeldonDeployment resource is not found or is not managed by ZenML.
Source code in zenml/integrations/seldon/seldon_client.py
class SeldonDeploymentNotFoundError(SeldonClientError):
    """Raised when a SeldonDeployment resource cannot be used.

    This error signals that the resource is not found or is not managed by
    ZenML.
    """
SeldonDeploymentPredictiveUnit (BaseModel)
pydantic-model
Seldon Deployment predictive unit.
Attributes:
Name | Type | Description |
---|---|---|
name |
str |
the name of the predictive unit. |
type |
Optional[zenml.integrations.seldon.seldon_client.SeldonDeploymentPredictiveUnitType] |
predictive unit type. |
implementation |
Optional[str] |
the Seldon Core implementation used to serve the model. |
modelUri |
Optional[str] |
URI of the model (or models) to serve. |
serviceAccountName |
Optional[str] |
the name of the service account to associate with the predictive unit container. |
envSecretRefName |
Optional[str] |
the name of a Kubernetes secret that contains environment variables (e.g. credentials) to be configured for the predictive unit container. |
children |
List[zenml.integrations.seldon.seldon_client.SeldonDeploymentPredictiveUnit] |
a list of child predictive units that together make up the model serving graph. |
Source code in zenml/integrations/seldon/seldon_client.py
class SeldonDeploymentPredictiveUnit(BaseModel):
    """Seldon Deployment predictive unit.

    Attributes:
        name: the name of the predictive unit.
        type: predictive unit type. Defaults to `MODEL`.
        implementation: the Seldon Core implementation used to serve the
            model.
        modelUri: URI of the model (or models) to serve.
        serviceAccountName: the name of the service account to associate
            with the predictive unit container.
        envSecretRefName: the name of a Kubernetes secret that contains
            environment variables (e.g. credentials) to be configured for
            the predictive unit container.
        children: a list of child predictive units that together make up
            the model serving graph. Defaults to an empty list.
    """

    name: str
    type: Optional[
        SeldonDeploymentPredictiveUnitType
    ] = SeldonDeploymentPredictiveUnitType.MODEL
    # Explicit None defaults; pydantic v1 already applies them implicitly to
    # Optional fields declared without a default.
    implementation: Optional[str] = None
    modelUri: Optional[str] = None
    serviceAccountName: Optional[str] = None
    envSecretRefName: Optional[str] = None
    # Forward reference: a predictive unit may nest further predictive units
    children: List["SeldonDeploymentPredictiveUnit"] = Field(
        default_factory=list
    )

    class Config:
        """Pydantic model configuration."""

        # Values assigned to attributes after creation are validated as well
        validate_assignment = True
        # Attributes present in the CRD but not declared here are dropped
        extra = "ignore"
Config
Pydantic configuration class.
Source code in zenml/integrations/seldon/seldon_client.py
class Config:
    """Pydantic model configuration."""

    # Attributes present in the CRD but not declared on the model are dropped
    extra = "ignore"
    # Values assigned to attributes after creation are validated as well
    validate_assignment = True
SeldonDeploymentPredictiveUnitType (StrEnum)
Predictive unit types for a Seldon Deployment.
Source code in zenml/integrations/seldon/seldon_client.py
class SeldonDeploymentPredictiveUnitType(StrEnum):
"""Predictive unit types for a Seldon Deployment."""
UNKNOWN_TYPE = "UNKNOWN_TYPE"
ROUTER = "ROUTER"
COMBINER = "COMBINER"
MODEL = "MODEL"
TRANSFORMER = "TRANSFORMER"
OUTPUT_TRANSFORMER = "OUTPUT_TRANSFORMER"
SeldonDeploymentPredictor (BaseModel)
pydantic-model
Seldon Deployment predictor.
Attributes:
Name | Type | Description |
---|---|---|
name |
str |
the name of the predictor. |
replicas |
int |
the number of pod replicas for the predictor. |
graph |
Optional[zenml.integrations.seldon.seldon_client.SeldonDeploymentPredictiveUnit] |
the serving graph composed of one or more predictive units. |
Source code in zenml/integrations/seldon/seldon_client.py
class SeldonDeploymentPredictor(BaseModel):
    """Seldon Deployment predictor.

    Attributes:
        name: the name of the predictor.
        replicas: the number of pod replicas for the predictor.
        graph: the serving graph composed of one or more predictive units.
        componentSpecs: optional list of component specs (e.g. a custom
            pod spec) attached to the predictor.
    """

    name: str
    replicas: int = 1
    # A `default_factory` of `SeldonDeploymentPredictiveUnit` cannot work
    # here: that model has a required `name` field, so calling it without
    # arguments raises a validation error whenever `graph` is omitted. The
    # field is Optional, so default to None instead.
    graph: Optional[SeldonDeploymentPredictiveUnit] = None
    componentSpecs: Optional[List[SeldonDeploymentComponentSpecs]] = None

    class Config:
        """Pydantic configuration class."""

        # validate attribute assignments
        validate_assignment = True
        # Ignore extra attributes from the CRD that are not reflected here
        extra = "ignore"
Config
Pydantic configuration class.
Source code in zenml/integrations/seldon/seldon_client.py
class Config:
    """Pydantic model configuration."""

    # Attributes present in the CRD but not declared on the model are dropped
    extra = "ignore"
    # Values assigned to attributes after creation are validated as well
    validate_assignment = True
SeldonDeploymentSpec (BaseModel)
pydantic-model
Spec for a Seldon Deployment.
Attributes:
Name | Type | Description |
---|---|---|
name |
str |
the name of the Seldon Deployment. |
protocol |
Optional[str] |
the API protocol used for the Seldon Deployment. |
predictors |
List[zenml.integrations.seldon.seldon_client.SeldonDeploymentPredictor] |
a list of predictors that make up the serving graph. |
replicas |
int |
the default number of pod replicas used for the predictors. |
Source code in zenml/integrations/seldon/seldon_client.py
class SeldonDeploymentSpec(BaseModel):
    """Spec for a Seldon Deployment.

    Attributes:
        name: the name of the Seldon Deployment.
        protocol: the API protocol used for the Seldon Deployment.
        predictors: a list of predictors that make up the serving graph.
        replicas: the default number of pod replicas used for the
            predictors. Defaults to 1.
    """

    name: str
    # Explicit None default; pydantic v1 already applies it implicitly to
    # Optional fields declared without a default.
    protocol: Optional[str] = None
    predictors: List[SeldonDeploymentPredictor]
    replicas: int = 1

    class Config:
        """Pydantic model configuration."""

        # Values assigned to attributes after creation are validated as well
        validate_assignment = True
        # Attributes present in the CRD but not declared here are dropped
        extra = "ignore"
Config
Pydantic configuration class.
Source code in zenml/integrations/seldon/seldon_client.py
class Config:
    """Pydantic model configuration."""

    # Attributes present in the CRD but not declared on the model are dropped
    extra = "ignore"
    # Values assigned to attributes after creation are validated as well
    validate_assignment = True
SeldonDeploymentStatus (BaseModel)
pydantic-model
The status of a Seldon Deployment.
Attributes:
Name | Type | Description |
---|---|---|
state |
SeldonDeploymentStatusState |
the current state of the Seldon Deployment. |
description |
Optional[str] |
a human-readable description of the current state. |
replicas |
Optional[int] |
the current number of running pod replicas. |
address |
Optional[zenml.integrations.seldon.seldon_client.SeldonDeploymentStatusAddress] |
the address where the Seldon Deployment API can be accessed. |
conditions |
List[zenml.integrations.seldon.seldon_client.SeldonDeploymentStatusCondition] |
the list of Kubernetes conditions for the Seldon Deployment. |
Source code in zenml/integrations/seldon/seldon_client.py
class SeldonDeploymentStatus(BaseModel):
    """The status of a Seldon Deployment.

    Attributes:
        state: the current state of the Seldon Deployment. Defaults to
            `UNKNOWN`.
        description: a human-readable description of the current state.
        replicas: the current number of running pod replicas.
        address: the address where the Seldon Deployment API can be
            accessed.
        conditions: the list of Kubernetes conditions for the Seldon
            Deployment. This field is required.
    """

    state: SeldonDeploymentStatusState = SeldonDeploymentStatusState.UNKNOWN
    # Explicit None defaults; pydantic v1 already applies them implicitly to
    # Optional fields declared without a default.
    description: Optional[str] = None
    replicas: Optional[int] = None
    address: Optional[SeldonDeploymentStatusAddress] = None
    conditions: List[SeldonDeploymentStatusCondition]

    class Config:
        """Pydantic model configuration."""

        # Values assigned to attributes after creation are validated as well
        validate_assignment = True
        # Attributes present in the CRD but not declared here are dropped
        extra = "ignore"
Config
Pydantic configuration class.
Source code in zenml/integrations/seldon/seldon_client.py
class Config:
    """Pydantic model configuration."""

    # Attributes present in the CRD but not declared on the model are dropped
    extra = "ignore"
    # Values assigned to attributes after creation are validated as well
    validate_assignment = True
SeldonDeploymentStatusAddress (BaseModel)
pydantic-model
The status address for a Seldon Deployment.
Attributes:
Name | Type | Description |
---|---|---|
url |
str |
the URL where the Seldon Deployment API can be accessed internally. |
Source code in zenml/integrations/seldon/seldon_client.py
class SeldonDeploymentStatusAddress(BaseModel):
    """The status address for a Seldon Deployment.

    Attributes:
        url: the URL where the Seldon Deployment API can be accessed
            internally.
    """

    url: str
SeldonDeploymentStatusCondition (BaseModel)
pydantic-model
The Kubernetes status condition entry for a Seldon Deployment.
Attributes:
Name | Type | Description |
---|---|---|
type |
str |
Type of runtime condition. |
status |
bool |
Status of the condition. |
reason |
Optional[str] |
Brief CamelCase string containing reason for the condition's last transition. |
message |
Optional[str] |
Human-readable message indicating details about last transition. |
Source code in zenml/integrations/seldon/seldon_client.py
class SeldonDeploymentStatusCondition(BaseModel):
    """The Kubernetes status condition entry for a Seldon Deployment.

    Attributes:
        type: Type of runtime condition.
        status: Status of the condition.
        reason: Brief CamelCase string containing reason for the
            condition's last transition.
        message: Human-readable message indicating details about last
            transition.
    """

    type: str
    # NOTE(review): Kubernetes conditions may also report "Unknown" as a
    # status; confirm that value never reaches this boolean field.
    status: bool
    # Explicit None defaults; pydantic v1 already applies them implicitly to
    # Optional fields declared without a default.
    reason: Optional[str] = None
    message: Optional[str] = None
SeldonDeploymentStatusState (StrEnum)
Possible state values for a Seldon Deployment.
Source code in zenml/integrations/seldon/seldon_client.py
class SeldonDeploymentStatusState(StrEnum):
"""Possible state values for a Seldon Deployment."""
UNKNOWN = "Unknown"
AVAILABLE = "Available"
CREATING = "Creating"
FAILED = "Failed"
create_seldon_core_custom_spec(model_uri, custom_docker_image, secret_name, command, container_registry_secret_name=None)
Create a custom pod spec for the seldon core container.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
model_uri |
Optional[str] |
The URI of the model to load. |
required |
custom_docker_image |
Optional[str] |
The docker image to use. |
required |
secret_name |
Optional[str] |
The name of the secret to use. |
required |
command |
Optional[List[str]] |
The command to run in the container. |
required |
container_registry_secret_name |
Optional[str] |
The name of the secret to use for docker image pull. |
None |
Returns:
Type | Description |
---|---|
V1PodSpec |
A pod spec for the seldon core container. |
Source code in zenml/integrations/seldon/seldon_client.py
def create_seldon_core_custom_spec(
    model_uri: Optional[str],
    custom_docker_image: Optional[str],
    secret_name: Optional[str],
    command: Optional[List[str]],
    container_registry_secret_name: Optional[str] = None,
) -> k8s_client.V1PodSpec:
    """Create a custom pod spec for the seldon core container.

    The pod consists of an init container that receives `model_uri` and
    `/mnt/models` as arguments and mounts a shared `emptyDir` volume at
    `/mnt/models`, plus a `classifier` container running the custom docker
    image with the same volume mounted read-only and ports 5000 and 9000
    declared.

    Args:
        model_uri: The URI of the model to load.
        custom_docker_image: The docker image to use.
        secret_name: The name of the secret to use. When empty, no secret
            is attached to the init container environment.
        command: The command to run in the container.
        container_registry_secret_name: The name of the secret to use for
            docker image pull. When empty, no image pull secret is set.

    Returns:
        The pod spec for the seldon core container, passed through
        `api.sanitize_for_serialization`.
        NOTE(review): the serialized form is presumably a plain dictionary
        rather than a `V1PodSpec` instance; confirm and adjust the return
        annotation if so.
    """
    volume_name = "classifier-provision-location"
    mount_path = "/mnt/models"

    volume = k8s_client.V1Volume(
        name=volume_name,
        empty_dir={},
    )

    # Only reference a secret when one was actually provided; a secret
    # environment source without a name is not a usable reference.
    env_from = None
    if secret_name:
        env_from = [
            k8s_client.V1EnvFromSource(
                secret_ref=k8s_client.V1SecretEnvSource(
                    name=secret_name, optional=False
                )
            )
        ]

    init_container = k8s_client.V1Container(
        name="classifier-model-initializer",
        image="seldonio/rclone-storage-initializer:1.14.0-dev",
        image_pull_policy="IfNotPresent",
        args=[model_uri, mount_path],
        volume_mounts=[
            k8s_client.V1VolumeMount(name=volume_name, mount_path=mount_path)
        ],
        env_from=env_from,
    )

    container = k8s_client.V1Container(
        name="classifier",
        image=custom_docker_image,
        image_pull_policy="IfNotPresent",
        command=command,
        volume_mounts=[
            k8s_client.V1VolumeMount(
                name=volume_name,
                mount_path=mount_path,
                read_only=True,
            )
        ],
        ports=[
            k8s_client.V1ContainerPort(container_port=5000),
            k8s_client.V1ContainerPort(container_port=9000),
        ],
    )

    pod_spec_kwargs: Dict[str, Any] = {
        "volumes": [volume],
        "init_containers": [init_container],
        "containers": [container],
    }
    # The check must be on the secret *name*: a V1LocalObjectReference
    # instance is always truthy, even when it wraps `name=None`.
    if container_registry_secret_name:
        pod_spec_kwargs["image_pull_secrets"] = [
            k8s_client.V1LocalObjectReference(
                name=container_registry_secret_name
            )
        ]

    spec = k8s_client.V1PodSpec(**pod_spec_kwargs)
    return api.sanitize_for_serialization(spec)
services
special
Initialization for Seldon services.
seldon_deployment
Implementation for the Seldon Deployment step.
SeldonDeploymentConfig (ServiceConfig)
pydantic-model
Seldon Core deployment service configuration.
Attributes:
Name | Type | Description |
---|---|---|
model_uri |
str |
URI of the model (or models) to serve. |
model_name |
str |
the name of the model. Multiple versions of the same model should use the same model name. |
implementation |
str |
the Seldon Core implementation used to serve the model.
The implementation type can be one of the following: `TENSORFLOW_SERVER`, `SKLEARN_SERVER`, `XGBOOST_SERVER`, `custom`. |
replicas |
int |
number of replicas to use for the prediction service. |
secret_name |
Optional[str] |
the name of a Kubernetes secret containing additional configuration parameters for the Seldon Core deployment (e.g. credentials to access the Artifact Store). |
model_metadata |
Dict[str, Any] |
optional model metadata information (see https://docs.seldon.io/projects/seldon-core/en/latest/reference/apis/metadata.html). |
extra_args |
Dict[str, Any] |
additional arguments to pass to the Seldon Core deployment resource configuration. |
is_custom_deployment |
Optional[bool] |
whether the deployment is a custom deployment |
spec |
Optional[Dict[Any, Any]] |
custom Kubernetes resource specification for the Seldon Core deployment. |
Source code in zenml/integrations/seldon/services/seldon_deployment.py
class SeldonDeploymentConfig(ServiceConfig):
    """Seldon Core deployment service configuration.

    Attributes:
        model_uri: URI of the model (or models) to serve.
        model_name: the name of the model. Multiple versions of the same
            model should use the same model name.
        implementation: the Seldon Core implementation used to serve the
            model. The implementation type can be one of the following:
            `TENSORFLOW_SERVER`, `SKLEARN_SERVER`, `XGBOOST_SERVER`,
            `custom`.
        replicas: number of replicas to use for the prediction service.
        secret_name: the name of a Kubernetes secret containing additional
            configuration parameters for the Seldon Core deployment (e.g.
            credentials to access the Artifact Store).
        model_metadata: optional model metadata information (see
            https://docs.seldon.io/projects/seldon-core/en/latest/reference/apis/metadata.html).
        extra_args: additional arguments to pass to the Seldon Core
            deployment resource configuration.
        is_custom_deployment: whether the deployment is a custom deployment.
        spec: custom Kubernetes resource specification for the Seldon Core
            deployment.
    """

    model_uri: str = ""
    model_name: str = "default"
    # TODO [ENG-775]: have an enum of all supported Seldon Core implementations
    implementation: str
    replicas: int = 1
    # Explicit None default; pydantic v1 already applies it implicitly to
    # Optional fields declared without a default.
    secret_name: Optional[str] = None
    model_metadata: Dict[str, Any] = Field(default_factory=dict)
    extra_args: Dict[str, Any] = Field(default_factory=dict)
    is_custom_deployment: Optional[bool] = False
    spec: Optional[Dict[Any, Any]] = Field(default_factory=dict)

    def get_seldon_deployment_labels(self) -> Dict[str, str]:
        """Derive the Seldon Core deployment labels from this configuration.

        The labels are attached to the Seldon Core deployment resource and
        may be used as label selectors in lookup operations. Only
        configuration values that are set (truthy) produce a label.

        Returns:
            The labels for the Seldon Core deployment.
        """
        candidates = (
            ("zenml.pipeline_name", self.pipeline_name),
            ("zenml.pipeline_run_id", self.pipeline_run_id),
            ("zenml.pipeline_step_name", self.pipeline_step_name),
            ("zenml.model_name", self.model_name),
            ("zenml.model_uri", self.model_uri),
            ("zenml.model_type", self.implementation),
        )
        labels = {key: value for key, value in candidates if value}
        # The return value of sanitize_labels is ignored, as in the
        # original; it is presumably an in-place operation.
        SeldonClient.sanitize_labels(labels)
        return labels

    def get_seldon_deployment_annotations(self) -> Dict[str, str]:
        """Derive the Seldon Core deployment annotations from this configuration.

        Annotations carry information about the Seldon Core service that is
        not available in the labels. The most important one is the
        serialized service configuration itself, which is used to recreate
        the service configuration from a remote Seldon deployment.

        Returns:
            The annotations for the Seldon Core deployment.
        """
        return {
            "zenml.service_config": self.json(),
            "zenml.version": __version__,
        }

    @classmethod
    def create_from_deployment(
        cls, deployment: SeldonDeployment
    ) -> "SeldonDeploymentConfig":
        """Rebuild a Seldon Core service configuration from a deployed instance.

        Args:
            deployment: the Seldon Core deployment resource.

        Returns:
            The Seldon Core service configuration corresponding to the given
            Seldon Core deployment resource.

        Raises:
            ValueError: if the given deployment resource does not contain
                the expected annotations or it contains an invalid or
                incompatible Seldon Core service configuration.
        """
        config_data = deployment.metadata.annotations.get(
            "zenml.service_config"
        )
        if not config_data:
            raise ValueError(
                f"The given deployment resource does not contain a "
                f"'zenml.service_config' annotation: {deployment}"
            )
        try:
            return cls.parse_raw(config_data)
        except ValidationError as e:
            raise ValueError(
                f"The loaded Seldon Core deployment resource contains an "
                f"invalid or incompatible Seldon Core service configuration: "
                f"{config_data}"
            ) from e
create_from_deployment(deployment)
classmethod
Recreate the configuration of a Seldon Core Service from a deployed instance.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
deployment |
SeldonDeployment |
the Seldon Core deployment resource. |
required |
Returns:
Type | Description |
---|---|
SeldonDeploymentConfig |
The Seldon Core service configuration corresponding to the given Seldon Core deployment resource. |
Exceptions:
Type | Description |
---|---|
ValueError |
if the given deployment resource does not contain the expected annotations or it contains an invalid or incompatible Seldon Core service configuration. |
Source code in zenml/integrations/seldon/services/seldon_deployment.py
@classmethod
def create_from_deployment(
    cls, deployment: SeldonDeployment
) -> "SeldonDeploymentConfig":
    """Rebuild a Seldon Core service configuration from a deployed instance.

    The configuration is read from the `zenml.service_config` annotation of
    the deployment and parsed back into a configuration object.

    Args:
        deployment: the Seldon Core deployment resource.

    Returns:
        The Seldon Core service configuration corresponding to the given
        Seldon Core deployment resource.

    Raises:
        ValueError: if the given deployment resource does not contain
            the expected annotations or it contains an invalid or
            incompatible Seldon Core service configuration.
    """
    deployment_annotations = deployment.metadata.annotations
    config_data = deployment_annotations.get("zenml.service_config")
    if not config_data:
        raise ValueError(
            f"The given deployment resource does not contain a "
            f"'zenml.service_config' annotation: {deployment}"
        )
    try:
        return cls.parse_raw(config_data)
    except ValidationError as e:
        raise ValueError(
            f"The loaded Seldon Core deployment resource contains an "
            f"invalid or incompatible Seldon Core service configuration: "
            f"{config_data}"
        ) from e
get_seldon_deployment_annotations(self)
Generate annotations for the Seldon Core deployment from the service configuration.
The annotations are used to store additional information about the Seldon Core service that is associated with the deployment that is not available in the labels. One annotation particularly important is the serialized Service configuration itself, which is used to recreate the service configuration from a remote Seldon deployment.
Returns:
Type | Description |
---|---|
Dict[str, str] |
The annotations for the Seldon Core deployment. |
Source code in zenml/integrations/seldon/services/seldon_deployment.py
def get_seldon_deployment_annotations(self) -> Dict[str, str]:
    """Build the annotations attached to the Seldon Core deployment.

    Annotations carry information about the service that is not available
    in the labels. The most important one is the JSON-serialized service
    configuration, which allows the configuration to be recreated from a
    remote Seldon deployment.

    Returns:
        The annotations for the Seldon Core deployment.
    """
    result: Dict[str, str] = {}
    # Full configuration, serialized as JSON.
    result["zenml.service_config"] = self.json()
    # ZenML version that produced the deployment.
    result["zenml.version"] = __version__
    return result
get_seldon_deployment_labels(self)
Generate labels for the Seldon Core deployment from the service configuration.
These labels are attached to the Seldon Core deployment resource and may be used as label selectors in lookup operations.
Returns:
Type | Description |
---|---|
Dict[str, str] |
The labels for the Seldon Core deployment. |
Source code in zenml/integrations/seldon/services/seldon_deployment.py
def get_seldon_deployment_labels(self) -> Dict[str, str]:
    """Build the labels attached to the Seldon Core deployment.

    The labels are attached to the Seldon Core deployment resource and may
    be used as label selectors in lookup operations. Only configuration
    values that are set (truthy) produce a label.

    Returns:
        The labels for the Seldon Core deployment.
    """
    label_sources = (
        ("zenml.pipeline_name", self.pipeline_name),
        ("zenml.pipeline_run_id", self.pipeline_run_id),
        ("zenml.pipeline_step_name", self.pipeline_step_name),
        ("zenml.model_name", self.model_name),
        ("zenml.model_uri", self.model_uri),
        ("zenml.model_type", self.implementation),
    )
    result = {key: value for key, value in label_sources if value}
    # The return value is unused, so this presumably sanitizes the dict in
    # place -- TODO confirm against SeldonClient.sanitize_labels.
    SeldonClient.sanitize_labels(result)
    return result
SeldonDeploymentService (BaseService)
pydantic-model
A service that represents a Seldon Core deployment server.
Attributes:
Name | Type | Description |
---|---|---|
config |
SeldonDeploymentConfig |
service configuration. |
status |
SeldonDeploymentServiceStatus |
service status. |
Source code in zenml/integrations/seldon/services/seldon_deployment.py
class SeldonDeploymentService(BaseService):
    """A service that represents a Seldon Core deployment server.

    Attributes:
        config: service configuration.
        status: service status.
    """

    SERVICE_TYPE = ServiceType(
        name="seldon-deployment",
        type="model-serving",
        flavor="seldon",
        description="Seldon Core prediction service",
    )

    config: SeldonDeploymentConfig = Field(
        default_factory=SeldonDeploymentConfig
    )
    status: SeldonDeploymentServiceStatus = Field(
        default_factory=SeldonDeploymentServiceStatus
    )

    def _get_client(self) -> SeldonClient:
        """Get the Seldon Core client from the active Seldon Core model deployer.

        Returns:
            The Seldon Core client.
        """
        # Imported locally rather than at module level (presumably to avoid
        # a circular import between the service and the model deployer).
        from zenml.integrations.seldon.model_deployers.seldon_model_deployer import (
            SeldonModelDeployer,
        )

        model_deployer = cast(
            SeldonModelDeployer, SeldonModelDeployer.get_active_model_deployer()
        )
        return model_deployer.seldon_client

    def check_status(self) -> Tuple[ServiceState, str]:
        """Check the current operational state of the Seldon Core deployment.

        Returns:
            The operational state of the Seldon Core deployment and a message
            providing additional information about that state (e.g. a
            description of the error, if one is encountered).
        """
        client = self._get_client()
        name = self.seldon_deployment_name
        try:
            deployment = client.get_deployment(name=name)
        except SeldonDeploymentNotFoundError:
            # No remote resource means the service is simply not running.
            return (ServiceState.INACTIVE, "")

        if deployment.is_available():
            return (
                ServiceState.ACTIVE,
                f"Seldon Core deployment '{name}' is available",
            )

        if deployment.is_failed():
            return (
                ServiceState.ERROR,
                f"Seldon Core deployment '{name}' failed: "
                f"{deployment.get_error()}",
            )

        # Neither available nor failed: still starting up.
        pending_message = deployment.get_pending_message() or ""
        return (
            ServiceState.PENDING_STARTUP,
            "Seldon Core deployment is being created: " + pending_message,
        )

    @property
    def seldon_deployment_name(self) -> str:
        """Get the name of the Seldon Core deployment.

        The name is derived from the service UUID, so it uniquely
        corresponds to this service instance.

        Returns:
            The name of the Seldon Core deployment.
        """
        return f"zenml-{str(self.uuid)}"

    def _get_seldon_deployment_labels(self) -> Dict[str, str]:
        """Generate the labels for the Seldon Core deployment from the service configuration.

        Returns:
            The labels for the Seldon Core deployment.
        """
        labels = self.config.get_seldon_deployment_labels()
        labels["zenml.service_uuid"] = str(self.uuid)
        SeldonClient.sanitize_labels(labels)
        return labels

    @classmethod
    def create_from_deployment(
        cls, deployment: SeldonDeployment
    ) -> "SeldonDeploymentService":
        """Recreate a Seldon Core service from a Seldon Core deployment resource.

        The operational status of the recreated service is updated before
        it is returned.

        Args:
            deployment: the Seldon Core deployment resource.

        Returns:
            The Seldon Core service corresponding to the given
            Seldon Core deployment resource.

        Raises:
            ValueError: if the given deployment resource does not contain
                the expected service_uuid label.
        """
        config = SeldonDeploymentConfig.create_from_deployment(deployment)
        service_uuid = deployment.metadata.labels.get("zenml.service_uuid")
        if not service_uuid:
            raise ValueError(
                f"The given deployment resource does not contain a valid "
                f"'zenml.service_uuid' label: {deployment}"
            )
        service = cls(uuid=UUID(service_uuid), config=config)
        service.update_status()
        return service

    def provision(self) -> None:
        """Provision or update the remote Seldon Core deployment instance.

        The remote deployment is created, or updated if it already exists,
        so that it matches the current configuration.
        """
        client = self._get_client()

        name = self.seldon_deployment_name

        deployment = SeldonDeployment.build(
            name=name,
            model_uri=self.config.model_uri,
            model_name=self.config.model_name,
            implementation=self.config.implementation,
            secret_name=self.config.secret_name,
            labels=self._get_seldon_deployment_labels(),
            annotations=self.config.get_seldon_deployment_annotations(),
            is_custom_deployment=self.config.is_custom_deployment,
            spec=self.config.spec,
        )
        deployment.spec.replicas = self.config.replicas
        deployment.spec.predictors[0].replicas = self.config.replicas

        # check if the Seldon deployment already exists
        try:
            client.get_deployment(name=name)
            # update the existing deployment
            client.update_deployment(deployment)
        except SeldonDeploymentNotFoundError:
            # create the deployment
            client.create_deployment(deployment=deployment)

    def deprovision(self, force: bool = False) -> None:
        """Deprovision the remote Seldon Core deployment instance.

        A deployment that no longer exists is silently ignored.

        Args:
            force: if True, the remote deployment instance will be
                forcefully deprovisioned.
        """
        client = self._get_client()
        name = self.seldon_deployment_name
        try:
            client.delete_deployment(name=name, force=force)
        except SeldonDeploymentNotFoundError:
            # Already gone: nothing left to deprovision.
            pass

    def get_logs(
        self,
        follow: bool = False,
        tail: Optional[int] = None,
    ) -> Generator[str, bool, None]:
        """Get the logs of a Seldon Core model deployment.

        Args:
            follow: if True, the logs will be streamed as they are written
            tail: only retrieve the last NUM lines of log output.

        Returns:
            A generator that can be accessed to get the service logs.
        """
        return self._get_client().get_deployment_logs(
            self.seldon_deployment_name,
            follow=follow,
            tail=tail,
        )

    @property
    def prediction_url(self) -> Optional[str]:
        """The prediction URI exposed by the prediction service.

        Returns:
            The prediction URI exposed by the prediction service, or None if
            the service is not yet ready.
        """
        # URLs always use forward slashes; `os.path.join` would insert the
        # platform separator (backslashes on Windows), so the POSIX flavour
        # is used explicitly.
        import posixpath

        from zenml.integrations.seldon.model_deployers.seldon_model_deployer import (
            SeldonModelDeployer,
        )

        if not self.is_running:
            return None
        namespace = self._get_client().namespace
        model_deployer = cast(
            SeldonModelDeployer, SeldonModelDeployer.get_active_model_deployer()
        )
        return posixpath.join(
            model_deployer.config.base_url,
            "seldon",
            namespace,
            self.seldon_deployment_name,
            "api/v0.1/predictions",
        )

    def predict(self, request: str) -> Any:
        """Make a prediction using the service.

        Args:
            request: a JSON string; its decoded value is sent to the service
                as the `ndarray` payload.

        Returns:
            The JSON-decoded response returned by the prediction service.

        Raises:
            Exception: if the service is not yet ready.
            ValueError: if the prediction_url is not set, or if the request
                is not a JSON string.
        """
        if not self.is_running:
            raise Exception(
                "Seldon prediction service is not running. "
                "Please start the service before making predictions."
            )

        # Read the property once so the URL that was checked is the URL
        # that is used for the request.
        url = self.prediction_url
        if url is None:
            raise ValueError("`self.prediction_url` is not set, cannot post.")

        if not isinstance(request, str):
            raise ValueError("Request must be a json string.")
        payload = json.loads(request)

        response = requests.post(
            url,
            json={"data": {"ndarray": payload}},
        )
        response.raise_for_status()
        return response.json()
prediction_url: Optional[str]
property
readonly
The prediction URI exposed by the prediction service.
Returns:
Type | Description |
---|---|
Optional[str] |
The prediction URI exposed by the prediction service, or None if the service is not yet ready. |
seldon_deployment_name: str
property
readonly
Get the name of the Seldon Core deployment.
It should return the one that uniquely corresponds to this service instance.
Returns:
Type | Description |
---|---|
str |
The name of the Seldon Core deployment. |
check_status(self)
Check the current operational state of the Seldon Core deployment.
Returns:
Type | Description |
---|---|
Tuple[zenml.services.service_status.ServiceState, str] |
The operational state of the Seldon Core deployment and a message providing additional information about that state (e.g. a description of the error, if one is encountered). |
Source code in zenml/integrations/seldon/services/seldon_deployment.py
def check_status(self) -> Tuple[ServiceState, str]:
    """Check the current operational state of the Seldon Core deployment.

    Returns:
        The operational state of the Seldon Core deployment and a message
        providing additional information about that state (e.g. a
        description of the error, if one is encountered).
    """
    seldon_client = self._get_client()
    deployment_name = self.seldon_deployment_name

    try:
        resource = seldon_client.get_deployment(name=deployment_name)
    except SeldonDeploymentNotFoundError:
        # No remote resource: report the service as inactive.
        return (ServiceState.INACTIVE, "")

    if resource.is_available():
        state = ServiceState.ACTIVE
        message = f"Seldon Core deployment '{deployment_name}' is available"
    elif resource.is_failed():
        state = ServiceState.ERROR
        message = (
            f"Seldon Core deployment '{deployment_name}' failed: "
            f"{resource.get_error()}"
        )
    else:
        # Neither available nor failed: still starting up.
        state = ServiceState.PENDING_STARTUP
        details = resource.get_pending_message() or ""
        message = "Seldon Core deployment is being created: " + details
    return (state, message)
create_from_deployment(deployment)
classmethod
Recreate a Seldon Core service from a Seldon Core deployment resource.
The operational status of the recreated service is updated before it is returned.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
deployment |
SeldonDeployment |
the Seldon Core deployment resource. |
required |
Returns:
Type | Description |
---|---|
SeldonDeploymentService |
The Seldon Core service corresponding to the given Seldon Core deployment resource. |
Exceptions:
Type | Description |
---|---|
ValueError |
if the given deployment resource does not contain the expected service_uuid label. |
Source code in zenml/integrations/seldon/services/seldon_deployment.py
@classmethod
def create_from_deployment(
    cls, deployment: SeldonDeployment
) -> "SeldonDeploymentService":
    """Recreate a Seldon Core service from a Seldon Core deployment resource.

    The operational status of the recreated service is updated before it
    is returned.

    Args:
        deployment: the Seldon Core deployment resource.

    Returns:
        The Seldon Core service corresponding to the given
        Seldon Core deployment resource.

    Raises:
        ValueError: if the given deployment resource does not contain
            the expected service_uuid label.
    """
    # The configuration is rebuilt first, so its own errors surface before
    # the label check.
    restored_config = SeldonDeploymentConfig.create_from_deployment(deployment)

    service_uuid = deployment.metadata.labels.get("zenml.service_uuid")
    if service_uuid:
        restored = cls(uuid=UUID(service_uuid), config=restored_config)
        restored.update_status()
        return restored

    raise ValueError(
        f"The given deployment resource does not contain a valid "
        f"'zenml.service_uuid' label: {deployment}"
    )
deprovision(self, force=False)
Deprovision the remote Seldon Core deployment instance.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
force |
bool |
if True, the remote deployment instance will be forcefully deprovisioned. |
False |
Source code in zenml/integrations/seldon/services/seldon_deployment.py
def deprovision(self, force: bool = False) -> None:
    """Remove the remote Seldon Core deployment backing this service.

    A deployment that no longer exists is silently ignored.

    Args:
        force: if True, the remote deployment instance will be
            forcefully deprovisioned.
    """
    seldon_client = self._get_client()
    deployment_name = self.seldon_deployment_name
    try:
        seldon_client.delete_deployment(name=deployment_name, force=force)
    except SeldonDeploymentNotFoundError:
        # Already gone: nothing left to tear down.
        return
get_logs(self, follow=False, tail=None)
Get the logs of a Seldon Core model deployment.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
follow |
bool |
if True, the logs will be streamed as they are written |
False |
tail |
Optional[int] |
only retrieve the last NUM lines of log output. |
None |
Returns:
Type | Description |
---|---|
Generator[str, bool, NoneType] |
A generator that can be accessed to get the service logs. |
Source code in zenml/integrations/seldon/services/seldon_deployment.py
def get_logs(
    self,
    follow: bool = False,
    tail: Optional[int] = None,
) -> Generator[str, bool, None]:
    """Fetch the logs of the Seldon Core model deployment.

    Args:
        follow: if True, the logs will be streamed as they are written
        tail: only retrieve the last NUM lines of log output.

    Returns:
        A generator that can be accessed to get the service logs.
    """
    seldon_client = self._get_client()
    deployment_name = self.seldon_deployment_name
    # Delegates entirely to the client; the result is passed through as-is.
    return seldon_client.get_deployment_logs(
        deployment_name, follow=follow, tail=tail
    )
predict(self, request)
Make a prediction using the service.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
request |
str |
a JSON string; its decoded value is sent to the service as the `ndarray` payload |
required |
Returns:
Type | Description |
---|---|
Any |
The JSON-decoded response returned by the prediction service. |
Exceptions:
Type | Description |
---|---|
Exception |
if the service is not yet ready. |
ValueError |
if the prediction_url is not set, or if the request is not a JSON string. |
Source code in zenml/integrations/seldon/services/seldon_deployment.py
def predict(self, request: str) -> Any:
    """Make a prediction using the service.

    Args:
        request: a JSON string; its decoded value is sent to the service as
            the `ndarray` payload.

    Returns:
        The JSON-decoded response returned by the prediction service.

    Raises:
        Exception: if the service is not yet ready.
        ValueError: if the prediction_url is not set, or if the request is
            not a JSON string.
    """
    if not self.is_running:
        raise Exception(
            "Seldon prediction service is not running. "
            "Please start the service before making predictions."
        )

    # Read the property once so the URL that was checked is the URL that is
    # used for the request.
    url = self.prediction_url
    if url is None:
        raise ValueError("`self.prediction_url` is not set, cannot post.")

    if not isinstance(request, str):
        raise ValueError("Request must be a json string.")
    payload = json.loads(request)

    response = requests.post(
        url,
        json={"data": {"ndarray": payload}},
    )
    response.raise_for_status()
    return response.json()
provision(self)
Provision or update remote Seldon Core deployment instance.
This should then match the current configuration.
Source code in zenml/integrations/seldon/services/seldon_deployment.py
def provision(self) -> None:
    """Create or update the remote Seldon Core deployment.

    The remote deployment is built from the current service configuration.
    """
    seldon_client = self._get_client()
    deployment_name = self.seldon_deployment_name
    cfg = self.config

    resource = SeldonDeployment.build(
        name=deployment_name,
        model_uri=cfg.model_uri,
        model_name=cfg.model_name,
        implementation=cfg.implementation,
        secret_name=cfg.secret_name,
        labels=self._get_seldon_deployment_labels(),
        annotations=cfg.get_seldon_deployment_annotations(),
        is_custom_deployment=cfg.is_custom_deployment,
        spec=cfg.spec,
    )
    # The replica count is applied to the deployment spec and to its first
    # predictor.
    replica_count = cfg.replicas
    resource.spec.replicas = replica_count
    resource.spec.predictors[0].replicas = replica_count

    try:
        # Probe for an existing deployment; the lookup raises if absent.
        seldon_client.get_deployment(name=deployment_name)
        seldon_client.update_deployment(resource)
    except SeldonDeploymentNotFoundError:
        # Nothing deployed under this name yet: create it.
        seldon_client.create_deployment(deployment=resource)
SeldonDeploymentServiceStatus (ServiceStatus)
pydantic-model
Seldon Core deployment service status.
Source code in zenml/integrations/seldon/services/seldon_deployment.py
class SeldonDeploymentServiceStatus(ServiceStatus):
    """Seldon Core deployment service status.

    Declares no fields of its own; everything is inherited from
    `ServiceStatus`.
    """
steps
special
Initialization for Seldon steps.
seldon_deployer
Implementation of the Seldon Deployer step.
CustomDeployParameters (BaseModel)
pydantic-model
Custom model deployer step extra parameters.
Attributes:
Name | Type | Description |
---|---|---|
predict_function |
str |
Path to Python file containing predict function. |
Exceptions:
Type | Description |
---|---|
ValueError |
If predict_function is not specified. |
TypeError |
If predict_function is not a callable function. |
Returns:
Type | Description |
---|---|
predict_function |
Path to Python file containing predict function. |
Source code in zenml/integrations/seldon/steps/seldon_deployer.py
class CustomDeployParameters(BaseModel):
    """Custom model deployer step extra parameters.

    Attributes:
        predict_function: import path of the custom predict function. It is
            validated on assignment: the path must resolve to a callable.
    """

    predict_function: str

    @validator("predict_function")
    def predict_function_validate(cls, predict_func_path: str) -> str:
        """Validate predict function.

        Args:
            predict_func_path: predict function path

        Returns:
            predict function path

        Raises:
            ValueError: if predict function path is not valid
            TypeError: if predict function path is not a callable function
        """
        try:
            predict_function = import_class_by_path(predict_func_path)
        except AttributeError as e:
            # Chain the original error so the failing lookup stays visible.
            raise ValueError("Predict function can't be found.") from e
        if not callable(predict_function):
            raise TypeError("Predict function must be callable.")
        # The path itself (not the imported object) is stored on the model.
        return predict_func_path
predict_function_validate(predict_func_path)
classmethod
Validate predict function.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
predict_func_path |
str |
predict function path |
required |
Returns:
Type | Description |
---|---|
str |
predict function path |
Exceptions:
Type | Description |
---|---|
ValueError |
if predict function path is not valid |
TypeError |
if predict function path is not a callable function |
Source code in zenml/integrations/seldon/steps/seldon_deployer.py
@validator("predict_function")
def predict_function_validate(cls, predict_func_path: str) -> str:
    """Validate predict function.

    Args:
        predict_func_path: predict function path

    Returns:
        predict function path

    Raises:
        ValueError: if predict function path is not valid
        TypeError: if predict function path is not a callable function
    """
    try:
        predict_function = import_class_by_path(predict_func_path)
    except AttributeError as e:
        # Chain the original error so the failing lookup stays visible.
        raise ValueError("Predict function can't be found.") from e
    if not callable(predict_function):
        raise TypeError("Predict function must be callable.")
    # The path itself (not the imported object) is returned.
    return predict_func_path
SeldonDeployerStepParameters (BaseParameters)
pydantic-model
Seldon model deployer step parameters.
Attributes:
Name | Type | Description |
---|---|---|
service_config |
SeldonDeploymentConfig |
Seldon Core deployment service configuration. |
secrets |
a list of ZenML secrets containing additional configuration parameters for the Seldon Core deployment (e.g. credentials to access the Artifact Store where the models are stored). If supplied, the information fetched from these secrets is passed to the Seldon Core deployment server as a list of environment variables. |
Source code in zenml/integrations/seldon/steps/seldon_deployer.py
class SeldonDeployerStepParameters(BaseParameters):
    """Seldon model deployer step parameters.

    Attributes:
        service_config: Seldon Core deployment service configuration.
        custom_deploy_parameters: optional extra parameters for custom model
            deployments (see `CustomDeployParameters`); defaults to None.
        timeout: timeout value that the deployer steps pass along when
            starting or deploying the Seldon Core service; defaults to
            `DEFAULT_SELDON_DEPLOYMENT_START_STOP_TIMEOUT`.
    """

    # NOTE(review): earlier documentation listed a `secrets` attribute, but
    # no such field is declared on this class -- confirm it was removed.
    service_config: SeldonDeploymentConfig
    custom_deploy_parameters: Optional[CustomDeployParameters] = None
    timeout: int = DEFAULT_SELDON_DEPLOYMENT_START_STOP_TIMEOUT
seldon_custom_model_deployer_step (BaseStep)
Seldon Core custom model deployer pipeline step.
This step can be used in a pipeline to implement the process required to deploy a custom model with Seldon Core.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
deploy_decision |
whether to deploy the model or not |
required | |
params |
parameters for the deployer step |
required | |
model |
the model artifact to deploy |
required | |
context |
the step context |
required |
Exceptions:
Type | Description |
---|---|
ValueError |
if the custom deployer is not defined |
DoesNotExistException |
if an entity does not exist raise an exception |
Returns:
Type | Description |
---|---|
Seldon Core deployment service |
PARAMETERS_CLASS (BaseParameters)
pydantic-model
Seldon model deployer step parameters.
Attributes:
Name | Type | Description |
---|---|---|
service_config |
SeldonDeploymentConfig |
Seldon Core deployment service configuration. |
secrets |
a list of ZenML secrets containing additional configuration parameters for the Seldon Core deployment (e.g. credentials to access the Artifact Store where the models are stored). If supplied, the information fetched from these secrets is passed to the Seldon Core deployment server as a list of environment variables. |
Source code in zenml/integrations/seldon/steps/seldon_deployer.py
class SeldonDeployerStepParameters(BaseParameters):
    """Seldon model deployer step parameters.

    Attributes:
        service_config: Seldon Core deployment service configuration.
        custom_deploy_parameters: optional extra parameters for custom model
            deployments (see `CustomDeployParameters`); defaults to None.
        timeout: timeout value that the deployer steps pass along when
            starting or deploying the Seldon Core service; defaults to
            `DEFAULT_SELDON_DEPLOYMENT_START_STOP_TIMEOUT`.
    """

    # NOTE(review): earlier documentation listed a `secrets` attribute, but
    # no such field is declared on this class -- confirm it was removed.
    service_config: SeldonDeploymentConfig
    custom_deploy_parameters: Optional[CustomDeployParameters] = None
    timeout: int = DEFAULT_SELDON_DEPLOYMENT_START_STOP_TIMEOUT
entrypoint(deploy_decision, params, context, model)
staticmethod
Seldon Core custom model deployer pipeline step.
This step can be used in a pipeline to implement the process required to deploy a custom model with Seldon Core.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
deploy_decision |
bool |
whether to deploy the model or not |
required |
params |
SeldonDeployerStepParameters |
parameters for the deployer step |
required |
model |
ModelArtifact |
the model artifact to deploy |
required |
context |
StepContext |
the step context |
required |
Exceptions:
Type | Description |
---|---|
ValueError |
if the custom deployer is not defined |
DoesNotExistException |
if an entity does not exist raise an exception |
Returns:
Type | Description |
---|---|
SeldonDeploymentService |
Seldon Core deployment service |
Source code in zenml/integrations/seldon/steps/seldon_deployer.py
@step(enable_cache=False, extra={SELDON_CUSTOM_DEPLOYMENT: True})
def seldon_custom_model_deployer_step(
    deploy_decision: bool,
    params: SeldonDeployerStepParameters,
    context: StepContext,
    model: ModelArtifact,
) -> SeldonDeploymentService:
    """Seldon Core custom model deployer pipeline step.

    This step can be used in a pipeline to implement the process required
    to deploy a custom model with Seldon Core.

    Args:
        deploy_decision: whether to deploy the model or not
        params: parameters for the deployer step
        model: the model artifact to deploy
        context: the step context

    Raises:
        ValueError: if the custom deployer is not defined
        DoesNotExistException: if there is no active stack, or no artifact
            is found at the model URI

    Returns:
        Seldon Core deployment service
    """
    # verify that a custom deployer is defined
    if not params.custom_deploy_parameters:
        # A single message string: passing two comma-separated strings would
        # turn the exception message into a tuple.
        raise ValueError(
            "Custom deploy parameter is required as part of the step "
            "configuration; this parameter is the path of the custom "
            "predict function"
        )

    # get the active model deployer
    model_deployer = cast(
        SeldonModelDeployer, SeldonModelDeployer.get_active_model_deployer()
    )

    # get pipeline name, step name, run id
    step_env = cast(StepEnvironment, Environment()[STEP_ENVIRONMENT_NAME])
    pipeline_name = step_env.pipeline_name
    pipeline_run_id = step_env.pipeline_run_id
    step_name = step_env.step_name

    # update the step configuration with the real pipeline runtime information
    params.service_config.pipeline_name = pipeline_name
    params.service_config.pipeline_run_id = pipeline_run_id
    params.service_config.pipeline_step_name = step_name
    params.service_config.is_custom_deployment = True

    # fetch existing services with the same pipeline name, step name and
    # model name
    existing_services = model_deployer.find_model_server(
        pipeline_name=pipeline_name,
        pipeline_step_name=step_name,
        model_name=params.service_config.model_name,
    )

    # When the deploy decision is negative and a model server already exists
    # for this pipeline/step, reuse it instead of deploying the new model.
    # If no server exists, the current model is deployed regardless of the
    # decision, to ensure that a model server is available at all times.
    if not deploy_decision and existing_services:
        logger.info(
            f"Skipping model deployment because the model quality does not"
            f" meet the criteria. Reusing the last model server deployed by step "
            f"'{step_name}' and pipeline '{pipeline_name}' for model "
            f"'{params.service_config.model_name}'..."
        )
        service = cast(SeldonDeploymentService, existing_services[0])
        # even when the deployment decision is negative, we still need to start
        # the previous model server if it is no longer running, to ensure that
        # a model server is available at all times
        if not service.is_running:
            service.start(timeout=params.timeout)
        return service

    # entrypoint for starting Seldon microservice deployment for custom model
    entrypoint_command = [
        "python",
        "-m",
        "zenml.integrations.seldon.custom_deployer.zenml_custom_model",
        "--model_name",
        params.service_config.model_name,
        "--predict_func",
        params.custom_deploy_parameters.predict_function,
    ]

    # verify if there is an active stack before starting the service
    if not context.stack:
        raise DoesNotExistException(
            "No active stack is available. "
            "Please make sure that you have registered and set a stack."
        )

    docker_image = step_env.step_run_info.pipeline.extra[
        SELDON_DOCKER_IMAGE_KEY
    ]

    # copy the model files to new specific directory for the deployment
    served_model_uri = os.path.join(context.get_output_artifact_uri(), "seldon")
    fileio.makedirs(served_model_uri)
    io_utils.copy_dir(model.uri, served_model_uri)

    # Get the model artifact to extract information about the model
    # and how it can be loaded again later in the deployment environment.
    artifacts = Client().zen_store.list_artifacts(artifact_uri=model.uri)
    if not artifacts:
        raise DoesNotExistException("No artifact found at {}".format(model.uri))

    # save the model artifact metadata to the YAML file and copy it to the
    # deployment directory
    model_metadata_file = save_model_metadata(artifacts[0])
    fileio.copy(
        model_metadata_file,
        os.path.join(served_model_uri, MODEL_METADATA_YAML_FILE_NAME),
    )

    # prepare the service configuration for the deployment
    service_config = params.service_config.copy()
    service_config.model_uri = served_model_uri

    # create the specification for the custom deployment
    service_config.spec = create_seldon_core_custom_spec(
        model_uri=service_config.model_uri,
        custom_docker_image=docker_image,
        secret_name=model_deployer.kubernetes_secret_name,
        command=entrypoint_command,
    )

    # deploy the service
    service = cast(
        SeldonDeploymentService,
        model_deployer.deploy_model(
            service_config, replace=True, timeout=params.timeout
        ),
    )

    logger.info(
        f"Seldon Core deployment service started and reachable at:\n"
        f"    {service.prediction_url}\n"
    )

    return service
seldon_model_deployer_step (BaseStep)
Seldon Core model deployer pipeline step.
This step can be used in a pipeline to implement continuous deployment for a ML model with Seldon Core.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
deploy_decision |
whether to deploy the model or not |
required | |
params |
parameters for the deployer step |
required | |
model |
the model artifact to deploy |
required | |
context |
the step context |
required |
Returns:
Type | Description |
---|---|
Seldon Core deployment service |
PARAMETERS_CLASS (BaseParameters)
pydantic-model
Seldon model deployer step parameters.
Attributes:
Name | Type | Description |
---|---|---|
service_config |
SeldonDeploymentConfig |
Seldon Core deployment service configuration. |
secrets |
a list of ZenML secrets containing additional configuration parameters for the Seldon Core deployment (e.g. credentials to access the Artifact Store where the models are stored). If supplied, the information fetched from these secrets is passed to the Seldon Core deployment server as a list of environment variables. |
Source code in zenml/integrations/seldon/steps/seldon_deployer.py
class SeldonDeployerStepParameters(BaseParameters):
    """Seldon model deployer step parameters.

    Attributes:
        service_config: Seldon Core deployment service configuration.
        custom_deploy_parameters: optional extra parameters for custom model
            deployments (see `CustomDeployParameters`); defaults to None.
        timeout: timeout value that the deployer steps pass along when
            starting or deploying the Seldon Core service; defaults to
            `DEFAULT_SELDON_DEPLOYMENT_START_STOP_TIMEOUT`.
    """

    # NOTE(review): earlier documentation listed a `secrets` attribute, but
    # no such field is declared on this class -- confirm it was removed.
    service_config: SeldonDeploymentConfig
    custom_deploy_parameters: Optional[CustomDeployParameters] = None
    timeout: int = DEFAULT_SELDON_DEPLOYMENT_START_STOP_TIMEOUT
entrypoint(deploy_decision, params, context, model)
staticmethod
Seldon Core model deployer pipeline step.
This step can be used in a pipeline to implement continuous deployment for a ML model with Seldon Core.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
deploy_decision |
bool |
whether to deploy the model or not |
required |
params |
SeldonDeployerStepParameters |
parameters for the deployer step |
required |
model |
ModelArtifact |
the model artifact to deploy |
required |
context |
StepContext |
the step context |
required |
Returns:
Type | Description |
---|---|
SeldonDeploymentService |
Seldon Core deployment service |
Source code in zenml/integrations/seldon/steps/seldon_deployer.py
@step(enable_cache=False)
def seldon_model_deployer_step(
    deploy_decision: bool,
    params: SeldonDeployerStepParameters,
    context: StepContext,
    model: ModelArtifact,
) -> SeldonDeploymentService:
    """Seldon Core model deployer pipeline step.

    This step can be used in a pipeline to implement continuous
    deployment for an ML model with Seldon Core.

    Args:
        deploy_decision: whether to deploy the model or not
        params: parameters for the deployer step
        model: the model artifact to deploy
        context: the step context

    Returns:
        Seldon Core deployment service
    """
    model_deployer = cast(
        SeldonModelDeployer, SeldonModelDeployer.get_active_model_deployer()
    )

    # get pipeline name, step name and run id
    step_env = cast(StepEnvironment, Environment()[STEP_ENVIRONMENT_NAME])
    pipeline_name = step_env.pipeline_name
    pipeline_run_id = step_env.pipeline_run_id
    step_name = step_env.step_name

    # update the step configuration with the real pipeline runtime information
    params.service_config.pipeline_name = pipeline_name
    params.service_config.pipeline_run_id = pipeline_run_id
    params.service_config.pipeline_step_name = step_name

    def prepare_service_config(model_uri: str) -> SeldonDeploymentConfig:
        """Prepare the model files for model serving.

        This creates and returns a Seldon service configuration for the model.

        This function ensures that the model files are in the correct format
        and file structure required by the Seldon Core server implementation
        used for model serving.

        Args:
            model_uri: the URI of the model artifact being served

        Returns:
            A copy of the step's service configuration whose `model_uri`
            points to the location of the model files prepared for serving.

        Raises:
            RuntimeError: if the model files were not found
        """
        served_model_uri = os.path.join(
            context.get_output_artifact_uri(), "seldon"
        )
        fileio.makedirs(served_model_uri)

        # TODO [ENG-773]: determine how to formalize how models are organized into
        #   folders and sub-folders depending on the model type/format and the
        #   Seldon Core protocol used to serve the model.

        # TODO [ENG-791]: auto-detect built-in Seldon server implementation
        #   from the model artifact type

        # TODO [ENG-792]: validate the model artifact type against the
        #   supported built-in Seldon server implementations
        if params.service_config.implementation == "TENSORFLOW_SERVER":
            # the TensorFlow server expects model artifacts to be
            # stored in numbered subdirectories, each representing a model
            # version
            io_utils.copy_dir(model_uri, os.path.join(served_model_uri, "1"))
        elif params.service_config.implementation == "SKLEARN_SERVER":
            # the sklearn server expects model artifacts to be
            # stored in a file called model.joblib
            sklearn_model_uri = os.path.join(model_uri, "model")
            # check the model file itself, not just the artifact directory:
            # the directory can exist while the expected file is missing
            if not fileio.exists(sklearn_model_uri):
                raise RuntimeError(
                    f"Expected sklearn model artifact was not found at "
                    f"{sklearn_model_uri}"
                )
            fileio.copy(
                sklearn_model_uri,
                os.path.join(served_model_uri, "model.joblib"),
            )
        else:
            # default treatment for all other server implementations is to
            # simply reuse the model from the artifact store path where it
            # is originally stored
            served_model_uri = model_uri

        # work on a copy so the serving location does not leak back into
        # the step parameters
        service_config = params.service_config.copy()
        service_config.model_uri = served_model_uri
        return service_config

    # fetch existing services with same pipeline name, step name and
    # model name
    existing_services = model_deployer.find_model_server(
        pipeline_name=pipeline_name,
        pipeline_step_name=step_name,
        model_name=params.service_config.model_name,
    )

    # even when the deploy decision is negative, if an existing model server
    # is not running for this pipeline/step, we still have to serve the
    # current model, to ensure that a model server is available at all times
    if not deploy_decision and existing_services:
        logger.info(
            f"Skipping model deployment because the model quality does not "
            f"meet the criteria. Reusing last model server deployed by step "
            f"'{step_name}' and pipeline '{pipeline_name}' for model "
            f"'{params.service_config.model_name}'..."
        )
        service = cast(SeldonDeploymentService, existing_services[0])
        # even when the deploy decision is negative, we still need to start
        # the previous model server if it is no longer running, to ensure that
        # a model server is available at all times
        if not service.is_running:
            service.start(timeout=params.timeout)
        return service

    # invoke the Seldon Core model deployer to create a new service
    # or update an existing one that was previously deployed for the same
    # model
    service_config = prepare_service_config(model.uri)
    service = cast(
        SeldonDeploymentService,
        model_deployer.deploy_model(
            service_config, replace=True, timeout=params.timeout
        ),
    )

    logger.info(
        f"Seldon deployment service started and reachable at:\n"
        f"    {service.prediction_url}\n"
    )

    return service