Skip to content

Seldon

zenml.integrations.seldon special

Initialization of the Seldon integration.

The Seldon Core integration allows you to use the Seldon Core model serving platform to implement continuous model deployment.

SeldonIntegration (Integration)

Definition of Seldon Core integration for ZenML.

Source code in zenml/integrations/seldon/__init__.py
class SeldonIntegration(Integration):
    """ZenML integration descriptor for the Seldon Core serving platform."""

    NAME = SELDON
    REQUIREMENTS = [
        "kubernetes==18.20.0",
        "seldon-core==1.15.0",
    ]

    @classmethod
    def activate(cls) -> None:
        """Import the Seldon submodules needed when the integration is activated.

        The imported names are not used directly (hence the `noqa`);
        presumably the imports are done for their import-time side effects.
        """
        from zenml.integrations.seldon import (  # noqa
            secret_schemas,
            services,
        )

    @classmethod
    def flavors(cls) -> List[Type[Flavor]]:
        """List the stack component flavors provided by this integration.

        Returns:
            A list holding the Seldon model deployer flavor class.
        """
        # Imported lazily, inside the method, exactly as the flavor module
        # is only needed when flavors are requested.
        from zenml.integrations.seldon.flavors import SeldonModelDeployerFlavor

        provided_flavors: List[Type[Flavor]] = [SeldonModelDeployerFlavor]
        return provided_flavors

activate() classmethod

Activate the Seldon Core integration.

Source code in zenml/integrations/seldon/__init__.py
@classmethod
def activate(cls) -> None:
    """Import the Seldon submodules needed when the integration is activated.

    The imported names are not used directly (hence the `noqa`);
    presumably the imports are done for their import-time side effects.
    """
    from zenml.integrations.seldon import (  # noqa
        secret_schemas,
        services,
    )

flavors() classmethod

Declare the stack component flavors for the Seldon Core integration.

Returns:

Type Description
List[Type[zenml.stack.flavor.Flavor]]

List of stack component flavors for this integration.

Source code in zenml/integrations/seldon/__init__.py
@classmethod
def flavors(cls) -> List[Type[Flavor]]:
    """List the stack component flavors provided by this integration.

    Returns:
        A list holding the Seldon model deployer flavor class.
    """
    # The flavor module is imported only when flavors are requested.
    from zenml.integrations.seldon.flavors import SeldonModelDeployerFlavor

    provided_flavors: List[Type[Flavor]] = [SeldonModelDeployerFlavor]
    return provided_flavors

constants

Seldon constants.

custom_deployer special

Initialization of ZenML custom deployer.

zenml_custom_model

Implements a custom model for the Seldon integration.

ZenMLCustomModel

Custom model class for ZenML and Seldon.

This class is used to implement a custom model for the Seldon Core integration, which is used as the main entry point for custom code execution.

Attributes:

Name Type Description
name

The name of the model.

model_uri

The URI of the model.

predict_func

The predict function of the model.

Source code in zenml/integrations/seldon/custom_deployer/zenml_custom_model.py
class ZenMLCustomModel:
    """Custom model class for ZenML and Seldon.

    This class is used to implement a custom model for the Seldon Core
    integration, which is used as the main entry point for custom code
    execution.

    Attributes:
        name: The name of the model.
        model_uri: The URI of the model.
        predict_func: The user-defined predict callable, resolved from the
            import path given to the constructor.
        model: The loaded model object; None until `load` succeeds.
        ready: True once `load` has completed successfully.
    """

    def __init__(
        self,
        model_name: str,
        model_uri: str,
        predict_func: str,
    ):
        """Initializes a ZenMLCustomModel object.

        Args:
            model_name: The name of the model.
            model_uri: The URI of the model.
            predict_func: Import path of the predict function; it is resolved
                with `import_class_by_path`.
        """
        self.name = model_name
        self.model_uri = model_uri
        self.predict_func = import_class_by_path(predict_func)
        self.model = None
        self.ready = False

    def load(self) -> bool:
        """Load the model.

        This function loads the model into memory and sets the ready flag to
        True. The model is loaded using the materializer, by saving the
        information of the artifact to a file at the preparing time and
        loading it again at the prediction time by the materializer.

        Returns:
            True if the model was loaded successfully, False otherwise.
        """
        try:
            from zenml.utils.materializer_utils import load_model_from_metadata

            self.model = load_model_from_metadata(self.model_uri)
        except Exception as e:
            # Deliberately best-effort: a failed load is reported through the
            # return value (and the log) instead of propagating.
            logger.error("Failed to load model: {}".format(e))
            return False
        self.ready = True
        return self.ready

    def predict(
        self,
        X: Array_Like,
        features_names: Optional[List[str]],
        **kwargs: Any,
    ) -> Array_Like:
        """Predict the given request.

        The main predict function of the model. This function is called by the
        Seldon Core server when a request is received. Then inside this
        function, the user-defined predict function is called with the loaded
        model and the request.

        Args:
            X: The request to predict.
            features_names: The names of the features (not used here).
            **kwargs: Additional arguments (not used here).

        Returns:
            A dictionary with the single key "predictions" holding the value
            returned by the user-defined predict function.

        Raises:
            Exception: If the user-defined predict function fails; the
                original error is attached as the cause.
            NotImplementedError: If no predict function is configured.
        """
        if self.predict_func is None:
            raise NotImplementedError("Predict function is not implemented")
        try:
            # The result is always wrapped in a dict here, so no further
            # type check of the returned value is needed.
            return {"predictions": self.predict_func(self.model, X)}
        except Exception as e:
            # Chain the original error so its traceback is not lost.
            raise Exception("Failed to predict: {}".format(e)) from e
__init__(self, model_name, model_uri, predict_func) special

Initializes a ZenMLCustomModel object.

Parameters:

Name Type Description Default
model_name str

The name of the model.

required
model_uri str

The URI of the model.

required
predict_func str

The predict function of the model.

required
Source code in zenml/integrations/seldon/custom_deployer/zenml_custom_model.py
def __init__(
    self,
    model_name: str,
    model_uri: str,
    predict_func: str,
):
    """Set up a ZenMLCustomModel that has not loaded its model yet.

    Args:
        model_name: The name of the model.
        model_uri: The URI of the model.
        predict_func: Import path of the predict function; it is resolved
            with `import_class_by_path`.
    """
    self.name, self.model_uri = model_name, model_uri
    # Resolve the dotted path into the actual callable up front.
    resolved_predict = import_class_by_path(predict_func)
    self.predict_func = resolved_predict
    # Nothing is loaded at construction time.
    self.model = None
    self.ready = False
load(self)

Load the model.

This function loads the model into memory and sets the ready flag to True. The model is loaded using the materializer, by saving the information of the artifact to a file at the preparing time and loading it again at the prediction time by the materializer.

Returns:

Type Description
bool

True if the model was loaded successfully, False otherwise.

Source code in zenml/integrations/seldon/custom_deployer/zenml_custom_model.py
def load(self) -> bool:
    """Bring the model into memory and mark the instance as ready.

    The model is restored through `load_model_from_metadata`, using the
    URI stored on the instance. Any failure is logged and reported through
    the return value instead of being raised.

    Returns:
        True if the model was loaded successfully, False otherwise.
    """
    try:
        from zenml.utils.materializer_utils import load_model_from_metadata

        self.model = load_model_from_metadata(self.model_uri)
    except Exception as load_error:
        logger.error("Failed to load model: {}".format(load_error))
        return False
    else:
        # Only reached when both the import and the load succeeded.
        self.ready = True
        return self.ready
predict(self, X, features_names, **kwargs)

Predict the given request.

The main predict function of the model. This function is called by the Seldon Core server when a request is received. Then inside this function, the user-defined predict function is called.

Parameters:

Name Type Description Default
X Union[numpy.ndarray, List[Any], str, bytes, Dict[str, Any]]

The request to predict in a dictionary.

required
features_names Optional[List[str]]

The names of the features.

required
**kwargs Any

Additional arguments.

{}

Returns:

Type Description
Union[numpy.ndarray, List[Any], str, bytes, Dict[str, Any]]

The prediction dictionary.

Exceptions:

Type Description
Exception

If function could not be called.

NotImplementedError

If no predict function is configured.

TypeError

If the request is not a dictionary.

Source code in zenml/integrations/seldon/custom_deployer/zenml_custom_model.py
def predict(
    self,
    X: Array_Like,
    features_names: Optional[List[str]],
    **kwargs: Any,
) -> Array_Like:
    """Predict the given request.

    The main predict function of the model. This function is called by the
    Seldon Core server when a request is received. Then inside this function,
    the user-defined predict function is called with the loaded model and the
    request.

    Args:
        X: The request to predict.
        features_names: The names of the features (not used here).
        **kwargs: Additional arguments (not used here).

    Returns:
        A dictionary with the single key "predictions" holding the value
        returned by the user-defined predict function.

    Raises:
        Exception: If the user-defined predict function fails; the original
            error is attached as the cause.
        NotImplementedError: If no predict function is configured.
    """
    if self.predict_func is None:
        raise NotImplementedError("Predict function is not implemented")
    try:
        # The result is always wrapped in a dict here, so no further type
        # check of the returned value is needed.
        return {"predictions": self.predict_func(self.model, X)}
    except Exception as e:
        # Chain the original error so its traceback is not lost.
        raise Exception("Failed to predict: {}".format(e)) from e

flavors special

Seldon integration flavors.

seldon_model_deployer_flavor

Seldon model deployer flavor.

SeldonModelDeployerConfig (BaseModelDeployerConfig) pydantic-model

Config for the Seldon Model Deployer.

Attributes:

Name Type Description
kubernetes_context Optional[str]

the Kubernetes context to use to contact the remote Seldon Core installation. If not specified, the current configuration is used. Depending on where the Seldon model deployer is being used, this can be either a locally active context or an in-cluster Kubernetes configuration (if running inside a pod).

kubernetes_namespace Optional[str]

the Kubernetes namespace where the Seldon Core deployment servers are provisioned and managed by ZenML. If not specified, the namespace set in the current configuration is used. Depending on where the Seldon model deployer is being used, this can be either the current namespace configured in the locally active context or the namespace in the context of which the pod is running (if running inside a pod).

base_url str

the base URL of the Kubernetes ingress used to expose the Seldon Core deployment servers.

secret Optional[str]

the name of a ZenML secret containing the credentials used by Seldon Core storage initializers to authenticate to the Artifact Store (i.e. the storage backend where models are stored - see https://docs.seldon.io/projects/seldon-core/en/latest/servers/overview.html#handling-credentials).

kubernetes_secret_name Optional[str]

the name of the Kubernetes secret containing the credentials used by Seldon Core storage initializers to authenticate to the Artifact Store (i.e. the storage backend where models are stored) - This is used when the secret is not managed by ZenML and is already present in the Kubernetes cluster.

Source code in zenml/integrations/seldon/flavors/seldon_model_deployer_flavor.py
class SeldonModelDeployerConfig(BaseModelDeployerConfig):
    """Settings accepted by the Seldon Core model deployer.

    Attributes:
        kubernetes_context: Kubernetes context used to reach the remote
            Seldon Core installation. When omitted, the current configuration
            is used: either the locally active context or, when running
            inside a pod, the in-cluster Kubernetes configuration.
        kubernetes_namespace: Kubernetes namespace in which ZenML provisions
            and manages the Seldon Core deployment servers. When omitted, the
            namespace of the current configuration is used: either the one
            set in the locally active context or, when running inside a pod,
            the namespace that pod runs in.
        base_url: Base URL of the Kubernetes ingress that exposes the Seldon
            Core deployment servers.
        secret: Name of a ZenML secret holding the credentials that Seldon
            Core storage initializers use to authenticate to the Artifact
            Store (i.e. the storage backend where models are stored - see
            https://docs.seldon.io/projects/seldon-core/en/latest/servers/overview.html#handling-credentials).
        kubernetes_secret_name: Name of a Kubernetes secret holding the
            credentials that Seldon Core storage initializers use to
            authenticate to the Artifact Store. Used when the secret is not
            managed by ZenML and already exists in the Kubernetes cluster.
    """

    # TODO: Potential setting
    kubernetes_context: Optional[str]
    kubernetes_namespace: Optional[str]
    # TODO: unused?
    base_url: str
    secret: Optional[str]
    # TODO: Add full documentation section on this
    kubernetes_secret_name: Optional[str]
SeldonModelDeployerFlavor (BaseModelDeployerFlavor)

Seldon Core model deployer flavor.

Source code in zenml/integrations/seldon/flavors/seldon_model_deployer_flavor.py
class SeldonModelDeployerFlavor(BaseModelDeployerFlavor):
    """Flavor describing the Seldon Core model deployer."""

    @property
    def name(self) -> str:
        """Identifier of this flavor.

        Returns:
            The Seldon model deployer flavor name constant.
        """
        flavor_name = SELDON_MODEL_DEPLOYER_FLAVOR
        return flavor_name

    @property
    def docs_url(self) -> Optional[str]:
        """Link to the user documentation of this flavor.

        Returns:
            The URL produced by `generate_default_docs_url`.
        """
        default_docs = self.generate_default_docs_url()
        return default_docs

    @property
    def sdk_docs_url(self) -> Optional[str]:
        """Link to the SDK documentation of this flavor.

        Returns:
            The URL produced by `generate_default_sdk_docs_url`.
        """
        default_sdk_docs = self.generate_default_sdk_docs_url()
        return default_sdk_docs

    @property
    def logo_url(self) -> str:
        """Link to the logo shown for this flavor in the dashboard.

        Returns:
            The flavor logo URL.
        """
        logo = "https://public-flavor-logos.s3.eu-central-1.amazonaws.com/model_deployer/seldon.png"
        return logo

    @property
    def config_class(self) -> Type[SeldonModelDeployerConfig]:
        """Configuration class used by this flavor.

        Returns:
            The `SeldonModelDeployerConfig` class.
        """
        return SeldonModelDeployerConfig

    @property
    def implementation_class(self) -> Type["SeldonModelDeployer"]:
        """Stack component class implementing this flavor.

        Returns:
            The `SeldonModelDeployer` class.
        """
        # Imported inside the property rather than at module level.
        from zenml.integrations.seldon.model_deployers import (
            SeldonModelDeployer as deployer_cls,
        )

        return deployer_cls
config_class: Type[zenml.integrations.seldon.flavors.seldon_model_deployer_flavor.SeldonModelDeployerConfig] property readonly

Returns SeldonModelDeployerConfig config class.

Returns:

Type Description
Type[zenml.integrations.seldon.flavors.seldon_model_deployer_flavor.SeldonModelDeployerConfig]

The config class.

docs_url: Optional[str] property readonly

A url to point at docs explaining this flavor.

Returns:

Type Description
Optional[str]

A flavor docs url.

implementation_class: Type[SeldonModelDeployer] property readonly

Implementation class for this flavor.

Returns:

Type Description
Type[SeldonModelDeployer]

The implementation class.

logo_url: str property readonly

A url to represent the flavor in the dashboard.

Returns:

Type Description
str

The flavor logo.

name: str property readonly

Name of the flavor.

Returns:

Type Description
str

The name of the flavor.

sdk_docs_url: Optional[str] property readonly

A url to point at SDK docs explaining this flavor.

Returns:

Type Description
Optional[str]

A flavor SDK docs url.

model_deployers special

Initialization of the Seldon Model Deployer.

seldon_model_deployer

Implementation of the Seldon Model Deployer.

SeldonModelDeployer (BaseModelDeployer)

Seldon Core model deployer stack component implementation.

Source code in zenml/integrations/seldon/model_deployers/seldon_model_deployer.py
class SeldonModelDeployer(BaseModelDeployer):
    """Seldon Core model deployer stack component implementation."""

    NAME: ClassVar[str] = "Seldon Core"
    FLAVOR: ClassVar[Type[BaseModelDeployerFlavor]] = SeldonModelDeployerFlavor

    _client: Optional[SeldonClient] = None

    @property
    def config(self) -> SeldonModelDeployerConfig:
        """Expose the component configuration typed as a Seldon deployer config.

        Returns:
            The stored configuration, cast to `SeldonModelDeployerConfig`.
        """
        typed_config = cast(SeldonModelDeployerConfig, self._config)
        return typed_config

    @property
    def validator(self) -> Optional[StackValidator]:
        """Require a container registry and an image builder in the stack.

        Returns:
            A `StackValidator` instance listing both required components.
        """
        needed_components = {
            StackComponentType.CONTAINER_REGISTRY,
            StackComponentType.IMAGE_BUILDER,
        }
        return StackValidator(required_components=needed_components)

    @staticmethod
    def get_model_server_info(  # type: ignore[override]
        service_instance: "SeldonDeploymentService",
    ) -> Dict[str, Optional[str]]:
        """Collect user-facing details about a Seldon deployment service.

        Args:
            service_instance: Instance of a SeldonDeploymentService

        Returns:
            A mapping with the prediction URL, model URI, model name and
            Seldon deployment name of the service.
        """
        prediction_url = service_instance.prediction_url
        served_config = service_instance.config
        return dict(
            PREDICTION_URL=prediction_url,
            MODEL_URI=served_config.model_uri,
            MODEL_NAME=served_config.model_name,
            SELDON_DEPLOYMENT=service_instance.seldon_deployment_name,
        )

    @property
    def seldon_client(self) -> SeldonClient:
        """Return the Seldon Core client, creating it on first access.

        The client is built from the configured Kubernetes context and
        namespace and then kept on the instance for later calls.

        Returns:
            The Seldon Core client.
        """
        if self._client:
            return self._client

        deployer_config = self.config
        self._client = SeldonClient(
            context=deployer_config.kubernetes_context,
            namespace=deployer_config.kubernetes_namespace,
        )
        return self._client

    @property
    def kubernetes_secret_name(self) -> Optional[str]:
        """Name of the Kubernetes secret used by this model deployer.

        When a ZenML secret is configured, the name is derived from it:
        the secret name is prefixed with "zenml-seldon-core-", every run of
        characters other than letters, digits and dashes is replaced by a
        single dash, surrounding dashes are stripped and the result is
        lower-cased. Without a ZenML secret, the explicitly configured
        Kubernetes secret name is used, if any.

        Returns:
            The Seldon Core Kubernetes secret name, or None if no secret is
            configured.
        """
        deployer_config = self.config
        if not deployer_config.secret:
            # No ZenML secret: fall back to the pre-existing Kubernetes
            # secret name; an unset or empty name yields None.
            return deployer_config.kubernetes_secret_name or None

        raw_name = f"zenml-seldon-core-{deployer_config.secret}"
        dashed_name = re.sub(r"[^0-9a-zA-Z-]+", "-", raw_name)
        return dashed_name.strip("-").lower()

    def get_docker_builds(
        self, deployment: "PipelineDeploymentBaseModel"
    ) -> List["BuildConfiguration"]:
        """Collect the Docker builds needed by custom-deployment steps.

        A build is requested for every step whose extra configuration has the
        Seldon custom deployment flag set to exactly True.

        Args:
            deployment: The pipeline deployment for which to get the builds.

        Returns:
            The required Docker builds.
        """
        return [
            BuildConfiguration(
                key=SELDON_DOCKER_IMAGE_KEY,
                settings=step.config.docker_settings,
                step_name=step_name,
            )
            for step_name, step in deployment.step_configurations.items()
            if step.config.extra.get(SELDON_CUSTOM_DEPLOYMENT, False) is True
        ]

    def _create_or_update_kubernetes_secret(self) -> Optional[str]:
        """Create or update a Kubernetes secret.

        Uses the information stored in the ZenML secret configured for the
        model deployer. If only a Kubernetes secret name is configured (and
        no ZenML secret), that name is returned unchanged and nothing is
        created.

        Returns:
            The name of the Kubernetes secret that was created or updated, or
            None if no secret was configured.

        Raises:
            RuntimeError: if the secret cannot be created or updated.
        """
        # if a ZenML secret was configured in the model deployer,
        # create a Kubernetes secret as a means to pass this information
        # to the Seldon Core deployment
        if self.config.kubernetes_secret_name and not self.config.secret:
            return self.config.kubernetes_secret_name
        elif self.config.secret:

            secret_manager = Client().active_stack.secrets_manager

            if not secret_manager or not isinstance(
                secret_manager, BaseSecretsManager
            ):
                raise RuntimeError(
                    f"The active stack doesn't have a secret manager component. "
                    f"The ZenML secret specified in the Seldon Core Model "
                    f"Deployer configuration cannot be fetched: {self.config.secret}."
                )

            try:
                zenml_secret = secret_manager.get_secret(self.config.secret)
            except KeyError as err:
                # Chain the lookup failure so the original cause stays
                # visible in the traceback.
                raise RuntimeError(
                    f"The ZenML secret '{self.config.secret}' specified in the "
                    f"Seldon Core Model Deployer configuration was not found "
                    f"in the active stack's secret manager."
                ) from err

            # should never happen, just making mypy happy
            assert self.kubernetes_secret_name is not None
            self.seldon_client.create_or_update_secret(
                self.kubernetes_secret_name, zenml_secret
            )

        return self.kubernetes_secret_name

    def _delete_kubernetes_secret(self) -> None:
        """Remove this deployer's Kubernetes secret once nothing uses it.

        Nothing is deleted when no secret name is available, when the name is
        the explicitly configured Kubernetes secret name, or when at least
        one Seldon Core deployment still references the secret.
        """
        secret_name = self.kubernetes_secret_name
        if (
            not secret_name
            or secret_name == self.config.kubernetes_secret_name
        ):
            return

        # look through the Seldon Core deployments for one that is still
        # configured to use this secret
        still_in_use = any(
            cast(SeldonDeploymentConfig, service.config).secret_name
            == secret_name
            for service in self.find_model_server()
        )
        if not still_in_use:
            self.seldon_client.delete_secret(secret_name)

    def deploy_model(
        self,
        config: ServiceConfig,
        replace: bool = False,
        timeout: int = DEFAULT_SELDON_DEPLOYMENT_START_STOP_TIMEOUT,
    ) -> BaseService:
        """Create a new Seldon Core deployment or update an existing one.

        # noqa: DAR402

        This should serve the supplied model and deployment configuration.

        This method has two modes of operation, depending on the `replace`
        argument value:

          * if `replace` is False, calling this method will create a new Seldon
            Core deployment server to reflect the model and other configuration
            parameters specified in the supplied Seldon deployment `config`.

          * if `replace` is True, this method will first attempt to find an
            existing Seldon Core deployment that is *equivalent* to the supplied
            configuration parameters. Two or more Seldon Core deployments are
            considered equivalent if they have the same `pipeline_name`,
            `pipeline_step_name` and `model_name` configuration parameters. To
            put it differently, two Seldon Core deployments are equivalent if
            they serve versions of the same model deployed by the same pipeline
            step. If an equivalent Seldon Core deployment is found, it will be
            updated in place to reflect the new configuration parameters. This
            allows an existing Seldon Core deployment to retain its prediction
            URL while performing a rolling update to serve a new model version.

        Callers should set `replace` to True if they want a continuous model
        deployment workflow that doesn't spin up a new Seldon Core deployment
        server for each new model version. If multiple equivalent Seldon Core
        deployments are found, the most recently created deployment is selected
        to be updated and the others are deleted.

        Args:
            config: the configuration of the model to be deployed with Seldon
                Core.
            replace: set this flag to True to find and update an equivalent
                Seldon Core deployment server with the new model instead of
                starting a new deployment server.
            timeout: the timeout in seconds to wait for the Seldon Core server
                to be provisioned and successfully started or updated. If set
                to 0, the method will return immediately after the Seldon Core
                server is provisioned, without waiting for it to fully start.

        Returns:
            The ZenML Seldon Core deployment service object that can be used to
            interact with the remote Seldon Core server.

        Raises:
            SeldonClientError: if a Seldon Core client error is encountered
                while provisioning the Seldon Core deployment server.
            RuntimeError: if `timeout` is set to a positive value that is
                exceeded while waiting for the Seldon Core deployment server
                to start, or if an operational failure is encountered before
                it reaches a ready state.
        """
        with event_handler(AnalyticsEvent.MODEL_DEPLOYED) as analytics_handler:
            config = cast(SeldonDeploymentConfig, config)
            service = None

            # if a custom Kubernetes secret is not explicitly specified in the
            # SeldonDeploymentConfig, try to create one from the ZenML secret
            # configured for the model deployer
            config.secret_name = (
                config.secret_name
                or self._create_or_update_kubernetes_secret()
            )

            # if replace is True, find equivalent Seldon Core deployments
            if replace is True:
                equivalent_services = self.find_model_server(
                    running=False,
                    pipeline_name=config.pipeline_name,
                    pipeline_step_name=config.pipeline_step_name,
                    model_name=config.model_name,
                )

                # find_model_server returns the most recent deployment first
                for equivalent_service in equivalent_services:
                    if service is None:
                        # keep the most recently created service
                        service = equivalent_service
                    else:
                        try:
                            # delete the older services and don't wait for
                            # them to be deprovisioned; the call must target
                            # the older duplicate, not the service we keep
                            equivalent_service.stop()
                        except RuntimeError:
                            # ignore errors encountered while stopping old
                            # services
                            pass

            if service:
                # update an equivalent service in place
                service.update(config)
                logger.info(
                    f"Updating an existing Seldon deployment service: {service}"
                )
            else:
                # create a new service
                service = SeldonDeploymentService(config=config)
                logger.info(
                    f"Creating a new Seldon deployment service: {service}"
                )

            # start the service which in turn provisions the Seldon Core
            # deployment server and waits for it to reach a ready state
            service.start(timeout=timeout)

            # Add telemetry with metadata that gets the stack metadata and
            # differentiates between pure model and custom code deployments
            stack = Client().active_stack
            stack_metadata = {
                component_type.value: component.flavor
                for component_type, component in stack.components.items()
            }
            analytics_handler.metadata = {
                "store_type": Client().zen_store.type.value,
                **stack_metadata,
                "is_custom_code_deployment": config.is_custom_deployment,
            }

        return service

    def find_model_server(
        self,
        running: bool = False,
        service_uuid: Optional[UUID] = None,
        pipeline_name: Optional[str] = None,
        pipeline_run_id: Optional[str] = None,
        pipeline_step_name: Optional[str] = None,
        model_name: Optional[str] = None,
        model_uri: Optional[str] = None,
        model_type: Optional[str] = None,
    ) -> List[BaseService]:
        """Look up the Seldon Core model services matching the given filters.

        Matching services are returned newest first, i.e. sorted in
        descending order of the creation time of their deployments.

        Args:
            running: if true, only running services will be returned.
            service_uuid: the UUID of the Seldon Core service that was
                originally used to create the Seldon Core deployment resource.
            pipeline_name: name of the pipeline that the deployed model was part
                of.
            pipeline_run_id: ID of the pipeline run which the deployed model was
                part of.
            pipeline_step_name: the name of the pipeline model deployment step
                that deployed the model.
            model_name: the name of the deployed model.
            model_uri: URI of the deployed model.
            model_type: the Seldon Core server implementation used to serve
                the model

        Returns:
            One or more Seldon Core service objects representing Seldon Core
            model servers that match the input search criteria.
        """

        def _created_at(deployment: Any) -> datetime:
            # deployments without a creation timestamp sort as the oldest
            stamp = deployment.metadata.creationTimestamp
            if not stamp:
                return datetime.min
            return datetime.strptime(stamp, "%Y-%m-%dT%H:%M:%SZ")

        # The search labels are derived from a throwaway deployment
        # configuration; unset filters are passed as empty strings.
        label_source = SeldonDeploymentConfig(
            pipeline_name=pipeline_name or "",
            pipeline_run_id=pipeline_run_id or "",
            pipeline_step_name=pipeline_step_name or "",
            model_name=model_name or "",
            model_uri=model_uri or "",
            implementation=model_type or "",
        )
        labels = label_source.get_seldon_deployment_labels()
        if service_uuid:
            # the service UUID is not part of the configuration-derived
            # labels, so it is added by hand
            labels["zenml.service_uuid"] = str(service_uuid)

        deployments = self.seldon_client.find_deployments(labels=labels)
        # newest deployments first
        deployments.sort(key=_created_at, reverse=True)

        matching: List[BaseService] = []
        for deployment in deployments:
            # rebuild the service object from the deployment resource
            candidate = SeldonDeploymentService.create_from_deployment(
                deployment=deployment
            )
            # with `running` set, services that are not running are dropped
            if not running or candidate.is_running:
                matching.append(candidate)

        return matching

    def stop_model_server(
        self,
        uuid: UUID,
        timeout: int = DEFAULT_SELDON_DEPLOYMENT_START_STOP_TIMEOUT,
        force: bool = False,
    ) -> None:
        """Reject a request to stop a Seldon Core model server.

        This operation is not available for Seldon Core; the method always
        raises and none of its arguments are used.

        Args:
            uuid: UUID of the model server to stop.
            timeout: timeout in seconds to wait for the service to stop.
            force: if True, force the service to stop.

        Raises:
            NotImplementedError: always, because stopping Seldon Core model
                servers is not supported.
        """
        # Point the caller at deletion, which is the supported alternative.
        message = (
            "Stopping Seldon Core model servers is not implemented. Try "
            "deleting the Seldon Core model server instead."
        )
        raise NotImplementedError(message)

    def start_model_server(
        self,
        uuid: UUID,
        timeout: int = DEFAULT_SELDON_DEPLOYMENT_START_STOP_TIMEOUT,
    ) -> None:
        """Reject a request to start a Seldon Core model deployment server.

        This operation is not available for Seldon Core; the method always
        raises and none of its arguments are used.

        Args:
            uuid: UUID of the model server to start.
            timeout: timeout in seconds to wait for the service to become
                active. If set to 0, the method will return immediately
                after provisioning the service, without waiting for it to
                become active.

        Raises:
            NotImplementedError: always, since starting Seldon Core model
                servers is not supported.
        """
        message = "Starting Seldon Core model servers is not implemented"
        raise NotImplementedError(message)

    def delete_model_server(
        self,
        uuid: UUID,
        timeout: int = DEFAULT_SELDON_DEPLOYMENT_START_STOP_TIMEOUT,
        force: bool = False,
    ) -> None:
        """Delete the Seldon Core model deployment server with the given UUID.

        Does nothing when no model server matches the UUID.

        Args:
            uuid: UUID of the model server to delete.
            timeout: timeout in seconds to wait for the service to stop. If
                set to 0, the method will return immediately after
                deprovisioning the service, without waiting for it to stop.
            force: if True, force the service to stop.
        """
        matching_servers = self.find_model_server(service_uuid=uuid)
        if not matching_servers:
            # nothing deployed under this UUID
            return

        target_server = matching_servers[0]
        target_server.stop(timeout=timeout, force=force)

        # Clean up the Kubernetes secret holding the authentication
        # information for the Seldon Core storage initializer. The helper is
        # expected to remove it only when this was the last Seldon Core model
        # server (that check is not visible here).
        self._delete_kubernetes_secret()
config: SeldonModelDeployerConfig property readonly

Returns the SeldonModelDeployerConfig config.

Returns:

Type Description
SeldonModelDeployerConfig

The configuration.

kubernetes_secret_name: Optional[str] property readonly

Get the Kubernetes secret name associated with this model deployer.

If a secret is configured for this model deployer, a corresponding Kubernetes secret is created in the remote cluster to be used by Seldon Core storage initializers to authenticate to the Artifact Store. This method returns the unique name that is used for this secret.

Returns:

Type Description
Optional[str]

The Seldon Core Kubernetes secret name, or None if no secret is configured.

seldon_client: SeldonClient property readonly

Get the Seldon Core client associated with this model deployer.

Returns:

Type Description
SeldonClient

The Seldon Core client.

validator: Optional[zenml.stack.stack_validator.StackValidator] property readonly

Ensures there is a container registry and image builder in the stack.

Returns:

Type Description
Optional[zenml.stack.stack_validator.StackValidator]

A StackValidator instance.

FLAVOR (BaseModelDeployerFlavor)

Seldon Core model deployer flavor.

Source code in zenml/integrations/seldon/model_deployers/seldon_model_deployer.py
class SeldonModelDeployerFlavor(BaseModelDeployerFlavor):
    """Flavor definition for the Seldon Core model deployer."""

    @property
    def name(self) -> str:
        """Identifier of this flavor.

        Returns:
            The Seldon model deployer flavor name.
        """
        return SELDON_MODEL_DEPLOYER_FLAVOR

    @property
    def docs_url(self) -> Optional[str]:
        """Link to the documentation page describing this flavor.

        Returns:
            The URL produced by `generate_default_docs_url`.
        """
        return self.generate_default_docs_url()

    @property
    def sdk_docs_url(self) -> Optional[str]:
        """Link to the SDK documentation page describing this flavor.

        Returns:
            The URL produced by `generate_default_sdk_docs_url`.
        """
        return self.generate_default_sdk_docs_url()

    @property
    def logo_url(self) -> str:
        """Link to the image representing this flavor in the dashboard.

        Returns:
            The URL of the flavor logo.
        """
        logo = "https://public-flavor-logos.s3.eu-central-1.amazonaws.com/model_deployer/seldon.png"
        return logo

    @property
    def config_class(self) -> Type[SeldonModelDeployerConfig]:
        """Configuration class used by this flavor.

        Returns:
            The `SeldonModelDeployerConfig` class.
        """
        return SeldonModelDeployerConfig

    @property
    def implementation_class(self) -> Type["SeldonModelDeployer"]:
        """Stack component class implementing this flavor.

        Returns:
            The `SeldonModelDeployer` class.
        """
        # Imported at call time rather than at module import time.
        from zenml.integrations.seldon.model_deployers import (
            SeldonModelDeployer as deployer_class,
        )

        return deployer_class
config_class: Type[zenml.integrations.seldon.flavors.seldon_model_deployer_flavor.SeldonModelDeployerConfig] property readonly

Returns SeldonModelDeployerConfig config class.

Returns:

Type Description
Type[zenml.integrations.seldon.flavors.seldon_model_deployer_flavor.SeldonModelDeployerConfig]

The config class.

docs_url: Optional[str] property readonly

A url to point at docs explaining this flavor.

Returns:

Type Description
Optional[str]

A flavor docs url.

implementation_class: Type[SeldonModelDeployer] property readonly

Implementation class for this flavor.

Returns:

Type Description
Type[SeldonModelDeployer]

The implementation class.

logo_url: str property readonly

A url to represent the flavor in the dashboard.

Returns:

Type Description
str

The flavor logo.

name: str property readonly

Name of the flavor.

Returns:

Type Description
str

The name of the flavor.

sdk_docs_url: Optional[str] property readonly

A url to point at SDK docs explaining this flavor.

Returns:

Type Description
Optional[str]

A flavor SDK docs url.

delete_model_server(self, uuid, timeout=300, force=False)

Delete a Seldon Core model deployment server.

Parameters:

Name Type Description Default
uuid UUID

UUID of the model server to delete.

required
timeout int

timeout in seconds to wait for the service to stop. If set to 0, the method will return immediately after deprovisioning the service, without waiting for it to stop.

300
force bool

if True, force the service to stop.

False
Source code in zenml/integrations/seldon/model_deployers/seldon_model_deployer.py
def delete_model_server(
    self,
    uuid: UUID,
    timeout: int = DEFAULT_SELDON_DEPLOYMENT_START_STOP_TIMEOUT,
    force: bool = False,
) -> None:
    """Remove the Seldon Core model deployment server identified by `uuid`.

    The call is a no-op when no model server matches the UUID.

    Args:
        uuid: UUID of the model server to delete.
        timeout: timeout in seconds to wait for the service to stop. If
            set to 0, the method will return immediately after
            deprovisioning the service, without waiting for it to stop.
        force: if True, force the service to stop.
    """
    found = self.find_model_server(service_uuid=uuid)
    if found:
        # only the first match is stopped
        found[0].stop(timeout=timeout, force=force)

        # Remove the Kubernetes secret that stores the authentication
        # information for the Seldon Core storage initializer. Whether other
        # model servers still need it is presumably checked inside the
        # helper; that logic is not visible here.
        self._delete_kubernetes_secret()
deploy_model(self, config, replace=False, timeout=300)

Create a new Seldon Core deployment or update an existing one.

noqa: DAR402

This should serve the supplied model and deployment configuration.

This method has two modes of operation, depending on the replace argument value:

  • if replace is False, calling this method will create a new Seldon Core deployment server to reflect the model and other configuration parameters specified in the supplied Seldon deployment config.

  • if replace is True, this method will first attempt to find an existing Seldon Core deployment that is equivalent to the supplied configuration parameters. Two or more Seldon Core deployments are considered equivalent if they have the same pipeline_name, pipeline_step_name and model_name configuration parameters. To put it differently, two Seldon Core deployments are equivalent if they serve versions of the same model deployed by the same pipeline step. If an equivalent Seldon Core deployment is found, it will be updated in place to reflect the new configuration parameters. This allows an existing Seldon Core deployment to retain its prediction URL while performing a rolling update to serve a new model version.

Callers should set replace to True if they want a continuous model deployment workflow that doesn't spin up a new Seldon Core deployment server for each new model version. If multiple equivalent Seldon Core deployments are found, the most recently created deployment is selected to be updated and the others are deleted.

Parameters:

Name Type Description Default
config ServiceConfig

the configuration of the model to be deployed with Seldon Core.

required
replace bool

set this flag to True to find and update an equivalent Seldon Core deployment server with the new model instead of starting a new deployment server.

False
timeout int

the timeout in seconds to wait for the Seldon Core server to be provisioned and successfully started or updated. If set to 0, the method will return immediately after the Seldon Core server is provisioned, without waiting for it to fully start.

300

Returns:

Type Description
BaseService

The ZenML Seldon Core deployment service object that can be used to interact with the remote Seldon Core server.

Exceptions:

Type Description
SeldonClientError

if a Seldon Core client error is encountered while provisioning the Seldon Core deployment server.

RuntimeError

if timeout is set to a positive value that is exceeded while waiting for the Seldon Core deployment server to start, or if an operational failure is encountered before it reaches a ready state.

Source code in zenml/integrations/seldon/model_deployers/seldon_model_deployer.py
def deploy_model(
    self,
    config: ServiceConfig,
    replace: bool = False,
    timeout: int = DEFAULT_SELDON_DEPLOYMENT_START_STOP_TIMEOUT,
) -> BaseService:
    """Create a new Seldon Core deployment or update an existing one.

    # noqa: DAR402

    This should serve the supplied model and deployment configuration.

    This method has two modes of operation, depending on the `replace`
    argument value:

      * if `replace` is False, calling this method will create a new Seldon
        Core deployment server to reflect the model and other configuration
        parameters specified in the supplied Seldon deployment `config`.

      * if `replace` is True, this method will first attempt to find an
        existing Seldon Core deployment that is *equivalent* to the supplied
        configuration parameters. Two or more Seldon Core deployments are
        considered equivalent if they have the same `pipeline_name`,
        `pipeline_step_name` and `model_name` configuration parameters. To
        put it differently, two Seldon Core deployments are equivalent if
        they serve versions of the same model deployed by the same pipeline
        step. If an equivalent Seldon Core deployment is found, it will be
        updated in place to reflect the new configuration parameters. This
        allows an existing Seldon Core deployment to retain its prediction
        URL while performing a rolling update to serve a new model version.

    Callers should set `replace` to True if they want a continuous model
    deployment workflow that doesn't spin up a new Seldon Core deployment
    server for each new model version. If multiple equivalent Seldon Core
    deployments are found, the most recently created deployment is selected
    to be updated and the others are deleted.

    Args:
        config: the configuration of the model to be deployed with Seldon
            Core.
        replace: set this flag to True to find and update an equivalent
            Seldon Core deployment server with the new model instead of
            starting a new deployment server.
        timeout: the timeout in seconds to wait for the Seldon Core server
            to be provisioned and successfully started or updated. If set
            to 0, the method will return immediately after the Seldon Core
            server is provisioned, without waiting for it to fully start.

    Returns:
        The ZenML Seldon Core deployment service object that can be used to
        interact with the remote Seldon Core server.

    Raises:
        SeldonClientError: if a Seldon Core client error is encountered
            while provisioning the Seldon Core deployment server.
        RuntimeError: if `timeout` is set to a positive value that is
            exceeded while waiting for the Seldon Core deployment server
            to start, or if an operational failure is encountered before
            it reaches a ready state.
    """
    with event_handler(AnalyticsEvent.MODEL_DEPLOYED) as analytics_handler:
        config = cast(SeldonDeploymentConfig, config)
        service = None

        # if a custom Kubernetes secret is not explicitly specified in the
        # SeldonDeploymentConfig, try to create one from the ZenML secret
        # configured for the model deployer
        config.secret_name = (
            config.secret_name
            or self._create_or_update_kubernetes_secret()
        )

        # if replace is True, find equivalent Seldon Core deployments
        if replace is True:
            equivalent_services = self.find_model_server(
                running=False,
                pipeline_name=config.pipeline_name,
                pipeline_step_name=config.pipeline_step_name,
                model_name=config.model_name,
            )

            # find_model_server returns the most recent deployment first
            for equivalent_service in equivalent_services:
                if service is None:
                    # keep the most recently created service
                    service = equivalent_service
                    continue

                try:
                    # delete the older service (not the one being kept)
                    # and don't wait for it to be deprovisioned
                    equivalent_service.stop()
                except RuntimeError as stop_error:
                    # best effort: a failure to remove an outdated
                    # deployment must not block the new deployment
                    logger.warning(
                        f"Failed to stop outdated Seldon deployment "
                        f"service {equivalent_service}: {stop_error}"
                    )

        if service:
            # update an equivalent service in place
            service.update(config)
            logger.info(
                f"Updating an existing Seldon deployment service: {service}"
            )
        else:
            # create a new service
            service = SeldonDeploymentService(config=config)
            logger.info(
                f"Creating a new Seldon deployment service: {service}"
            )

        # start the service which in turn provisions the Seldon Core
        # deployment server and waits for it to reach a ready state
        service.start(timeout=timeout)

        # Add telemetry with metadata that gets the stack metadata and
        # differentiates between pure model and custom code deployments
        stack = Client().active_stack
        stack_metadata = {
            component_type.value: component.flavor
            for component_type, component in stack.components.items()
        }
        analytics_handler.metadata = {
            "store_type": Client().zen_store.type.value,
            **stack_metadata,
            "is_custom_code_deployment": config.is_custom_deployment,
        }

    return service
find_model_server(self, running=False, service_uuid=None, pipeline_name=None, pipeline_run_id=None, pipeline_step_name=None, model_name=None, model_uri=None, model_type=None)

Find one or more Seldon Core model services that match the given criteria.

The Seldon Core deployment services that meet the search criteria are returned sorted in descending order of their creation time (i.e. more recent deployments first).

Parameters:

Name Type Description Default
running bool

if true, only running services will be returned.

False
service_uuid Optional[uuid.UUID]

the UUID of the Seldon Core service that was originally used to create the Seldon Core deployment resource.

None
pipeline_name Optional[str]

name of the pipeline that the deployed model was part of.

None
pipeline_run_id Optional[str]

ID of the pipeline run which the deployed model was part of.

None
pipeline_step_name Optional[str]

the name of the pipeline model deployment step that deployed the model.

None
model_name Optional[str]

the name of the deployed model.

None
model_uri Optional[str]

URI of the deployed model.

None
model_type Optional[str]

the Seldon Core server implementation used to serve the model

None

Returns:

Type Description
List[zenml.services.service.BaseService]

One or more Seldon Core service objects representing Seldon Core model servers that match the input search criteria.

Source code in zenml/integrations/seldon/model_deployers/seldon_model_deployer.py
def find_model_server(
    self,
    running: bool = False,
    service_uuid: Optional[UUID] = None,
    pipeline_name: Optional[str] = None,
    pipeline_run_id: Optional[str] = None,
    pipeline_step_name: Optional[str] = None,
    model_name: Optional[str] = None,
    model_uri: Optional[str] = None,
    model_type: Optional[str] = None,
) -> List[BaseService]:
    """Look up Seldon Core model services matching the given criteria.

    Matching deployments are returned newest first, i.e. sorted in
    descending order of their creation time.

    Args:
        running: if true, only running services will be returned.
        service_uuid: the UUID of the Seldon Core service that was
            originally used to create the Seldon Core deployment resource.
        pipeline_name: name of the pipeline that the deployed model was part
            of.
        pipeline_run_id: ID of the pipeline run which the deployed model was
            part of.
        pipeline_step_name: the name of the pipeline model deployment step
            that deployed the model.
        model_name: the name of the deployed model.
        model_uri: URI of the deployed model.
        model_type: the Seldon Core server implementation used to serve
            the model.

    Returns:
        The Seldon Core service objects representing the Seldon Core model
        servers that match the input search criteria (possibly none).
    """

    def _created_at(deployment) -> datetime:
        """Return the creation time used as the sort key."""
        stamp = deployment.metadata.creationTimestamp
        if not stamp:
            # deployments without a timestamp sort as the oldest
            return datetime.min
        return datetime.strptime(stamp, "%Y-%m-%dT%H:%M:%SZ")

    # A deployment configuration built from the search criteria is used
    # only to derive the label selector; unset criteria become "".
    label_source = SeldonDeploymentConfig(
        pipeline_name=pipeline_name or "",
        pipeline_run_id=pipeline_run_id or "",
        pipeline_step_name=pipeline_step_name or "",
        model_name=model_name or "",
        model_uri=model_uri or "",
        implementation=model_type or "",
    )
    selector = label_source.get_seldon_deployment_labels()
    if service_uuid:
        # the service UUID label is not produced by the deployment
        # configuration, so it is added by hand
        selector["zenml.service_uuid"] = str(service_uuid)

    matches = self.seldon_client.find_deployments(labels=selector)
    # most recently created deployments first
    matches.sort(key=_created_at, reverse=True)

    # rebuild a service object from each deployment resource, lazily, so
    # that creation and the running check stay interleaved per deployment
    candidates = (
        SeldonDeploymentService.create_from_deployment(deployment=match)
        for match in matches
    )
    # when `running` is set, drop the services that are not running
    return [
        candidate
        for candidate in candidates
        if not running or candidate.is_running
    ]
get_docker_builds(self, deployment)

Gets the Docker builds required for the component.

Parameters:

Name Type Description Default
deployment PipelineDeploymentBaseModel

The pipeline deployment for which to get the builds.

required

Returns:

Type Description
List[BuildConfiguration]

The required Docker builds.

Source code in zenml/integrations/seldon/model_deployers/seldon_model_deployer.py
def get_docker_builds(
    self, deployment: "PipelineDeploymentBaseModel"
) -> List["BuildConfiguration"]:
    """Collect the Docker builds this component needs for a deployment.

    One build is requested for every step whose extra configuration has the
    Seldon custom deployment flag set to exactly `True`.

    Args:
        deployment: The pipeline deployment for which to get the builds.

    Returns:
        The required Docker builds, in step configuration order.
    """
    return [
        BuildConfiguration(
            key=SELDON_DOCKER_IMAGE_KEY,
            settings=step_config.config.docker_settings,
            step_name=name,
        )
        for name, step_config in deployment.step_configurations.items()
        if step_config.config.extra.get(SELDON_CUSTOM_DEPLOYMENT, False)
        is True
    ]
get_model_server_info(service_instance) staticmethod

Return implementation specific information that might be relevant to the user.

Parameters:

Name Type Description Default
service_instance SeldonDeploymentService

Instance of a SeldonDeploymentService

required

Returns:

Type Description
Dict[str, Optional[str]]

Model server information.

Source code in zenml/integrations/seldon/model_deployers/seldon_model_deployer.py
@staticmethod
def get_model_server_info(  # type: ignore[override]
    service_instance: "SeldonDeploymentService",
) -> Dict[str, Optional[str]]:
    """Summarize the user-relevant details of a Seldon deployment service.

    Args:
        service_instance: Instance of a SeldonDeploymentService

    Returns:
        A mapping with the prediction URL, the model URI, the model name and
        the Seldon deployment name of the service.
    """
    info: Dict[str, Optional[str]] = {}
    info["PREDICTION_URL"] = service_instance.prediction_url
    served_model = service_instance.config
    info["MODEL_URI"] = served_model.model_uri
    info["MODEL_NAME"] = served_model.model_name
    info["SELDON_DEPLOYMENT"] = service_instance.seldon_deployment_name
    return info
start_model_server(self, uuid, timeout=300)

Start a Seldon Core model deployment server.

Parameters:

Name Type Description Default
uuid UUID

UUID of the model server to start.

required
timeout int

timeout in seconds to wait for the service to become active. If set to 0, the method will return immediately after provisioning the service, without waiting for it to become active.

300

Exceptions:

Type Description
NotImplementedError

since we don't support starting Seldon Core model servers

Source code in zenml/integrations/seldon/model_deployers/seldon_model_deployer.py
def start_model_server(
    self,
    uuid: UUID,
    timeout: int = DEFAULT_SELDON_DEPLOYMENT_START_STOP_TIMEOUT,
) -> None:
    """Refuse to start a Seldon Core model deployment server.

    Starting is not available for Seldon Core, so this method raises
    unconditionally and ignores its arguments.

    Args:
        uuid: UUID of the model server to start.
        timeout: timeout in seconds to wait for the service to become
            active. If set to 0, the method will return immediately after
            provisioning the service, without waiting for it to become
            active.

    Raises:
        NotImplementedError: always, since starting Seldon Core model
            servers is not supported.
    """
    reason = "Starting Seldon Core model servers is not implemented"
    raise NotImplementedError(reason)
stop_model_server(self, uuid, timeout=300, force=False)

Stop a Seldon Core model server.

Parameters:

Name Type Description Default
uuid UUID

UUID of the model server to stop.

required
timeout int

timeout in seconds to wait for the service to stop.

300
force bool

if True, force the service to stop.

False

Exceptions:

Type Description
NotImplementedError

stopping Seldon Core model servers is not supported.

Source code in zenml/integrations/seldon/model_deployers/seldon_model_deployer.py
def stop_model_server(
    self,
    uuid: UUID,
    timeout: int = DEFAULT_SELDON_DEPLOYMENT_START_STOP_TIMEOUT,
    force: bool = False,
) -> None:
    """Refuse to stop a Seldon Core model server.

    Stopping is not available for Seldon Core, so this method raises
    unconditionally and ignores its arguments.

    Args:
        uuid: UUID of the model server to stop.
        timeout: timeout in seconds to wait for the service to stop.
        force: if True, force the service to stop.

    Raises:
        NotImplementedError: always, because stopping Seldon Core model
            servers is not supported.
    """
    # The error text directs the caller to deletion instead.
    reason = (
        "Stopping Seldon Core model servers is not implemented. Try "
        "deleting the Seldon Core model server instead."
    )
    raise NotImplementedError(reason)

secret_schemas special

Initialization for the Seldon secret schemas.

These are secret schemas that can be used to authenticate Seldon to the Artifact Store used to store served ML models.

secret_schemas

Implementation for Seldon secret schemas.

SeldonAzureSecretSchema (BaseSecretSchema) pydantic-model

Seldon Azure Blob Storage credentials.

Based on: https://rclone.org/azureblob/

Attributes:

Name Type Description
rclone_config_azureblob_type Literal['azureblob']

the rclone config type. Must be set to "azureblob" for this schema.

rclone_config_azureblob_account Optional[str]

storage Account Name. Leave blank to use SAS URL or MSI.

rclone_config_azureblob_key Optional[str]

storage Account Key. Leave blank to use SAS URL or MSI.

rclone_config_azureblob_sas_url Optional[str]

SAS URL for container level access only. Leave blank if using account/key or MSI.

rclone_config_azureblob_use_msi bool

use a managed service identity to authenticate (only works in Azure).

Source code in zenml/integrations/seldon/secret_schemas/secret_schemas.py
class SeldonAzureSecretSchema(BaseSecretSchema):
    """Seldon Azure Blob Storage credentials.

    The field names and meanings follow the rclone Azure Blob backend
    options.

    Based on: https://rclone.org/azureblob/

    Attributes:
        rclone_config_azureblob_type: the rclone config type. Must be set to
            "azureblob" for this schema.
        rclone_config_azureblob_account: storage Account Name. Leave blank to
            use SAS URL or MSI.
        rclone_config_azureblob_key: storage Account Key. Leave blank to
            use SAS URL or MSI.
        rclone_config_azureblob_sas_url: SAS URL for container level access
            only. Leave blank if using account/key or MSI.
        rclone_config_azureblob_use_msi: use a managed service identity to
            authenticate (only works in Azure). Defaults to False.
    """

    # Identifier of this secret schema type.
    TYPE: ClassVar[str] = SELDON_AZUREBLOB_SECRET_SCHEMA_TYPE

    # Fixed discriminator: "azureblob" is the only accepted value.
    rclone_config_azureblob_type: Literal["azureblob"] = "azureblob"
    # Optional credentials, declared without an explicit default.
    rclone_config_azureblob_account: Optional[str]
    rclone_config_azureblob_key: Optional[str]
    rclone_config_azureblob_sas_url: Optional[str]
    rclone_config_azureblob_use_msi: bool = False
SeldonGSSecretSchema (BaseSecretSchema) pydantic-model

Seldon GCS credentials.

Based on: https://rclone.org/googlecloudstorage/

Attributes:

Name Type Description
rclone_config_gs_type Literal['google cloud storage']

the rclone config type. Must be set to "google cloud storage" for this schema.

rclone_config_gs_client_id Optional[str]

OAuth client id.

rclone_config_gs_client_secret Optional[str]

OAuth client secret.

rclone_config_gs_token Optional[str]

OAuth Access Token as a JSON blob.

rclone_config_gs_project_number Optional[str]

project number.

rclone_config_gs_service_account_credentials Optional[str]

service account credentials JSON blob.

rclone_config_gs_anonymous bool

access public buckets and objects without credentials. Set to True if you just want to download files and don't configure credentials.

rclone_config_gs_auth_url Optional[str]

auth server URL.

Source code in zenml/integrations/seldon/secret_schemas/secret_schemas.py
class SeldonGSSecretSchema(BaseSecretSchema):
    """Seldon GCS credentials.

    The field names and meanings follow the rclone Google Cloud Storage
    backend options.

    Based on: https://rclone.org/googlecloudstorage/

    Attributes:
        rclone_config_gs_type: the rclone config type. Must be set to "google
            cloud storage" for this schema.
        rclone_config_gs_client_id: OAuth client id.
        rclone_config_gs_client_secret: OAuth client secret.
        rclone_config_gs_token: OAuth Access Token as a JSON blob.
        rclone_config_gs_project_number: project number.
        rclone_config_gs_service_account_credentials: service account
            credentials JSON blob.
        rclone_config_gs_anonymous: access public buckets and objects without
            credentials. Set to True if you just want to download files and
            don't configure credentials. Defaults to False.
        rclone_config_gs_auth_url: auth server URL.
        rclone_config_gs_token_url: token server URL (presumably the rclone
            `token_url` option; see the rclone page linked above).
    """

    # Identifier of this secret schema type.
    TYPE: ClassVar[str] = SELDON_GS_SECRET_SCHEMA_TYPE

    # Fixed discriminator: "google cloud storage" is the only accepted value.
    rclone_config_gs_type: Literal[
        "google cloud storage"
    ] = "google cloud storage"
    # Optional credentials, declared without an explicit default.
    rclone_config_gs_client_id: Optional[str]
    rclone_config_gs_client_secret: Optional[str]
    rclone_config_gs_project_number: Optional[str]
    rclone_config_gs_service_account_credentials: Optional[str]
    rclone_config_gs_anonymous: bool = False
    rclone_config_gs_token: Optional[str]
    rclone_config_gs_auth_url: Optional[str]
    rclone_config_gs_token_url: Optional[str]
SeldonS3SecretSchema (BaseSecretSchema) pydantic-model

Seldon S3 credentials.

Based on: https://rclone.org/s3/#amazon-s3

Attributes:

Name Type Description
rclone_config_s3_type Literal['s3']

the rclone config type. Must be set to "s3" for this schema.

rclone_config_s3_provider str

the S3 provider (e.g. aws, ceph, minio).

rclone_config_s3_env_auth bool

get AWS credentials from EC2/ECS meta data (i.e. with IAM roles configuration). Only applies if access_key_id and secret_access_key are blank.

rclone_config_s3_access_key_id Optional[str]

AWS Access Key ID.

rclone_config_s3_secret_access_key Optional[str]

AWS Secret Access Key.

rclone_config_s3_session_token Optional[str]

AWS Session Token.

rclone_config_s3_region Optional[str]

region to connect to.

rclone_config_s3_endpoint Optional[str]

S3 API endpoint.

Source code in zenml/integrations/seldon/secret_schemas/secret_schemas.py
class SeldonS3SecretSchema(BaseSecretSchema):
    """Seldon S3 credentials.

    Each attribute corresponds to one rclone S3 remote configuration option.

    Based on: https://rclone.org/s3/#amazon-s3

    Attributes:
        rclone_config_s3_type: the rclone config type. Must be set to "s3" for
            this schema.
        rclone_config_s3_provider: the S3 provider (e.g. aws, ceph, minio).
            Defaults to "aws".
        rclone_config_s3_env_auth: get AWS credentials from EC2/ECS metadata
            (i.e. with IAM roles configuration). Only applies if access_key_id
            and secret_access_key are blank. Defaults to False.
        rclone_config_s3_access_key_id: AWS Access Key ID.
        rclone_config_s3_secret_access_key: AWS Secret Access Key.
        rclone_config_s3_session_token: AWS Session Token.
        rclone_config_s3_region: region to connect to.
        rclone_config_s3_endpoint: S3 API endpoint.
    """

    TYPE: ClassVar[str] = SELDON_S3_SECRET_SCHEMA_TYPE

    # the type is pinned to the literal "s3"; no other value is accepted by
    # the annotation
    rclone_config_s3_type: Literal["s3"] = "s3"
    rclone_config_s3_provider: str = "aws"
    rclone_config_s3_env_auth: bool = False
    # explicit credentials and connection settings, all optional
    rclone_config_s3_access_key_id: Optional[str]
    rclone_config_s3_secret_access_key: Optional[str]
    rclone_config_s3_session_token: Optional[str]
    rclone_config_s3_region: Optional[str]
    rclone_config_s3_endpoint: Optional[str]

seldon_client

Implementation of the Seldon client for ZenML.

SeldonClient

A client for interacting with Seldon Deployments.

Source code in zenml/integrations/seldon/seldon_client.py
class SeldonClient:
    """A client for interacting with Seldon Deployments."""

    def __init__(self, context: Optional[str], namespace: Optional[str]):
        """Initialize a Seldon Core client.

        Args:
            context: the Kubernetes context to use.
            namespace: the Kubernetes namespace to use.
        """
        self._context = context
        self._namespace = namespace
        self._initialize_k8s_clients()

    def _initialize_k8s_clients(self) -> None:
        """Initialize the Kubernetes clients.

        The in-cluster configuration is tried first. If it is not available,
        the local kube config is loaded for the configured context instead,
        in which case the namespace must have been configured explicitly.

        Raises:
            SeldonClientError: if Kubernetes configuration could not be loaded
                or if no namespace is configured when running outside of a
                cluster.
        """
        try:
            k8s_config.load_incluster_config()
            if not self._namespace:
                # load the namespace in the context of which the
                # current pod is running. The context manager guarantees
                # that the file handle is closed, and surrounding whitespace
                # is stripped because it can never be part of a valid
                # namespace name
                with open(
                    "/var/run/secrets/kubernetes.io/serviceaccount/namespace"
                ) as namespace_file:
                    self._namespace = namespace_file.read().strip()
        except k8s_config.config_exception.ConfigException:
            # not running inside a cluster: fall back to the local kube config
            if not self._namespace:
                raise SeldonClientError(
                    "The Kubernetes namespace must be explicitly "
                    "configured when running outside of a cluster."
                )
            try:
                k8s_config.load_kube_config(
                    context=self._context, persist_config=False
                )
            except k8s_config.config_exception.ConfigException as e:
                raise SeldonClientError(
                    "Could not load the Kubernetes configuration"
                ) from e
        self._core_api = k8s_client.CoreV1Api()
        self._custom_objects_api = k8s_client.CustomObjectsApi()

    @staticmethod
    def sanitize_labels(labels: Dict[str, str]) -> None:
        """Update the label values to be valid Kubernetes labels.

        See:
        https://kubernetes.io/docs/concepts/overview/working-with-objects/labels/#syntax-and-character-set

        Args:
            labels: the labels to sanitize.
        """
        for key, value in labels.items():
            # Kubernetes labels must be alphanumeric, no longer than
            # 63 characters, and must begin and end with an alphanumeric
            # character ([a-z0-9A-Z])
            labels[key] = re.sub(r"[^0-9a-zA-Z-_\.]+", "_", value)[:63].strip(
                "-_."
            )

    @property
    def namespace(self) -> str:
        """Returns the Kubernetes namespace in use by the client.

        Returns:
            The Kubernetes namespace in use by the client.

        Raises:
            RuntimeError: if the namespace has not been configured.
        """
        if not self._namespace:
            # shouldn't happen if the client is initialized, but we need to
            # appease the mypy type checker
            raise RuntimeError("The Kubernetes namespace is not configured")
        return self._namespace

    def create_deployment(
        self,
        deployment: SeldonDeployment,
        poll_timeout: int = 0,
    ) -> SeldonDeployment:
        """Create a Seldon Core deployment resource.

        After the resource is created, it is fetched again and, if a poll
        timeout is set, polled every 5 seconds while it is still pending.
        Errors raised while fetching the created deployment are propagated
        unchanged.

        Args:
            deployment: the Seldon Core deployment resource to create
            poll_timeout: the maximum time to wait for the deployment to become
                available or to fail. If set to 0, the function will return
                immediately without checking the deployment status. If a timeout
                occurs and the deployment is still pending creation, it will
                be returned anyway and no exception will be raised.

        Returns:
            the created Seldon Core deployment resource with updated status.

        Raises:
            SeldonDeploymentExistsError: if a deployment with the same name
                already exists.
            SeldonClientError: if an unknown error occurs during the creation of
                the deployment.
        """
        try:
            logger.debug(f"Creating SeldonDeployment resource: {deployment}")

            # mark the deployment as managed by ZenML, to differentiate
            # between deployments that are created by ZenML and those that
            # are not
            deployment.mark_as_managed_by_zenml()

            body_deploy = deployment.dict(exclude_none=True)
            response = (
                self._custom_objects_api.create_namespaced_custom_object(
                    group="machinelearning.seldon.io",
                    version="v1",
                    namespace=self._namespace,
                    plural="seldondeployments",
                    body=body_deploy,
                    # a poll timeout of 0 means no request timeout at all
                    _request_timeout=poll_timeout or None,
                )
            )
            logger.debug("Seldon Core API response: %s", response)
        except k8s_client.rest.ApiException as e:
            logger.error(
                "Exception when creating SeldonDeployment resource: %s", str(e)
            )
            if e.status == 409:
                # HTTP 409 (conflict) is reported as a dedicated error; the
                # API exception is chained explicitly like every other error
                # raised by this client
                raise SeldonDeploymentExistsError(
                    f"A deployment with the name {deployment.name} "
                    f"already exists in namespace {self._namespace}"
                ) from e
            raise SeldonClientError(
                "Exception when creating SeldonDeployment resource"
            ) from e

        created_deployment = self.get_deployment(name=deployment.name)

        # poll in 5 second steps until the deployment leaves the pending
        # state or the timeout budget is used up
        while poll_timeout > 0 and created_deployment.is_pending():
            time.sleep(5)
            poll_timeout -= 5
            created_deployment = self.get_deployment(name=deployment.name)

        return created_deployment

    def delete_deployment(
        self,
        name: str,
        force: bool = False,
        poll_timeout: int = 0,
    ) -> None:
        """Delete a Seldon Core deployment resource managed by ZenML.

        The deployment is looked up before it is deleted. The lookup error
        raised when it does not exist or is not managed by ZenML is
        propagated to the caller unchanged.

        Args:
            name: the name of the Seldon Core deployment resource to delete.
            force: if True, the deployment deletion will be forced (the grace
                period will be set to zero).
            poll_timeout: the maximum time to wait for the deployment to be
                deleted. If set to 0, the function will return immediately
                without checking the deployment status. If a timeout
                occurs and the deployment still exists, this method will
                return and no exception will be raised.

        Raises:
            SeldonClientError: if an unknown error occurs during the deployment
                removal.
        """
        try:
            logger.debug(f"Deleting SeldonDeployment resource: {name}")

            # `get_deployment` doubles as an existence and ownership check:
            # it raises a SeldonDeploymentNotFoundError when the deployment
            # is missing or not managed by ZenML
            self.get_deployment(name=name)

            # a forced deletion does not wait for a graceful termination
            grace_period = 0 if force else None
            delete_resource = (
                self._custom_objects_api.delete_namespaced_custom_object
            )
            api_response = delete_resource(
                group="machinelearning.seldon.io",
                version="v1",
                namespace=self._namespace,
                plural="seldondeployments",
                name=name,
                _request_timeout=poll_timeout or None,
                grace_period_seconds=grace_period,
            )
            logger.debug("Seldon Core API response: %s", api_response)
        except k8s_client.rest.ApiException as e:
            logger.error(
                "Exception when deleting SeldonDeployment resource %s: %s",
                name,
                str(e),
            )
            raise SeldonClientError(
                f"Exception when deleting SeldonDeployment resource {name}"
            ) from e

        # check every 5 seconds whether the deployment is gone, until the
        # timeout budget is used up
        remaining = poll_timeout
        while remaining > 0:
            try:
                self.get_deployment(name=name)
            except SeldonDeploymentNotFoundError:
                break
            time.sleep(5)
            remaining -= 5

    def update_deployment(
        self,
        deployment: SeldonDeployment,
        poll_timeout: int = 0,
    ) -> SeldonDeployment:
        """Update a Seldon Core deployment resource.

        The deployment is looked up before it is patched. The lookup error
        raised when it does not exist or is not managed by ZenML is
        propagated to the caller unchanged.

        Args:
            deployment: the Seldon Core deployment resource to update
            poll_timeout: the maximum time to wait for the deployment to become
                available or to fail. If set to 0, the function will return
                immediately without checking the deployment status. If a timeout
                occurs and the deployment is still pending, it will
                be returned anyway and no exception will be raised.

        Returns:
            the updated Seldon Core deployment resource with updated status.

        Raises:
            SeldonClientError: if an unknown error occurs while updating the
                deployment.
        """
        try:
            logger.debug(
                f"Updating SeldonDeployment resource: {deployment.name}"
            )

            # mark the deployment as managed by ZenML, to differentiate
            # between deployments that are created by ZenML and those that
            # are not
            deployment.mark_as_managed_by_zenml()

            # call `get_deployment` to check that the deployment exists
            # and is managed by ZenML. It will raise
            # a SeldonDeploymentNotFoundError otherwise
            self.get_deployment(name=deployment.name)

            response = self._custom_objects_api.patch_namespaced_custom_object(
                group="machinelearning.seldon.io",
                version="v1",
                namespace=self._namespace,
                plural="seldondeployments",
                name=deployment.name,
                body=deployment.dict(exclude_none=True),
                # a poll timeout of 0 means no request timeout at all
                _request_timeout=poll_timeout or None,
            )
            logger.debug("Seldon Core API response: %s", response)
        except k8s_client.rest.ApiException as e:
            logger.error(
                "Exception when updating SeldonDeployment resource: %s", str(e)
            )
            # the message names the operation that actually failed (it used
            # to say "creating", copied from `create_deployment`)
            raise SeldonClientError(
                "Exception when updating SeldonDeployment resource"
            ) from e

        updated_deployment = self.get_deployment(name=deployment.name)

        # poll in 5 second steps until the deployment leaves the pending
        # state or the timeout budget is used up
        while poll_timeout > 0 and updated_deployment.is_pending():
            time.sleep(5)
            poll_timeout -= 5
            updated_deployment = self.get_deployment(name=deployment.name)

        return updated_deployment

    def get_deployment(self, name: str) -> SeldonDeployment:
        """Get a ZenML managed Seldon Core deployment resource by name.

        Args:
            name: the name of the Seldon Core deployment resource to fetch.

        Returns:
            The Seldon Core deployment resource.

        Raises:
            SeldonDeploymentNotFoundError: if the deployment resource cannot
                be found, cannot be parsed or is not managed by ZenML.
            SeldonClientError: if an unknown error occurs while fetching
                the deployment.
        """
        try:
            logger.debug(f"Retrieving SeldonDeployment resource: {name}")

            response = self._custom_objects_api.get_namespaced_custom_object(
                group="machinelearning.seldon.io",
                version="v1",
                namespace=self._namespace,
                plural="seldondeployments",
                name=name,
            )
            logger.debug("Seldon Core API response: %s", response)
            try:
                deployment = SeldonDeployment(**response)
            except ValidationError as e:
                logger.error(
                    "Invalid Seldon Core deployment resource: %s\n%s",
                    str(e),
                    str(response),
                )
                # a resource that cannot be parsed is reported as not found;
                # the validation error is chained explicitly so the root
                # cause stays attached to the raised exception
                raise SeldonDeploymentNotFoundError(
                    f"SeldonDeployment resource {name} could not be parsed"
                ) from e

            # Only Seldon deployments managed by ZenML are returned
            if not deployment.is_managed_by_zenml():
                raise SeldonDeploymentNotFoundError(
                    f"Seldon Deployment {name} is not managed by ZenML"
                )
            return deployment

        except k8s_client.rest.ApiException as e:
            if e.status == 404:
                raise SeldonDeploymentNotFoundError(
                    f"SeldonDeployment resource not found: {name}"
                ) from e
            logger.error(
                "Exception when fetching SeldonDeployment resource %s: %s",
                name,
                str(e),
            )
            raise SeldonClientError(
                f"Unexpected exception when fetching SeldonDeployment "
                f"resource: {name}"
            ) from e

    def find_deployments(
        self,
        name: Optional[str] = None,
        labels: Optional[Dict[str, str]] = None,
        fields: Optional[Dict[str, str]] = None,
    ) -> List[SeldonDeployment]:
        """Find all ZenML-managed Seldon Core deployment resources matching the given criteria.

        Resources returned by the API that cannot be parsed are logged and
        left out of the result. The dictionaries passed as arguments are not
        modified.

        Args:
            name: optional name of the deployment resource to find. If set,
                it replaces any selector passed in `fields`.
            fields: optional selector to restrict the list of returned
                Seldon deployments by their fields. Defaults to everything.
            labels: optional selector to restrict the list of returned
                Seldon deployments by their labels. Defaults to everything.

        Returns:
            List of Seldon Core deployments that match the given criteria.

        Raises:
            SeldonClientError: if an unknown error occurs while fetching
                the deployments.
        """
        # work on copies so that the dictionaries owned by the caller are
        # never mutated
        fields = dict(fields or {})
        labels = dict(labels or {})
        # always filter results to only include Seldon deployments managed
        # by ZenML
        labels["app"] = "zenml"
        if name:
            fields = {"metadata.name": name}
        field_selector = (
            ",".join(f"{k}={v}" for k, v in fields.items()) if fields else None
        )
        # `labels` always holds at least the `app` entry at this point
        label_selector = ",".join(f"{k}={v}" for k, v in labels.items())
        try:

            logger.debug(
                f"Searching SeldonDeployment resources with label selector "
                f"'{labels or ''}' and field selector '{fields or ''}'"
            )
            response = self._custom_objects_api.list_namespaced_custom_object(
                group="machinelearning.seldon.io",
                version="v1",
                namespace=self._namespace,
                plural="seldondeployments",
                field_selector=field_selector,
                label_selector=label_selector,
            )
            # a missing or empty `items` entry counts as no results
            items = response.get("items") or []
            logger.debug("Seldon Core API returned %s items", len(items))
            deployments = []
            for item in items:
                try:
                    deployments.append(SeldonDeployment(**item))
                except ValidationError as e:
                    logger.error(
                        "Invalid Seldon Core deployment resource: %s\n%s",
                        str(e),
                        str(item),
                    )
            return deployments
        except k8s_client.rest.ApiException as e:
            # one argument per placeholder, otherwise the logging module
            # fails to format the record and the message is lost
            logger.error(
                "Exception when searching SeldonDeployment resources with "
                "label selector '%s' and field selector '%s': %s",
                label_selector or "",
                field_selector or "",
                str(e),
            )
            raise SeldonClientError(
                f"Unexpected exception when searching SeldonDeployment "
                f"with labels '{labels or ''}' and field '{fields or ''}'"
            ) from e

    def get_deployment_logs(
        self,
        name: str,
        follow: bool = False,
        tail: Optional[int] = None,
    ) -> Generator[str, bool, None]:
        """Get the logs of a Seldon Core deployment resource.

        The logs are read from the first pod carrying the
        `seldon-deployment-id` label of the deployment. The container named
        `default` is used if the pod has one, otherwise the first container.
        If that container has neither started nor restarted yet, the first
        init container is used instead, if there is one.

        Blank log lines are yielded as empty strings. Sending a truthy value
        into the generator stops the log retrieval.

        Args:
            name: the name of the Seldon Core deployment to get logs for.
            follow: if True, the logs will be streamed as they are written
            tail: only retrieve the last NUM lines of log output.

        Returns:
            A generator that can be accessed to get the service logs.

        Yields:
            The next log line.

        Raises:
            SeldonClientError: if an unknown error occurs while fetching
                the logs.
        """
        logger.debug(f"Retrieving logs for SeldonDeployment resource: {name}")
        try:
            response = self._core_api.list_namespaced_pod(
                namespace=self._namespace,
                label_selector=f"seldon-deployment-id={name}",
            )
            logger.debug("Kubernetes API response: %s", response)
            pods = response.items
            if not pods:
                raise SeldonClientError(
                    f"The Seldon Core deployment {name} is not currently "
                    f"running: no Kubernetes pods associated with it were found"
                )
            pod = pods[0]
            pod_name = pod.metadata.name

            containers = [c.name for c in pod.spec.containers]
            # the init container and container status lists are optional
            # and may be unset (None), e.g. for pods that are still pending
            init_containers = [
                c.name for c in pod.spec.init_containers or []
            ]
            container_statuses = {
                c.name: c.started or c.restart_count
                for c in pod.status.container_statuses or []
            }

            container = "default"
            if container not in containers:
                container = containers[0]
            # some containers might not be running yet and have no logs to show,
            # so we need to filter them out. A container without a reported
            # status is treated as not running.
            if not container_statuses.get(container) and init_containers:
                container = init_containers[0]

            logger.info(
                f"Retrieving logs for pod: `{pod_name}` and container "
                f"`{container}` in namespace `{self._namespace}`"
            )
            response = self._core_api.read_namespaced_pod_log(
                name=pod_name,
                namespace=self._namespace,
                container=container,
                follow=follow,
                tail_lines=tail,
                # return the raw HTTP response so that the logs can be
                # streamed line by line instead of being loaded at once
                _preload_content=False,
            )
        except k8s_client.rest.ApiException as e:
            logger.error(
                "Exception when fetching logs for SeldonDeployment resource "
                "%s: %s",
                name,
                str(e),
            )
            raise SeldonClientError(
                f"Unexpected exception when fetching logs for SeldonDeployment "
                f"resource: {name}"
            ) from e

        try:
            while True:
                raw_line = response.readline()
                # only an empty read marks the end of the stream; a blank
                # log line still contains its newline character and must
                # not end the log retrieval
                if not raw_line:
                    return
                stop = yield raw_line.decode("utf-8").rstrip("\n")
                if stop:
                    return
        finally:
            # always hand the connection back, also when the generator is
            # closed before the stream is exhausted
            response.release_conn()

    def create_or_update_secret(
        self,
        name: str,
        secret: BaseSecretSchema,
    ) -> None:
        """Create or update a Kubernetes Secret resource.

        Uses the information contained in a ZenML secret. The secret keys
        are upper-cased, the values are base64 encoded and entries without
        a value are left out.

        Args:
            name: the name of the Secret resource to create.
            secret: a ZenML secret with key-values that should be
                stored in the Secret resource.

        Raises:
            SeldonClientError: if an unknown error occurs during the creation of
                the secret.
            k8s_client.rest.ApiException: unexpected error. It is only
                re-raised internally and reaches the caller wrapped in a
                SeldonClientError.
        """
        try:
            logger.debug(f"Creating Secret resource: {name}")

            encoded_data = {}
            for key, value in secret.content.items():
                if value is None:
                    continue
                raw_value = str(value).encode("utf-8")
                encoded_data[key.upper()] = base64.b64encode(raw_value).decode(
                    "ascii"
                )

            k8s_secret = k8s_client.V1Secret(
                metadata=k8s_client.V1ObjectMeta(
                    name=name,
                    labels={"app": "zenml"},
                ),
                type="Opaque",
                data=encoded_data,
            )

            try:
                # reading the secret tells us whether it is already present
                self._core_api.read_namespaced_secret(
                    name=name,
                    namespace=self._namespace,
                )
                # no error so far: the secret exists and is updated in place
                api_response = self._core_api.replace_namespaced_secret(
                    name=name,
                    namespace=self._namespace,
                    body=k8s_secret,
                )
            except k8s_client.rest.ApiException as e:
                if e.status == 404:
                    # the secret does not exist yet, create it
                    api_response = self._core_api.create_namespaced_secret(
                        namespace=self._namespace,
                        body=k8s_secret,
                    )
                else:
                    # anything other than 404 is an unexpected error
                    raise
            logger.debug("Kubernetes API response: %s", api_response)
        except k8s_client.rest.ApiException as e:
            logger.error("Exception when creating Secret resource: %s", str(e))
            raise SeldonClientError(
                "Exception when creating Secret resource"
            ) from e

    def delete_secret(
        self,
        name: str,
    ) -> None:
        """Delete a Kubernetes Secret resource managed by ZenML.

        A secret that does not exist is silently ignored.

        Args:
            name: the name of the Kubernetes Secret resource to delete.

        Raises:
            SeldonClientError: if an unknown error occurs during the removal
                of the secret.
        """
        logger.debug(f"Deleting Secret resource: {name}")
        try:
            api_response = self._core_api.delete_namespaced_secret(
                name=name,
                namespace=self._namespace,
            )
        except k8s_client.rest.ApiException as e:
            # with a 404 the secret is no longer present and there is
            # nothing to do; every other status is an error
            if e.status != 404:
                logger.error(
                    "Exception when deleting Secret resource %s: %s",
                    name,
                    str(e),
                )
                raise SeldonClientError(
                    f"Exception when deleting Secret resource {name}"
                ) from e
        else:
            logger.debug("Kubernetes API response: %s", api_response)
namespace: str property readonly

Returns the Kubernetes namespace in use by the client.

Returns:

Type Description
str

The Kubernetes namespace in use by the client.

Exceptions:

Type Description
RuntimeError

if the namespace has not been configured.

__init__(self, context, namespace) special

Initialize a Seldon Core client.

Parameters:

Name Type Description Default
context Optional[str]

the Kubernetes context to use.

required
namespace Optional[str]

the Kubernetes namespace to use.

required
Source code in zenml/integrations/seldon/seldon_client.py
def __init__(self, context: Optional[str], namespace: Optional[str]):
    """Create a Seldon Core client for a Kubernetes context and namespace.

    The Kubernetes API clients are set up as part of the construction.

    Args:
        context: the Kubernetes context to use.
        namespace: the Kubernetes namespace to use.
    """
    self._context, self._namespace = context, namespace
    self._initialize_k8s_clients()
create_deployment(self, deployment, poll_timeout=0)

Create a Seldon Core deployment resource.

Parameters:

Name Type Description Default
deployment SeldonDeployment

the Seldon Core deployment resource to create

required
poll_timeout int

the maximum time to wait for the deployment to become available or to fail. If set to 0, the function will return immediately without checking the deployment status. If a timeout occurs and the deployment is still pending creation, it will be returned anyway and no exception will be raised.

0

Returns:

Type Description
SeldonDeployment

the created Seldon Core deployment resource with updated status.

Exceptions:

Type Description
SeldonDeploymentExistsError

if a deployment with the same name already exists.

SeldonClientError

if an unknown error occurs during the creation of the deployment.

Source code in zenml/integrations/seldon/seldon_client.py
def create_deployment(
    self,
    deployment: SeldonDeployment,
    poll_timeout: int = 0,
) -> SeldonDeployment:
    """Create a Seldon Core deployment resource.

    After the resource is created, it is fetched again and, if a poll
    timeout is set, polled every 5 seconds while it is still pending.
    Errors raised while fetching the created deployment are propagated
    unchanged.

    Args:
        deployment: the Seldon Core deployment resource to create
        poll_timeout: the maximum time to wait for the deployment to become
            available or to fail. If set to 0, the function will return
            immediately without checking the deployment status. If a timeout
            occurs and the deployment is still pending creation, it will
            be returned anyway and no exception will be raised.

    Returns:
        the created Seldon Core deployment resource with updated status.

    Raises:
        SeldonDeploymentExistsError: if a deployment with the same name
            already exists.
        SeldonClientError: if an unknown error occurs during the creation of
            the deployment.
    """
    try:
        logger.debug(f"Creating SeldonDeployment resource: {deployment}")

        # mark the deployment as managed by ZenML, to differentiate
        # between deployments that are created by ZenML and those that
        # are not
        deployment.mark_as_managed_by_zenml()

        body_deploy = deployment.dict(exclude_none=True)
        response = (
            self._custom_objects_api.create_namespaced_custom_object(
                group="machinelearning.seldon.io",
                version="v1",
                namespace=self._namespace,
                plural="seldondeployments",
                body=body_deploy,
                # a poll timeout of 0 means no request timeout at all
                _request_timeout=poll_timeout or None,
            )
        )
        logger.debug("Seldon Core API response: %s", response)
    except k8s_client.rest.ApiException as e:
        logger.error(
            "Exception when creating SeldonDeployment resource: %s", str(e)
        )
        if e.status == 409:
            # HTTP 409 (conflict) is reported as a dedicated error; the
            # API exception is chained explicitly like the generic error
            # below
            raise SeldonDeploymentExistsError(
                f"A deployment with the name {deployment.name} "
                f"already exists in namespace {self._namespace}"
            ) from e
        raise SeldonClientError(
            "Exception when creating SeldonDeployment resource"
        ) from e

    created_deployment = self.get_deployment(name=deployment.name)

    # poll in 5 second steps until the deployment leaves the pending
    # state or the timeout budget is used up
    while poll_timeout > 0 and created_deployment.is_pending():
        time.sleep(5)
        poll_timeout -= 5
        created_deployment = self.get_deployment(name=deployment.name)

    return created_deployment
create_or_update_secret(self, name, secret)

Create or update a Kubernetes Secret resource.

Uses the information contained in a ZenML secret.

Parameters:

Name Type Description Default
name str

the name of the Secret resource to create.

required
secret BaseSecretSchema

a ZenML secret with key-values that should be stored in the Secret resource.

required

Exceptions:

Type Description
SeldonClientError

if an unknown error occurs during the creation of the secret.

k8s_client.rest.ApiException

unexpected error.

Source code in zenml/integrations/seldon/seldon_client.py
def create_or_update_secret(
    self,
    name: str,
    secret: BaseSecretSchema,
) -> None:
    """Create or update a Kubernetes Secret resource.

    Uses the information contained in a ZenML secret. The secret keys
    are upper-cased, the values are base64 encoded and entries without
    a value are left out.

    Args:
        name: the name of the Secret resource to create.
        secret: a ZenML secret with key-values that should be
            stored in the Secret resource.

    Raises:
        SeldonClientError: if an unknown error occurs during the creation of
            the secret.
        k8s_client.rest.ApiException: unexpected error. It is only
            re-raised internally and reaches the caller wrapped in a
            SeldonClientError.
    """
    try:
        logger.debug(f"Creating Secret resource: {name}")

        encoded_data = {}
        for key, value in secret.content.items():
            if value is None:
                continue
            raw_value = str(value).encode("utf-8")
            encoded_data[key.upper()] = base64.b64encode(raw_value).decode(
                "ascii"
            )

        k8s_secret = k8s_client.V1Secret(
            metadata=k8s_client.V1ObjectMeta(
                name=name,
                labels={"app": "zenml"},
            ),
            type="Opaque",
            data=encoded_data,
        )

        try:
            # reading the secret tells us whether it is already present
            self._core_api.read_namespaced_secret(
                name=name,
                namespace=self._namespace,
            )
            # no error so far: the secret exists and is updated in place
            api_response = self._core_api.replace_namespaced_secret(
                name=name,
                namespace=self._namespace,
                body=k8s_secret,
            )
        except k8s_client.rest.ApiException as e:
            if e.status == 404:
                # the secret does not exist yet, create it
                api_response = self._core_api.create_namespaced_secret(
                    namespace=self._namespace,
                    body=k8s_secret,
                )
            else:
                # anything other than 404 is an unexpected error
                raise
        logger.debug("Kubernetes API response: %s", api_response)
    except k8s_client.rest.ApiException as e:
        logger.error("Exception when creating Secret resource: %s", str(e))
        raise SeldonClientError(
            "Exception when creating Secret resource"
        ) from e
delete_deployment(self, name, force=False, poll_timeout=0)

Delete a Seldon Core deployment resource managed by ZenML.

Parameters:

Name Type Description Default
name str

the name of the Seldon Core deployment resource to delete.

required
force bool

if True, the deployment deletion will be forced (the grace period will be set to zero).

False
poll_timeout int

the maximum time to wait for the deployment to be deleted. If set to 0, the function will return immediately without checking the deployment status. If a timeout occurs and the deployment still exists, this method will return and no exception will be raised.

0

Exceptions:

Type Description
SeldonClientError

if an unknown error occurs during the deployment removal.

Source code in zenml/integrations/seldon/seldon_client.py
def delete_deployment(
    self,
    name: str,
    force: bool = False,
    poll_timeout: int = 0,
) -> None:
    """Remove a ZenML-managed Seldon Core deployment resource.

    Args:
        name: name of the Seldon Core deployment resource to remove.
        force: when True, the delete request is sent with a grace period
            of zero seconds.
        poll_timeout: upper bound, in seconds, on the time spent waiting
            for the deployment to disappear. With a value of 0 the method
            returns right after the delete request, without checking the
            deployment status. If the deployment still exists when the
            time runs out, the method returns without raising.

    Raises:
        SeldonClientError: if the Kubernetes API reports an error while the
            deployment is being removed.
    """
    logger.debug(f"Deleting SeldonDeployment resource: {name}")

    grace_period = 0 if force else None
    request_timeout = poll_timeout or None

    try:
        # `get_deployment` doubles as an existence and ownership check: it
        # raises SeldonDeploymentNotFoundError when the deployment is
        # missing or is not managed by ZenML
        self.get_deployment(name=name)

        api_response = (
            self._custom_objects_api.delete_namespaced_custom_object(
                group="machinelearning.seldon.io",
                version="v1",
                namespace=self._namespace,
                plural="seldondeployments",
                name=name,
                _request_timeout=request_timeout,
                grace_period_seconds=grace_period,
            )
        )
        logger.debug("Seldon Core API response: %s", api_response)
    except k8s_client.rest.ApiException as e:
        logger.error(
            "Exception when deleting SeldonDeployment resource %s: %s",
            name,
            str(e),
        )
        raise SeldonClientError(
            f"Exception when deleting SeldonDeployment resource {name}"
        ) from e

    # poll in 5 second steps until the deployment can no longer be fetched
    # or the time budget is used up
    remaining = poll_timeout
    while remaining > 0:
        try:
            self.get_deployment(name=name)
        except SeldonDeploymentNotFoundError:
            return
        time.sleep(5)
        remaining -= 5
delete_secret(self, name)

Delete a Kubernetes Secret resource managed by ZenML.

Parameters:

Name Type Description Default
name str

the name of the Kubernetes Secret resource to delete.

required

Exceptions:

Type Description
SeldonClientError

if an unknown error occurs during the removal of the secret.

Source code in zenml/integrations/seldon/seldon_client.py
def delete_secret(
    self,
    name: str,
) -> None:
    """Remove a ZenML-managed Kubernetes Secret resource.

    A secret that does not exist (HTTP 404) is treated as already deleted.

    Args:
        name: name of the Kubernetes Secret resource to remove.

    Raises:
        SeldonClientError: if the Kubernetes API reports any error other
            than "not found" while removing the secret.
    """
    logger.debug(f"Deleting Secret resource: {name}")
    try:
        api_response = self._core_api.delete_namespaced_secret(
            name=name,
            namespace=self._namespace,
        )
    except k8s_client.rest.ApiException as e:
        if e.status != 404:
            logger.error(
                "Exception when deleting Secret resource %s: %s",
                name,
                str(e),
            )
            raise SeldonClientError(
                f"Exception when deleting Secret resource {name}"
            ) from e
        # a 404 means the secret is already gone: nothing left to do
        return
    else:
        logger.debug("Kubernetes API response: %s", api_response)
find_deployments(self, name=None, labels=None, fields=None)

Find all ZenML-managed Seldon Core deployment resources matching the given criteria.

Parameters:

Name Type Description Default
name Optional[str]

optional name of the deployment resource to find.

None
fields Optional[Dict[str, str]]

optional selector to restrict the list of returned Seldon deployments by their fields. Defaults to everything.

None
labels Optional[Dict[str, str]]

optional selector to restrict the list of returned Seldon deployments by their labels. Defaults to everything.

None

Returns:

Type Description
List[zenml.integrations.seldon.seldon_client.SeldonDeployment]

List of Seldon Core deployments that match the given criteria.

Exceptions:

Type Description
SeldonClientError

if an unknown error occurs while fetching the deployments.

Source code in zenml/integrations/seldon/seldon_client.py
def find_deployments(
    self,
    name: Optional[str] = None,
    labels: Optional[Dict[str, str]] = None,
    fields: Optional[Dict[str, str]] = None,
) -> List[SeldonDeployment]:
    """Find all ZenML-managed Seldon Core deployment resources matching the given criteria.

    The dictionaries passed in by the caller are never modified.

    Args:
        name: optional name of the deployment resource to find. When set,
            it replaces any selector passed through `fields`.
        fields: optional selector to restrict the list of returned
            Seldon deployments by their fields. Defaults to everything.
        labels: optional selector to restrict the list of returned
            Seldon deployments by their labels. Defaults to everything.

    Returns:
        List of Seldon Core deployments that match the given criteria.
        Resources that cannot be parsed are logged and left out.

    Raises:
        SeldonClientError: if an unknown error occurs while fetching
            the deployments.
    """
    # work on copies so that the caller's dictionaries stay untouched
    fields = dict(fields or {})
    # always filter results to only include Seldon deployments managed
    # by ZenML
    labels = {**(labels or {}), "app": "zenml"}
    if name:
        # a lookup by name takes precedence over other field selectors
        fields = {"metadata.name": name}
    field_selector = (
        ",".join(f"{k}={v}" for k, v in fields.items()) if fields else None
    )
    # `labels` always holds at least the `app` entry at this point
    label_selector = ",".join(f"{k}={v}" for k, v in labels.items())

    try:
        logger.debug(
            f"Searching SeldonDeployment resources with label selector "
            f"'{labels or ''}' and field selector '{fields or ''}'"
        )
        response = self._custom_objects_api.list_namespaced_custom_object(
            group="machinelearning.seldon.io",
            version="v1",
            namespace=self._namespace,
            plural="seldondeployments",
            field_selector=field_selector,
            label_selector=label_selector,
        )
    except k8s_client.rest.ApiException as e:
        # one argument per placeholder, including the exception itself
        logger.error(
            "Exception when searching SeldonDeployment resources with "
            "label selector '%s' and field selector '%s': %s",
            label_selector,
            field_selector or "",
            str(e),
        )
        raise SeldonClientError(
            f"Unexpected exception when searching SeldonDeployment "
            f"with labels '{labels or ''}' and field '{fields or ''}'"
        ) from e

    # tolerate a response with a missing or null `items` entry
    items = response.get("items") or []
    logger.debug("Seldon Core API returned %s items", len(items))

    deployments = []
    for item in items:
        try:
            deployments.append(SeldonDeployment(**item))
        except ValidationError as e:
            # skip resources that do not match the expected schema instead
            # of failing the whole search
            logger.error(
                "Invalid Seldon Core deployment resource: %s\n%s",
                str(e),
                str(item),
            )
    return deployments
get_deployment(self, name)

Get a ZenML managed Seldon Core deployment resource by name.

Parameters:

Name Type Description Default
name str

the name of the Seldon Core deployment resource to fetch.

required

Returns:

Type Description
SeldonDeployment

The Seldon Core deployment resource.

Exceptions:

Type Description
SeldonDeploymentNotFoundError

if the deployment resource cannot be found or is not managed by ZenML.

SeldonClientError

if an unknown error occurs while fetching the deployment.

Source code in zenml/integrations/seldon/seldon_client.py
def get_deployment(self, name: str) -> SeldonDeployment:
    """Fetch a ZenML-managed Seldon Core deployment resource by its name.

    Args:
        name: name of the Seldon Core deployment resource to look up.

    Returns:
        The matching Seldon Core deployment resource.

    Raises:
        SeldonDeploymentNotFoundError: if the resource does not exist,
            cannot be parsed, or is not managed by ZenML.
        SeldonClientError: if the Kubernetes API reports any other error
            while fetching the deployment.
    """
    logger.debug(f"Retrieving SeldonDeployment resource: {name}")

    try:
        raw_resource = self._custom_objects_api.get_namespaced_custom_object(
            group="machinelearning.seldon.io",
            version="v1",
            namespace=self._namespace,
            plural="seldondeployments",
            name=name,
        )
    except k8s_client.rest.ApiException as e:
        if e.status == 404:
            raise SeldonDeploymentNotFoundError(
                f"SeldonDeployment resource not found: {name}"
            ) from e
        logger.error(
            "Exception when fetching SeldonDeployment resource %s: %s",
            name,
            str(e),
        )
        raise SeldonClientError(
            f"Unexpected exception when fetching SeldonDeployment "
            f"resource: {name}"
        ) from e

    logger.debug("Seldon Core API response: %s", raw_resource)

    try:
        parsed = SeldonDeployment(**raw_resource)
    except ValidationError as e:
        logger.error(
            "Invalid Seldon Core deployment resource: %s\n%s",
            str(e),
            str(raw_resource),
        )
        raise SeldonDeploymentNotFoundError(
            f"SeldonDeployment resource {name} could not be parsed"
        )

    # deployments that ZenML does not manage are reported as missing
    if parsed.is_managed_by_zenml():
        return parsed
    raise SeldonDeploymentNotFoundError(
        f"Seldon Deployment {name} is not managed by ZenML"
    )
get_deployment_logs(self, name, follow=False, tail=None)

Get the logs of a Seldon Core deployment resource.

Parameters:

Name Type Description Default
name str

the name of the Seldon Core deployment to get logs for.

required
follow bool

if True, the logs will be streamed as they are written.

False
tail Optional[int]

only retrieve the last NUM lines of log output.

None

Returns:

Type Description
Generator[str, bool, NoneType]

A generator that can be accessed to get the service logs.

Yields:

Type Description
Generator[str, bool, NoneType]

The next log line.

Exceptions:

Type Description
SeldonClientError

if an unknown error occurs while fetching the logs.

Source code in zenml/integrations/seldon/seldon_client.py
def get_deployment_logs(
    self,
    name: str,
    follow: bool = False,
    tail: Optional[int] = None,
) -> Generator[str, bool, None]:
    """Get the logs of a Seldon Core deployment resource.

    The logs are read from the first pod labelled with the deployment name.
    The `default` container is preferred, falling back to the first
    container of the pod. If that container has neither started nor been
    restarted and the pod has init containers, the first init container is
    used instead.

    Args:
        name: the name of the Seldon Core deployment to get logs for.
        follow: if True, the logs will be streamed as they are written
        tail: only retrieve the last NUM lines of log output.

    Returns:
        A generator that can be accessed to get the service logs. Sending
        a true value into the generator stops the stream.

    Yields:
        The next log line, without its trailing newline. Blank log lines
        are yielded as empty strings.

    Raises:
        SeldonClientError: if no pods are found for the deployment or if
            an unknown error occurs while fetching the logs.
    """
    logger.debug(f"Retrieving logs for SeldonDeployment resource: {name}")
    try:
        response = self._core_api.list_namespaced_pod(
            namespace=self._namespace,
            label_selector=f"seldon-deployment-id={name}",
        )
        logger.debug("Kubernetes API response: %s", response)
        pods = response.items
        if not pods:
            raise SeldonClientError(
                f"The Seldon Core deployment {name} is not currently "
                f"running: no Kubernetes pods associated with it were found"
            )
        pod = pods[0]
        pod_name = pod.metadata.name

        containers = [c.name for c in pod.spec.containers]
        # the optional list attributes may be None instead of an empty
        # list, e.g. for a pod without init containers or a pod whose
        # containers have not been scheduled yet
        init_containers = [c.name for c in pod.spec.init_containers or []]
        container_statuses = {
            c.name: c.started or c.restart_count
            for c in pod.status.container_statuses or []
        }

        container = "default"
        if container not in containers:
            container = containers[0]
        # some containers might not be running yet and have no logs to show,
        # so fall back to the first init container when there is one
        if not container_statuses.get(container) and init_containers:
            container = init_containers[0]

        logger.info(
            f"Retrieving logs for pod: `{pod_name}` and container "
            f"`{container}` in namespace `{self._namespace}`"
        )
        response = self._core_api.read_namespaced_pod_log(
            name=pod_name,
            namespace=self._namespace,
            container=container,
            follow=follow,
            tail_lines=tail,
            _preload_content=False,
        )
    except k8s_client.rest.ApiException as e:
        logger.error(
            "Exception when fetching logs for SeldonDeployment resource "
            "%s: %s",
            name,
            str(e),
        )
        raise SeldonClientError(
            f"Unexpected exception when fetching logs for SeldonDeployment "
            f"resource: {name}"
        ) from e

    try:
        while True:
            raw_line = response.readline()
            # only an empty read marks the end of the stream; a blank log
            # line still carries its newline and must not stop the loop
            if not raw_line:
                return
            stop = yield raw_line.decode("utf-8").rstrip("\n")
            if stop:
                return
    finally:
        # always hand the connection back, even if the consumer stops early
        response.release_conn()
sanitize_labels(labels) staticmethod

Update the label values to be valid Kubernetes labels.

See: https://kubernetes.io/docs/concepts/overview/working-with-objects/labels/#syntax-and-character-set

Parameters:

Name Type Description Default
labels Dict[str, str]

the labels to sanitize.

required
Source code in zenml/integrations/seldon/seldon_client.py
@staticmethod
def sanitize_labels(labels: Dict[str, str]) -> None:
    """Rewrite the label values in place so they are valid Kubernetes labels.

    See:
    https://kubernetes.io/docs/concepts/overview/working-with-objects/labels/#syntax-and-character-set

    Args:
        labels: the labels to sanitize. The dictionary is modified in place.
    """
    # every run of characters outside the allowed set becomes one underscore
    disallowed = re.compile(r"[^0-9a-zA-Z-_\.]+")
    for label_name in list(labels):
        replaced = disallowed.sub("_", labels[label_name])
        # cap the value at 63 characters, then drop any leading or trailing
        # separator so that it begins and ends with an alphanumeric
        # character ([a-z0-9A-Z])
        labels[label_name] = replaced[:63].strip("-_.")
update_deployment(self, deployment, poll_timeout=0)

Update a Seldon Core deployment resource.

Parameters:

Name Type Description Default
deployment SeldonDeployment

the Seldon Core deployment resource to update

required
poll_timeout int

the maximum time to wait for the deployment to become available or to fail. If set to 0, the function will return immediately without checking the deployment status. If a timeout occurs and the deployment is still pending creation, it will be returned anyway and no exception will be raised.

0

Returns:

Type Description
SeldonDeployment

the updated Seldon Core deployment resource with updated status.

Exceptions:

Type Description
SeldonClientError

if an unknown error occurs while updating the deployment.

Source code in zenml/integrations/seldon/seldon_client.py
def update_deployment(
    self,
    deployment: SeldonDeployment,
    poll_timeout: int = 0,
) -> SeldonDeployment:
    """Update a Seldon Core deployment resource.

    Args:
        deployment: the Seldon Core deployment resource to update
        poll_timeout: the maximum time to wait for the deployment to become
            available or to fail. If set to 0, the function will return
            immediately without checking the deployment status. If a timeout
            occurs and the deployment is still pending creation, it will
            be returned anyway and no exception will be raised.

    Returns:
        the updated Seldon Core deployment resource with updated status.

    Raises:
        SeldonClientError: if an unknown error occurs while updating the
            deployment.
    """
    try:
        logger.debug(
            f"Updating SeldonDeployment resource: {deployment.name}"
        )

        # mark the deployment as managed by ZenML, to differentiate
        # between deployments that are created by ZenML and those that
        # are not
        deployment.mark_as_managed_by_zenml()

        # call `get_deployment` to check that the deployment exists
        # and is managed by ZenML. It will raise
        # a SeldonDeploymentNotFoundError otherwise
        self.get_deployment(name=deployment.name)

        response = self._custom_objects_api.patch_namespaced_custom_object(
            group="machinelearning.seldon.io",
            version="v1",
            namespace=self._namespace,
            plural="seldondeployments",
            name=deployment.name,
            body=deployment.dict(exclude_none=True),
            _request_timeout=poll_timeout or None,
        )
        logger.debug("Seldon Core API response: %s", response)
    except k8s_client.rest.ApiException as e:
        logger.error(
            "Exception when updating SeldonDeployment resource: %s", str(e)
        )
        # report the operation that actually failed (an update, not a
        # creation)
        raise SeldonClientError(
            "Exception when updating SeldonDeployment resource"
        ) from e

    updated_deployment = self.get_deployment(name=deployment.name)

    # re-fetch the deployment every 5 seconds for as long as it is pending
    # and the time budget has not run out
    while poll_timeout > 0 and updated_deployment.is_pending():
        time.sleep(5)
        poll_timeout -= 5
        updated_deployment = self.get_deployment(name=deployment.name)

    return updated_deployment

SeldonClientError (Exception)

Base exception class for all exceptions raised by the SeldonClient.

Source code in zenml/integrations/seldon/seldon_client.py
class SeldonClientError(Exception):
    """Root of the exception hierarchy raised by the SeldonClient."""

SeldonClientTimeout (SeldonClientError)

Raised when the Seldon client timed out while waiting for a resource to reach the expected status.

Source code in zenml/integrations/seldon/seldon_client.py
class SeldonClientTimeout(SeldonClientError):
    """Signals that the Seldon client gave up waiting for a resource to reach the expected status."""

SeldonDeployment (BaseModel) pydantic-model

A Seldon Core deployment CRD.

This is a Pydantic representation of some of the fields in the Seldon Core CRD (documented here: https://docs.seldon.io/projects/seldon-core/en/latest/reference/seldon-deployment.html).

Note that not all fields are represented, only those that are relevant to the ZenML integration. The fields that are not represented are silently ignored when the Seldon Deployment is created or updated from an external SeldonDeployment CRD representation.

Attributes:

Name Type Description
kind str

Kubernetes kind field.

apiVersion str

Kubernetes apiVersion field.

metadata SeldonDeploymentMetadata

Kubernetes metadata field.

spec SeldonDeploymentSpec

Seldon Deployment spec entry.

status Optional[zenml.integrations.seldon.seldon_client.SeldonDeploymentStatus]

Seldon Deployment status.

Source code in zenml/integrations/seldon/seldon_client.py
class SeldonDeployment(BaseModel):
    """Pydantic model of a Seldon Core deployment CRD.

    Only a subset of the fields of the Seldon Core CRD is modelled here (the
    full CRD is documented at
    https://docs.seldon.io/projects/seldon-core/en/latest/reference/seldon-deployment.html).

    The model covers the fields that matter to the ZenML integration. Any
    other field found in an external SeldonDeployment CRD representation is
    silently dropped when the Seldon Deployment is created or updated
    from it.

    Attributes:
        kind: Kubernetes kind field.
        apiVersion: Kubernetes apiVersion field.
        metadata: Kubernetes metadata field.
        spec: Seldon Deployment spec entry.
        status: Seldon Deployment status.
    """

    kind: str = Field(SELDON_DEPLOYMENT_KIND, const=True)
    apiVersion: str = Field(SELDON_DEPLOYMENT_API_VERSION, const=True)
    metadata: SeldonDeploymentMetadata = Field(
        default_factory=SeldonDeploymentMetadata
    )
    spec: SeldonDeploymentSpec = Field(default_factory=SeldonDeploymentSpec)
    status: Optional[SeldonDeploymentStatus]

    def __str__(self) -> str:
        """Render the Seldon Deployment as indented JSON.

        Returns:
            The deployment serialized to a JSON string, with unset fields
            left out.
        """
        serializable = self.dict(exclude_none=True)
        return json.dumps(serializable, indent=4)

    @classmethod
    def build(
        cls,
        name: Optional[str] = None,
        model_uri: Optional[str] = None,
        model_name: Optional[str] = None,
        implementation: Optional[str] = None,
        parameters: Optional[List[SeldonDeploymentPredictorParameter]] = None,
        engineResources: Optional[SeldonResourceRequirements] = None,
        secret_name: Optional[str] = None,
        labels: Optional[Dict[str, str]] = None,
        annotations: Optional[Dict[str, str]] = None,
        is_custom_deployment: Optional[bool] = False,
        spec: Optional[Dict[Any, Any]] = None,
    ) -> "SeldonDeployment":
        """Assemble a minimal Seldon Deployment with a single predictor.

        Args:
            name: The name of the Seldon Deployment. A time-based name is
                generated when none is given.
            model_uri: The URI of the model.
            model_name: The name of the model.
            implementation: The implementation of the model.
            parameters: The predictor graph parameters.
            engineResources: The resources to be allocated to the model.
            secret_name: The name of the Kubernetes secret containing
                environment variable values (e.g. with credentials for the
                artifact store) to use with the deployment service.
            labels: A dictionary of labels to apply to the Seldon Deployment.
            annotations: A dictionary of annotations to apply to the Seldon
                Deployment.
            is_custom_deployment: Whether the Seldon Deployment is a custom
                or a built-in one.
            spec: A Kubernetes pod spec to use for the Seldon Deployment.
                Only used for custom deployments.

        Returns:
            A minimal SeldonDeployment object built from the provided
            parameters.
        """
        name = name or f"zenml-{time.time()}"
        labels = {} if labels is None else labels
        annotations = {} if annotations is None else annotations

        # settings that only one of the two deployment flavors passes to
        # the predictor
        flavor_options: Dict[str, Any] = {}
        if is_custom_deployment:
            # custom deployments bring their own pod spec instead of a
            # model URI / implementation pair
            graph = SeldonDeploymentPredictiveUnit(
                name="classifier",
                type=SeldonDeploymentPredictiveUnitType.MODEL,
                parameters=parameters,
            )
            # TODO [HIGH]: Add support for other component types (e.g. graph)
            flavor_options["componentSpecs"] = [
                SeldonDeploymentComponentSpecs(spec=spec)
            ]
        else:
            graph = SeldonDeploymentPredictiveUnit(
                name="classifier",
                type=SeldonDeploymentPredictiveUnitType.MODEL,
                modelUri=model_uri or "",
                implementation=implementation or "",
                envSecretRefName=secret_name,
                parameters=parameters,
            )

        predictor = SeldonDeploymentPredictor(
            name=model_name or "",
            graph=graph,
            engineResources=engineResources,
            **flavor_options,
        )

        deployment_metadata = SeldonDeploymentMetadata(
            name=name, labels=labels, annotations=annotations
        )
        deployment_spec = SeldonDeploymentSpec(
            name=name, predictors=[predictor]
        )
        return SeldonDeployment(
            metadata=deployment_metadata,
            spec=deployment_spec,
        )

    def is_managed_by_zenml(self) -> bool:
        """Tell whether ZenML manages this Seldon Deployment.

        SeldonDeployment instances managed by ZenML are recognized by
        convention: their `app` label is set to `zenml`.

        Returns:
            True if the Seldon Deployment is managed by ZenML, False
            otherwise.
        """
        app_label = self.metadata.labels.get("app")
        return app_label == "zenml"

    def mark_as_managed_by_zenml(self) -> None:
        """Flag this Seldon Deployment as managed by ZenML.

        SeldonDeployment instances managed by ZenML are recognized by
        convention: their `app` label is set to `zenml`.
        """
        deployment_labels = self.metadata.labels
        deployment_labels["app"] = "zenml"

    @property
    def name(self) -> str:
        """The name of this Seldon Deployment.

        Shortcut for `self.metadata.name`.

        Returns:
            The name of this Seldon Deployment.
        """
        return self.metadata.name

    @property
    def state(self) -> SeldonDeploymentStatusState:
        """The current state of the Seldon Deployment.

        Returns:
            The state taken from the deployment status, or UNKNOWN when no
            status is set.
        """
        if self.status:
            return self.status.state
        return SeldonDeploymentStatusState.UNKNOWN

    def is_pending(self) -> bool:
        """Tell whether the Seldon Deployment is still being created.

        Returns:
            True if the Seldon Deployment is pending, False otherwise.
        """
        current_state = self.state
        return current_state == SeldonDeploymentStatusState.CREATING

    def is_available(self) -> bool:
        """Tell whether the Seldon Deployment is available.

        Returns:
            True if the Seldon Deployment is available, False otherwise.
        """
        current_state = self.state
        return current_state == SeldonDeploymentStatusState.AVAILABLE

    def is_failed(self) -> bool:
        """Tell whether the Seldon Deployment has failed.

        Returns:
            True if the Seldon Deployment is failed, False otherwise.
        """
        current_state = self.state
        return current_state == SeldonDeploymentStatusState.FAILED

    def get_error(self) -> Optional[str]:
        """Describe the error of a failed Seldon Deployment.

        Returns:
            The status description if the deployment is in a failed state,
            otherwise None.
        """
        if not self.status or not self.is_failed():
            return None
        return self.status.description

    def get_pending_message(self) -> Optional[str]:
        """Describe the pending condition of the Seldon Deployment.

        Returns:
            The message of the first `Ready` condition whose status is
            falsy, or None if there is no such condition.
        """
        if not self.status or not self.status.conditions:
            return None
        unmet_ready_conditions = (
            condition.message
            for condition in self.status.conditions
            if condition.type == "Ready" and not condition.status
        )
        return next(unmet_ready_conditions, None)

    class Config:
        """Pydantic model settings."""

        # CRD attributes that have no counterpart in this model are dropped
        extra = "ignore"
        # values assigned to attributes after creation are validated too
        validate_assignment = True
name: str property readonly

Returns the name of this Seldon Deployment.

This is just a shortcut for self.metadata.name.

Returns:

Type Description
str

The name of this Seldon Deployment.

state: SeldonDeploymentStatusState property readonly

The state of the Seldon Deployment.

Returns:

Type Description
SeldonDeploymentStatusState

The state of the Seldon Deployment.

Config

Pydantic configuration class.

Source code in zenml/integrations/seldon/seldon_client.py
class Config:
    """Pydantic model settings."""

    # CRD attributes that have no counterpart in the model are dropped
    extra = "ignore"
    # values assigned to attributes after creation are validated too
    validate_assignment = True
__str__(self) special

Returns a string representation of the Seldon Deployment.

Returns:

Type Description
str

A string representation of the Seldon Deployment.

Source code in zenml/integrations/seldon/seldon_client.py
def __str__(self) -> str:
    """Render the Seldon Deployment as indented JSON.

    Returns:
        The deployment serialized to a JSON string, with unset fields
        left out.
    """
    serializable = self.dict(exclude_none=True)
    return json.dumps(serializable, indent=4)
build(name=None, model_uri=None, model_name=None, implementation=None, parameters=None, engineResources=None, secret_name=None, labels=None, annotations=None, is_custom_deployment=False, spec=None) classmethod

Build a basic Seldon Deployment object.

Parameters:

Name Type Description Default
name Optional[str]

The name of the Seldon Deployment. If not explicitly passed, a unique name is autogenerated.

None
model_uri Optional[str]

The URI of the model.

None
model_name Optional[str]

The name of the model.

None
implementation Optional[str]

The implementation of the model.

None
parameters Optional[List[zenml.integrations.seldon.seldon_client.SeldonDeploymentPredictorParameter]]

The predictor graph parameters.

None
engineResources Optional[zenml.integrations.seldon.seldon_client.SeldonResourceRequirements]

The resources to be allocated to the model.

None
secret_name Optional[str]

The name of the Kubernetes secret containing environment variable values (e.g. with credentials for the artifact store) to use with the deployment service.

None
labels Optional[Dict[str, str]]

A dictionary of labels to apply to the Seldon Deployment.

None
annotations Optional[Dict[str, str]]

A dictionary of annotations to apply to the Seldon Deployment.

None
spec Optional[Dict[Any, Any]]

A Kubernetes pod spec to use for the Seldon Deployment.

None
is_custom_deployment Optional[bool]

Whether the Seldon Deployment is a custom or a built-in one.

False

Returns:

Type Description
SeldonDeployment

A minimal SeldonDeployment object built from the provided parameters.

Source code in zenml/integrations/seldon/seldon_client.py
@classmethod
def build(
    cls,
    name: Optional[str] = None,
    model_uri: Optional[str] = None,
    model_name: Optional[str] = None,
    implementation: Optional[str] = None,
    parameters: Optional[List[SeldonDeploymentPredictorParameter]] = None,
    engineResources: Optional[SeldonResourceRequirements] = None,
    secret_name: Optional[str] = None,
    labels: Optional[Dict[str, str]] = None,
    annotations: Optional[Dict[str, str]] = None,
    is_custom_deployment: Optional[bool] = False,
    spec: Optional[Dict[Any, Any]] = None,
) -> "SeldonDeployment":
    """Assemble a single-predictor Seldon Deployment object.

    Two layouts are produced depending on `is_custom_deployment`:

    * custom: the predictive unit carries only its parameters and the
      predictor gets one component spec wrapping `spec`;
    * built-in: the predictive unit carries the model URI, the
      implementation and the environment secret reference, and no
      component specs are set.

    Args:
        name: The name of the Seldon Deployment. When empty or omitted, a
            name of the form `zenml-<time.time()>` is generated.
        model_uri: The URI of the model (built-in deployments only).
        model_name: The name of the model; used as the predictor name.
        implementation: The implementation of the model (built-in
            deployments only).
        parameters: The predictor graph parameters.
        engineResources: The resources to be allocated to the model.
        secret_name: The name of the Kubernetes secret containing
            environment variable values (e.g. with credentials for the
            artifact store) to use with the deployment service (built-in
            deployments only).
        labels: A dictionary of labels to apply to the Seldon Deployment.
        annotations: A dictionary of annotations to apply to the Seldon
            Deployment.
        is_custom_deployment: Whether the Seldon Deployment is a custom
            or a built-in one.
        spec: A Kubernetes pod spec to use for the Seldon Deployment
            (custom deployments only).

    Returns:
        A minimal SeldonDeployment object built from the provided
        parameters.
    """
    deployment_name = name or f"zenml-{time.time()}"
    deployment_labels = {} if labels is None else labels
    deployment_annotations = {} if annotations is None else annotations
    predictor_name = model_name or ""

    if is_custom_deployment:
        custom_graph = SeldonDeploymentPredictiveUnit(
            name="classifier",
            type=SeldonDeploymentPredictiveUnitType.MODEL,
            parameters=parameters,
        )
        # TODO [HIGH]: Add support for other component types (e.g. graph)
        component_specs = [SeldonDeploymentComponentSpecs(spec=spec)]
        predictor = SeldonDeploymentPredictor(
            name=predictor_name,
            graph=custom_graph,
            engineResources=engineResources,
            componentSpecs=component_specs,
        )
    else:
        builtin_graph = SeldonDeploymentPredictiveUnit(
            name="classifier",
            type=SeldonDeploymentPredictiveUnitType.MODEL,
            modelUri=model_uri or "",
            implementation=implementation or "",
            envSecretRefName=secret_name,
            parameters=parameters,
        )
        predictor = SeldonDeploymentPredictor(
            name=predictor_name,
            graph=builtin_graph,
            engineResources=engineResources,
        )

    metadata = SeldonDeploymentMetadata(
        name=deployment_name,
        labels=deployment_labels,
        annotations=deployment_annotations,
    )
    deployment_spec = SeldonDeploymentSpec(
        name=deployment_name, predictors=[predictor]
    )
    return SeldonDeployment(metadata=metadata, spec=deployment_spec)
get_error(self)

Get a message describing the error, if in an error state.

Returns:

Type Description
Optional[str]

A message describing the error, if in an error state, otherwise None.

Source code in zenml/integrations/seldon/seldon_client.py
def get_error(self) -> Optional[str]:
    """Return the status description when the deployment has failed.

    Returns:
        The status description if a status is present and the deployment
        is in a failed state, otherwise None.
    """
    # Without a status, or outside the failed state, there is no error.
    if not self.status or not self.is_failed():
        return None
    return self.status.description
get_pending_message(self)

Get a message describing the pending conditions of the Seldon Deployment.

Returns:

Type Description
Optional[str]

A message describing the pending condition of the Seldon Deployment, or None, if no conditions are pending.

Source code in zenml/integrations/seldon/seldon_client.py
def get_pending_message(self) -> Optional[str]:
    """Return the message of the first unmet `Ready` condition.

    Returns:
        The message of the first condition whose type is `Ready` and whose
        status is falsy, or None if there is no status, there are no
        conditions, or no such condition exists.
    """
    status = self.status
    if status and status.conditions:
        for condition in status.conditions:
            if condition.type == "Ready" and not condition.status:
                return condition.message
    return None
is_available(self)

Checks if the Seldon Deployment is in an available state.

Returns:

Type Description
bool

True if the Seldon Deployment is available, False otherwise.

Source code in zenml/integrations/seldon/seldon_client.py
def is_available(self) -> bool:
    """Tell whether the Seldon Deployment state is `AVAILABLE`.

    Returns:
        True if the Seldon Deployment is available, False otherwise.
    """
    current_state = self.state
    return current_state == SeldonDeploymentStatusState.AVAILABLE
is_failed(self)

Checks if the Seldon Deployment is in a failed state.

Returns:

Type Description
bool

True if the Seldon Deployment is failed, False otherwise.

Source code in zenml/integrations/seldon/seldon_client.py
def is_failed(self) -> bool:
    """Tell whether the Seldon Deployment state is `FAILED`.

    Returns:
        True if the Seldon Deployment is failed, False otherwise.
    """
    current_state = self.state
    return current_state == SeldonDeploymentStatusState.FAILED
is_managed_by_zenml(self)

Checks if this Seldon Deployment is managed by ZenML.

The convention used to differentiate between SeldonDeployment instances that are managed by ZenML and those that are not is to set the app label value to zenml.

Returns:

Type Description
bool

True if the Seldon Deployment is managed by ZenML, False otherwise.

Source code in zenml/integrations/seldon/seldon_client.py
def is_managed_by_zenml(self) -> bool:
    """Tell whether this Seldon Deployment carries the ZenML marker label.

    SeldonDeployment instances managed by ZenML are told apart from all
    others by convention: their `app` label is set to `zenml`.

    Returns:
        True if the Seldon Deployment is managed by ZenML, False
        otherwise.
    """
    app_label = self.metadata.labels.get("app")
    return app_label == "zenml"
is_pending(self)

Checks if the Seldon Deployment is in a pending state.

Returns:

Type Description
bool

True if the Seldon Deployment is pending, False otherwise.

Source code in zenml/integrations/seldon/seldon_client.py
def is_pending(self) -> bool:
    """Tell whether the Seldon Deployment state is `CREATING`.

    Returns:
        True if the Seldon Deployment is pending, False otherwise.
    """
    current_state = self.state
    return current_state == SeldonDeploymentStatusState.CREATING
mark_as_managed_by_zenml(self)

Marks this Seldon Deployment as managed by ZenML.

The convention used to differentiate between SeldonDeployment instances that are managed by ZenML and those that are not is to set the app label value to zenml.

Source code in zenml/integrations/seldon/seldon_client.py
def mark_as_managed_by_zenml(self) -> None:
    """Tag this Seldon Deployment with the ZenML marker label.

    SeldonDeployment instances managed by ZenML are told apart from all
    others by convention: their `app` label is set to `zenml`. The label
    dictionary of the metadata is modified in place.
    """
    labels = self.metadata.labels
    labels["app"] = "zenml"

SeldonDeploymentComponentSpecs (BaseModel) pydantic-model

Component specs for a Seldon Deployment.

Attributes:

Name Type Description
spec Optional[Dict[str, Any]]

the component spec.

Source code in zenml/integrations/seldon/seldon_client.py
class SeldonDeploymentComponentSpecs(BaseModel):
    """Component specs for a Seldon Deployment.

    Attributes:
        spec: the component spec. `SeldonDeployment.build` stores the
            Kubernetes pod spec of a custom deployment here.
    """

    # Optional free-form dictionary; no default value is declared.
    spec: Optional[Dict[str, Any]]
    # TODO [HIGH]: Add graph field to ComponentSpecs. graph: Optional[SeldonDeploymentPredictiveUnit]

    class Config:
        """Pydantic configuration class."""

        # Re-validate values whenever an attribute is assigned
        validate_assignment = True
        # Ignore extra attributes from the CRD that are not reflected here
        extra = "ignore"
Config

Pydantic configuration class.

Source code in zenml/integrations/seldon/seldon_client.py
class Config:
    """Pydantic configuration class."""

    # Re-validate values whenever an attribute is assigned
    validate_assignment = True
    # Ignore extra attributes from the CRD that are not reflected here
    extra = "ignore"

SeldonDeploymentExistsError (SeldonClientError)

Raised when a SeldonDeployment resource cannot be created because a resource with the same name already exists.

Source code in zenml/integrations/seldon/seldon_client.py
class SeldonDeploymentExistsError(SeldonClientError):
    """Raised when a SeldonDeployment resource cannot be created.

    Signals that a resource with the same name already exists.
    """

SeldonDeploymentMetadata (BaseModel) pydantic-model

Metadata for a Seldon Deployment.

Attributes:

Name Type Description
name str

the name of the Seldon Deployment.

labels Dict[str, str]

Kubernetes labels for the Seldon Deployment.

annotations Dict[str, str]

Kubernetes annotations for the Seldon Deployment.

creationTimestamp Optional[str]

the creation timestamp of the Seldon Deployment.

Source code in zenml/integrations/seldon/seldon_client.py
class SeldonDeploymentMetadata(BaseModel):
    """Metadata for a Seldon Deployment.

    Attributes:
        name: the name of the Seldon Deployment.
        labels: Kubernetes labels for the Seldon Deployment. The `app`
            label is the one inspected and set by
            `SeldonDeployment.is_managed_by_zenml` and
            `SeldonDeployment.mark_as_managed_by_zenml`.
        annotations: Kubernetes annotations for the Seldon Deployment.
        creationTimestamp: the creation timestamp of the Seldon Deployment.
    """

    name: str
    # Each instance gets its own empty dictionary by default.
    labels: Dict[str, str] = Field(default_factory=dict)
    annotations: Dict[str, str] = Field(default_factory=dict)
    creationTimestamp: Optional[str]

    class Config:
        """Pydantic configuration class."""

        # Re-validate values whenever an attribute is assigned
        validate_assignment = True
        # Ignore extra attributes from the CRD that are not reflected here
        extra = "ignore"
Config

Pydantic configuration class.

Source code in zenml/integrations/seldon/seldon_client.py
class Config:
    """Pydantic configuration class."""

    # Re-validate values whenever an attribute is assigned
    validate_assignment = True
    # Ignore extra attributes from the CRD that are not reflected here
    extra = "ignore"

SeldonDeploymentNotFoundError (SeldonClientError)

Raised when a particular SeldonDeployment resource is not found or is not managed by ZenML.

Source code in zenml/integrations/seldon/seldon_client.py
class SeldonDeploymentNotFoundError(SeldonClientError):
    """Raised when a particular SeldonDeployment resource cannot be used.

    Signals that the resource is not found or is not managed by ZenML.
    """

SeldonDeploymentPredictiveUnit (BaseModel) pydantic-model

Seldon Deployment predictive unit.

Attributes:

Name Type Description
name str

the name of the predictive unit.

type Optional[zenml.integrations.seldon.seldon_client.SeldonDeploymentPredictiveUnitType]

predictive unit type.

implementation Optional[str]

the Seldon Core implementation used to serve the model.

modelUri Optional[str]

URI of the model (or models) to serve.

serviceAccountName Optional[str]

the name of the service account to associate with the predictive unit container.

envSecretRefName Optional[str]

the name of a Kubernetes secret that contains environment variables (e.g. credentials) to be configured for the predictive unit container.

children List[zenml.integrations.seldon.seldon_client.SeldonDeploymentPredictiveUnit]

a list of child predictive units that together make up the model serving graph.

Source code in zenml/integrations/seldon/seldon_client.py
class SeldonDeploymentPredictiveUnit(BaseModel):
    """Seldon Deployment predictive unit.

    Attributes:
        name: the name of the predictive unit.
        type: predictive unit type. Defaults to `MODEL`.
        implementation: the Seldon Core implementation used to serve the model.
        modelUri: URI of the model (or models) to serve.
        parameters: optional list of predictor parameters for the unit.
        serviceAccountName: the name of the service account to associate with
            the predictive unit container.
        envSecretRefName: the name of a Kubernetes secret that contains
            environment variables (e.g. credentials) to be configured for the
            predictive unit container.
        children: a list of child predictive units that together make up the
            model serving graph. Defaults to an empty list.
    """

    # `name` is the only field declared without Optional or a default.
    name: str
    type: Optional[
        SeldonDeploymentPredictiveUnitType
    ] = SeldonDeploymentPredictiveUnitType.MODEL
    implementation: Optional[str]
    modelUri: Optional[str]
    parameters: Optional[List[SeldonDeploymentPredictorParameter]]
    serviceAccountName: Optional[str]
    envSecretRefName: Optional[str]
    # Self-referencing field, hence the string forward reference.
    children: List["SeldonDeploymentPredictiveUnit"] = Field(
        default_factory=list
    )

    class Config:
        """Pydantic configuration class."""

        # Re-validate values whenever an attribute is assigned
        validate_assignment = True
        # Ignore extra attributes from the CRD that are not reflected here
        extra = "ignore"
Config

Pydantic configuration class.

Source code in zenml/integrations/seldon/seldon_client.py
class Config:
    """Pydantic configuration class."""

    # Re-validate values whenever an attribute is assigned
    validate_assignment = True
    # Ignore extra attributes from the CRD that are not reflected here
    extra = "ignore"

SeldonDeploymentPredictiveUnitType (StrEnum)

Predictive unit types for a Seldon Deployment.

Source code in zenml/integrations/seldon/seldon_client.py
class SeldonDeploymentPredictiveUnitType(StrEnum):
    """Predictive unit types for a Seldon Deployment.

    Every member's value is the same upper-case string as its name.
    """

    UNKNOWN_TYPE = "UNKNOWN_TYPE"
    ROUTER = "ROUTER"
    COMBINER = "COMBINER"
    # MODEL is the default type of `SeldonDeploymentPredictiveUnit`.
    MODEL = "MODEL"
    TRANSFORMER = "TRANSFORMER"
    OUTPUT_TRANSFORMER = "OUTPUT_TRANSFORMER"

SeldonDeploymentPredictor (BaseModel) pydantic-model

Seldon Deployment predictor.

Attributes:

Name Type Description
name str

the name of the predictor.

replicas int

the number of pod replicas for the predictor.

graph Optional[zenml.integrations.seldon.seldon_client.SeldonDeploymentPredictiveUnit]

the serving graph composed of one or more predictive units.

Source code in zenml/integrations/seldon/seldon_client.py
class SeldonDeploymentPredictor(BaseModel):
    """Seldon Deployment predictor.

    Attributes:
        name: the name of the predictor.
        replicas: the number of pod replicas for the predictor.
        graph: the serving graph composed of one or more predictive units.
            Defaults to None when not provided.
        engineResources: the resource requirements for the predictor.
            Defaults to an empty `SeldonResourceRequirements`.
        componentSpecs: optional list of component specs for the predictor.
    """

    name: str
    replicas: int = 1
    # A `SeldonDeploymentPredictiveUnit` cannot be created without a `name`,
    # so using the class as a zero-argument default factory raised a
    # validation error whenever `graph` was omitted. Default to None instead.
    graph: Optional[SeldonDeploymentPredictiveUnit] = None
    engineResources: Optional[SeldonResourceRequirements] = Field(
        default_factory=SeldonResourceRequirements
    )
    componentSpecs: Optional[List[SeldonDeploymentComponentSpecs]]

    class Config:
        """Pydantic configuration class."""

        # Re-validate values whenever an attribute is assigned
        validate_assignment = True
        # Ignore extra attributes from the CRD that are not reflected here
        extra = "ignore"
Config

Pydantic configuration class.

Source code in zenml/integrations/seldon/seldon_client.py
class Config:
    """Pydantic configuration class."""

    # Re-validate values whenever an attribute is assigned
    validate_assignment = True
    # Ignore extra attributes from the CRD that are not reflected here
    extra = "ignore"

SeldonDeploymentPredictorParameter (BaseModel) pydantic-model

Parameter for Seldon Deployment predictor.

Attributes:

Name Type Description
name str

parameter name

type str

parameter type, can be INT, FLOAT, DOUBLE, STRING, BOOL

value str

parameter value

Source code in zenml/integrations/seldon/seldon_client.py
class SeldonDeploymentPredictorParameter(BaseModel):
    """Parameter for Seldon Deployment predictor.

    Attributes:
        name: parameter name
        type: parameter type, can be INT, FLOAT, DOUBLE, STRING, BOOL
        value: parameter value
    """

    # All three fields are plain strings that default to the empty string.
    name: str = ""
    type: str = ""
    value: str = ""

    class Config:
        """Pydantic configuration class."""

        # Re-validate values whenever an attribute is assigned
        validate_assignment = True
        # Ignore extra attributes from the CRD that are not reflected here
        extra = "ignore"
Config

Pydantic configuration class.

Source code in zenml/integrations/seldon/seldon_client.py
class Config:
    """Pydantic configuration class."""

    # Re-validate values whenever an attribute is assigned
    validate_assignment = True
    # Ignore extra attributes from the CRD that are not reflected here
    extra = "ignore"

SeldonDeploymentSpec (BaseModel) pydantic-model

Spec for a Seldon Deployment.

Attributes:

Name Type Description
name str

the name of the Seldon Deployment.

protocol Optional[str]

the API protocol used for the Seldon Deployment.

predictors List[zenml.integrations.seldon.seldon_client.SeldonDeploymentPredictor]

a list of predictors that make up the serving graph.

replicas int

the default number of pod replicas used for the predictors.

Source code in zenml/integrations/seldon/seldon_client.py
class SeldonDeploymentSpec(BaseModel):
    """Spec for a Seldon Deployment.

    Attributes:
        name: the name of the Seldon Deployment.
        protocol: the API protocol used for the Seldon Deployment.
        predictors: a list of predictors that make up the serving graph.
        replicas: the default number of pod replicas used for the predictors.
            Defaults to 1.
    """

    name: str
    protocol: Optional[str]
    # Declared without a default value.
    predictors: List[SeldonDeploymentPredictor]
    replicas: int = 1

    class Config:
        """Pydantic configuration class."""

        # Re-validate values whenever an attribute is assigned
        validate_assignment = True
        # Ignore extra attributes from the CRD that are not reflected here
        extra = "ignore"
Config

Pydantic configuration class.

Source code in zenml/integrations/seldon/seldon_client.py
class Config:
    """Pydantic configuration class."""

    # Re-validate values whenever an attribute is assigned
    validate_assignment = True
    # Ignore extra attributes from the CRD that are not reflected here
    extra = "ignore"

SeldonDeploymentStatus (BaseModel) pydantic-model

The status of a Seldon Deployment.

Attributes:

Name Type Description
state SeldonDeploymentStatusState

the current state of the Seldon Deployment.

description Optional[str]

a human-readable description of the current state.

replicas Optional[int]

the current number of running pod replicas

address Optional[zenml.integrations.seldon.seldon_client.SeldonDeploymentStatusAddress]

the address where the Seldon Deployment API can be accessed.

conditions List[zenml.integrations.seldon.seldon_client.SeldonDeploymentStatusCondition]

the list of Kubernetes conditions for the Seldon Deployment.

Source code in zenml/integrations/seldon/seldon_client.py
class SeldonDeploymentStatus(BaseModel):
    """The status of a Seldon Deployment.

    Attributes:
        state: the current state of the Seldon Deployment. Defaults to
            `UNKNOWN`.
        description: a human-readable description of the current state.
        replicas: the current number of running pod replicas
        address: the address where the Seldon Deployment API can be accessed.
        conditions: the list of Kubernetes conditions for the Seldon Deployment.
    """

    state: SeldonDeploymentStatusState = SeldonDeploymentStatusState.UNKNOWN
    description: Optional[str]
    replicas: Optional[int]
    address: Optional[SeldonDeploymentStatusAddress]
    # Unlike the fields above, this one is neither Optional nor defaulted.
    conditions: List[SeldonDeploymentStatusCondition]

    class Config:
        """Pydantic configuration class."""

        # Re-validate values whenever an attribute is assigned
        validate_assignment = True
        # Ignore extra attributes from the CRD that are not reflected here
        extra = "ignore"
Config

Pydantic configuration class.

Source code in zenml/integrations/seldon/seldon_client.py
class Config:
    """Pydantic configuration class."""

    # Re-validate values whenever an attribute is assigned
    validate_assignment = True
    # Ignore extra attributes from the CRD that are not reflected here
    extra = "ignore"

SeldonDeploymentStatusAddress (BaseModel) pydantic-model

The status address for a Seldon Deployment.

Attributes:

Name Type Description
url str

the URL where the Seldon Deployment API can be accessed internally.

Source code in zenml/integrations/seldon/seldon_client.py
class SeldonDeploymentStatusAddress(BaseModel):
    """The status address for a Seldon Deployment.

    Attributes:
        url: the URL where the Seldon Deployment API can be accessed
            internally.
    """

    # The only field of the model; declared without a default value.
    url: str

SeldonDeploymentStatusCondition (BaseModel) pydantic-model

The Kubernetes status condition entry for a Seldon Deployment.

Attributes:

Name Type Description
type str

Type of runtime condition.

status bool

Status of the condition.

reason Optional[str]

Brief CamelCase string containing reason for the condition's last transition.

message Optional[str]

Human-readable message indicating details about last transition.

Source code in zenml/integrations/seldon/seldon_client.py
class SeldonDeploymentStatusCondition(BaseModel):
    """The Kubernetes status condition entry for a Seldon Deployment.

    Attributes:
        type: Type of runtime condition.
            `SeldonDeployment.get_pending_message` looks for the `Ready`
            type.
        status: Status of the condition.
        reason: Brief CamelCase string containing reason for the condition's
            last transition.
        message: Human-readable message indicating details about last
            transition.
    """

    type: str
    # NOTE(review): declared as bool; presumably relies on pydantic coercing
    # the value found in the Kubernetes resource - confirm.
    status: bool
    reason: Optional[str]
    message: Optional[str]

SeldonDeploymentStatusState (StrEnum)

Possible state values for a Seldon Deployment.

Source code in zenml/integrations/seldon/seldon_client.py
class SeldonDeploymentStatusState(StrEnum):
    """Possible state values for a Seldon Deployment."""

    # Default state of `SeldonDeploymentStatus`.
    UNKNOWN = "Unknown"
    # Checked by `SeldonDeployment.is_available`.
    AVAILABLE = "Available"
    # Checked by `SeldonDeployment.is_pending`.
    CREATING = "Creating"
    # Checked by `SeldonDeployment.is_failed`.
    FAILED = "Failed"

SeldonResourceRequirements (BaseModel) pydantic-model

Resource requirements for a Seldon deployed model.

Attributes:

Name Type Description
limits Dict[str, str]

an upper limit of resources to be used by the model

requests Dict[str, str]

resources requested by the model

Source code in zenml/integrations/seldon/seldon_client.py
class SeldonResourceRequirements(BaseModel):
    """Resource requirements for a Seldon deployed model.

    Attributes:
        limits: an upper limit of resources to be used by the model
        requests: resources requested by the model
    """

    # Both mappings default to a fresh empty dictionary per instance.
    limits: Dict[str, str] = Field(default_factory=dict)
    requests: Dict[str, str] = Field(default_factory=dict)

create_seldon_core_custom_spec(model_uri, custom_docker_image, secret_name, command, container_registry_secret_name=None)

Create a custom pod spec for the seldon core container.

Parameters:

Name Type Description Default
model_uri Optional[str]

The URI of the model to load.

required
custom_docker_image Optional[str]

The docker image to use.

required
secret_name Optional[str]

The name of the secret to use.

required
command Optional[List[str]]

The command to run in the container.

required
container_registry_secret_name Optional[str]

The name of the secret to use for docker image pull.

None

Returns:

Type Description
V1PodSpec

A pod spec for the seldon core container.

Source code in zenml/integrations/seldon/seldon_client.py
def create_seldon_core_custom_spec(
    model_uri: Optional[str],
    custom_docker_image: Optional[str],
    secret_name: Optional[str],
    command: Optional[List[str]],
    container_registry_secret_name: Optional[str] = None,
) -> k8s_client.V1PodSpec:
    """Create a custom pod spec for the seldon core container.

    The pod consists of an init container that receives `model_uri` and
    `/mnt/models` as arguments, and a `classifier` container running the
    custom image with the same volume mounted read-only at `/mnt/models`.

    Args:
        model_uri: The URI of the model to load.
        custom_docker_image: The docker image to use.
        secret_name: The name of the secret to use. When set, its content
            is exposed as environment variables to the init container;
            when empty or None, no secret reference is added.
        command: The command to run in the container.
        container_registry_secret_name: The name of the secret to use for
            docker image pull. When empty or None, no image pull secret is
            added to the pod spec.

    Returns:
        A pod spec for the seldon core container, passed through
        `api.sanitize_for_serialization` before being returned.
    """
    volume_name = "classifier-provision-location"
    model_mount_path = "/mnt/models"

    volume = k8s_client.V1Volume(
        name=volume_name,
        empty_dir={},
    )

    # Only reference the secret if one was actually given: a secret
    # reference without a name does not point at anything.
    env_from = None
    if secret_name:
        env_from = [
            k8s_client.V1EnvFromSource(
                secret_ref=k8s_client.V1SecretEnvSource(
                    name=secret_name, optional=False
                )
            )
        ]
    init_container = k8s_client.V1Container(
        name="classifier-model-initializer",
        image="seldonio/rclone-storage-initializer:1.14.0-dev",
        image_pull_policy="IfNotPresent",
        args=[model_uri, model_mount_path],
        volume_mounts=[
            k8s_client.V1VolumeMount(
                name=volume_name, mount_path=model_mount_path
            )
        ],
        env_from=env_from,
    )

    container = k8s_client.V1Container(
        name="classifier",
        image=custom_docker_image,
        image_pull_policy="IfNotPresent",
        command=command,
        volume_mounts=[
            k8s_client.V1VolumeMount(
                name=volume_name,
                mount_path=model_mount_path,
                read_only=True,
            )
        ],
        ports=[
            k8s_client.V1ContainerPort(container_port=5000),
            k8s_client.V1ContainerPort(container_port=9000),
        ],
    )

    # Decide on the secret *name*, not on the reference object: the object
    # is always truthy, which previously added a nameless pull secret.
    image_pull_secrets = None
    if container_registry_secret_name:
        image_pull_secrets = [
            k8s_client.V1LocalObjectReference(
                name=container_registry_secret_name
            )
        ]

    spec = k8s_client.V1PodSpec(
        volumes=[volume],
        init_containers=[init_container],
        image_pull_secrets=image_pull_secrets,
        containers=[container],
    )

    return api.sanitize_for_serialization(spec)

services special

Initialization for Seldon services.

seldon_deployment

Implementation for the Seldon Deployment step.

SeldonDeploymentConfig (ServiceConfig) pydantic-model

Seldon Core deployment service configuration.

Attributes:

Name Type Description
model_uri str

URI of the model (or models) to serve.

model_name str

the name of the model. Multiple versions of the same model should use the same model name.

implementation str

the Seldon Core implementation used to serve the model. The implementation type can be one of the following: TENSORFLOW_SERVER, SKLEARN_SERVER, XGBOOST_SERVER, custom.

replicas int

number of replicas to use for the prediction service.

secret_name Optional[str]

the name of a Kubernetes secret containing additional configuration parameters for the Seldon Core deployment (e.g. credentials to access the Artifact Store).

model_metadata Dict[str, Any]

optional model metadata information (see https://docs.seldon.io/projects/seldon-core/en/latest/reference/apis/metadata.html).

extra_args Dict[str, Any]

additional arguments to pass to the Seldon Core deployment resource configuration.

is_custom_deployment Optional[bool]

whether the deployment is a custom deployment

spec Optional[Dict[Any, Any]]

custom Kubernetes resource specification for the Seldon Core deployment.

Source code in zenml/integrations/seldon/services/seldon_deployment.py
class SeldonDeploymentConfig(ServiceConfig):
    """Seldon Core deployment service configuration.

    Attributes:
        model_uri: URI of the model (or models) to serve.
        model_name: the name of the model. Multiple versions of the same model
            should use the same model name.
        implementation: the Seldon Core implementation used to serve the model.
            The implementation type can be one of the following: `TENSORFLOW_SERVER`,
            `SKLEARN_SERVER`, `XGBOOST_SERVER`, `custom`.
        parameters: optional list of Seldon Core predictor parameters.
        resources: optional resource requirements for the deployment.
        replicas: number of replicas to use for the prediction service.
        secret_name: the name of a Kubernetes secret containing additional
            configuration parameters for the Seldon Core deployment (e.g.
            credentials to access the Artifact Store).
        model_metadata: optional model metadata information (see
            https://docs.seldon.io/projects/seldon-core/en/latest/reference/apis/metadata.html).
        extra_args: additional arguments to pass to the Seldon Core deployment
            resource configuration.
        is_custom_deployment: whether the deployment is a custom deployment
        spec: custom Kubernetes resource specification for the Seldon Core
            deployment.
    """

    model_uri: str = ""
    model_name: str = "default"
    # TODO [ENG-775]: have an enum of all supported Seldon Core implementations
    implementation: str
    parameters: Optional[List[SeldonDeploymentPredictorParameter]]
    resources: Optional[SeldonResourceRequirements]
    replicas: int = 1
    secret_name: Optional[str]
    model_metadata: Dict[str, Any] = Field(default_factory=dict)
    extra_args: Dict[str, Any] = Field(default_factory=dict)
    is_custom_deployment: Optional[bool] = False
    spec: Optional[Dict[Any, Any]] = Field(default_factory=dict)

    def get_seldon_deployment_labels(self) -> Dict[str, str]:
        """Derive the Seldon Core deployment labels from this configuration.

        The labels are attached to the Seldon Core deployment resource and
        may serve as label selectors in lookup operations. Only non-empty
        configuration values produce a label; every `extra_args` entry is
        added under a `zenml.`-prefixed key. The result is passed through
        `SeldonClient.sanitize_labels` before being returned.

        Returns:
            The labels for the Seldon Core deployment.
        """
        # Listed in the order in which the labels must be inserted.
        candidates = (
            ("zenml.pipeline_name", self.pipeline_name),
            ("zenml.pipeline_run_id", self.pipeline_run_id),
            ("zenml.pipeline_step_name", self.pipeline_step_name),
            ("zenml.model_name", self.model_name),
            ("zenml.model_uri", self.model_uri),
            ("zenml.model_type", self.implementation),
        )
        labels = {
            label: setting for label, setting in candidates if setting
        }
        if self.extra_args:
            labels.update(
                {
                    f"zenml.{key}": value
                    for key, value in self.extra_args.items()
                }
            )
        SeldonClient.sanitize_labels(labels)
        return labels

    def get_seldon_deployment_annotations(self) -> Dict[str, str]:
        """Derive the Seldon Core deployment annotations from this configuration.

        Annotations hold information about the Seldon Core service that
        does not fit in the labels. The most important one is the
        serialized service configuration itself, from which the
        configuration can be recreated given a remote Seldon deployment.

        Returns:
            The annotations for the Seldon Core deployment.
        """
        return {
            "zenml.service_config": self.json(),
            "zenml.version": __version__,
        }

    @classmethod
    def create_from_deployment(
        cls, deployment: SeldonDeployment
    ) -> "SeldonDeploymentConfig":
        """Rebuild a service configuration from a deployed Seldon Core instance.

        Args:
            deployment: the Seldon Core deployment resource.

        Returns:
            The Seldon Core service configuration corresponding to the given
            Seldon Core deployment resource.

        Raises:
            ValueError: if the given deployment resource does not contain
                the expected annotations or it contains an invalid or
                incompatible Seldon Core service configuration.
        """
        resource_annotations = deployment.metadata.annotations
        config_data = resource_annotations.get("zenml.service_config")
        if not config_data:
            raise ValueError(
                f"The given deployment resource does not contain a "
                f"'zenml.service_config' annotation: {deployment}"
            )
        try:
            return cls.parse_raw(config_data)
        except ValidationError as e:
            raise ValueError(
                f"The loaded Seldon Core deployment resource contains an "
                f"invalid or incompatible Seldon Core service configuration: "
                f"{config_data}"
            ) from e
create_from_deployment(deployment) classmethod

Recreate the configuration of a Seldon Core Service from a deployed instance.

Parameters:

Name Type Description Default
deployment SeldonDeployment

the Seldon Core deployment resource.

required

Returns:

Type Description
SeldonDeploymentConfig

The Seldon Core service configuration corresponding to the given Seldon Core deployment resource.

Exceptions:

Type Description
ValueError

if the given deployment resource does not contain the expected annotations or it contains an invalid or incompatible Seldon Core service configuration.

Source code in zenml/integrations/seldon/services/seldon_deployment.py
@classmethod
def create_from_deployment(
    cls, deployment: SeldonDeployment
) -> "SeldonDeploymentConfig":
    """Rebuild a service configuration from a Seldon Core deployment.

    The serialized configuration is read from the
    `zenml.service_config` annotation of the deployment resource and
    parsed back with `cls.parse_raw`.

    Args:
        deployment: the Seldon Core deployment resource.

    Returns:
        The Seldon Core service configuration parsed from the given
        Seldon Core deployment resource.

    Raises:
        ValueError: if the `zenml.service_config` annotation is missing
            or empty, or if its content does not validate as a
            Seldon Core service configuration.
    """
    config_data = deployment.metadata.annotations.get(
        "zenml.service_config"
    )
    if config_data:
        try:
            return cls.parse_raw(config_data)
        except ValidationError as err:
            raise ValueError(
                f"The loaded Seldon Core deployment resource contains an "
                f"invalid or incompatible Seldon Core service configuration: "
                f"{config_data}"
            ) from err

    # No (or an empty) annotation: nothing to rebuild the config from.
    raise ValueError(
        f"The given deployment resource does not contain a "
        f"'zenml.service_config' annotation: {deployment}"
    )
get_seldon_deployment_annotations(self)

Generate annotations for the Seldon Core deployment from the service configuration.

The annotations are used to store additional information about the Seldon Core service that is associated with the deployment that is not available in the labels. One annotation particularly important is the serialized Service configuration itself, which is used to recreate the service configuration from a remote Seldon deployment.

Returns:

Type Description
Dict[str, str]

The annotations for the Seldon Core deployment.

Source code in zenml/integrations/seldon/services/seldon_deployment.py
def get_seldon_deployment_annotations(self) -> Dict[str, str]:
    """Build the annotations to attach to the Seldon Core deployment.

    Two annotations are produced: `zenml.service_config`, holding this
    configuration as serialized by `self.json()` (the value later used
    to recreate the configuration from a remote deployment), and
    `zenml.version`, holding the `__version__` value.

    Returns:
        The annotations for the Seldon Core deployment.
    """
    return {
        "zenml.service_config": self.json(),
        "zenml.version": __version__,
    }
get_seldon_deployment_labels(self)

Generate labels for the Seldon Core deployment from the service configuration.

These labels are attached to the Seldon Core deployment resource and may be used as label selectors in lookup operations.

Returns:

Type Description
Dict[str, str]

The labels for the Seldon Core deployment.

Source code in zenml/integrations/seldon/services/seldon_deployment.py
def get_seldon_deployment_labels(self) -> Dict[str, str]:
    """Build the labels to attach to the Seldon Core deployment.

    One label is emitted for every configuration value that is set
    (truthy), followed by one `zenml.<key>` label per entry of
    `extra_args`. The resulting dictionary is passed through
    `SeldonClient.sanitize_labels` before being returned. The labels may
    be used as label selectors in lookup operations.

    Returns:
        The labels for the Seldon Core deployment.
    """
    # (label key, configured value) pairs, in the order they are emitted.
    candidates = (
        ("zenml.pipeline_name", self.pipeline_name),
        ("zenml.pipeline_run_id", self.pipeline_run_id),
        ("zenml.pipeline_step_name", self.pipeline_step_name),
        ("zenml.model_name", self.model_name),
        ("zenml.model_uri", self.model_uri),
        ("zenml.model_type", self.implementation),
    )
    labels = {
        label_key: configured
        for label_key, configured in candidates
        if configured
    }

    # Extra arguments come last, so they may overwrite a label set above.
    for key, value in (self.extra_args or {}).items():
        labels[f"zenml.{key}"] = value

    SeldonClient.sanitize_labels(labels)
    return labels
SeldonDeploymentService (BaseDeploymentService) pydantic-model

A service that represents a Seldon Core deployment server.

Attributes:

Name Type Description
config SeldonDeploymentConfig

service configuration.

status SeldonDeploymentServiceStatus

service status.

Source code in zenml/integrations/seldon/services/seldon_deployment.py
class SeldonDeploymentService(BaseDeploymentService):
    """A service that represents a Seldon Core deployment server.

    Attributes:
        config: service configuration.
        status: service status.
    """

    SERVICE_TYPE = ServiceType(
        name="seldon-deployment",
        type="model-serving",
        flavor="seldon",
        description="Seldon Core prediction service",
    )

    config: SeldonDeploymentConfig = Field(
        default_factory=SeldonDeploymentConfig
    )
    status: SeldonDeploymentServiceStatus = Field(
        default_factory=SeldonDeploymentServiceStatus
    )

    def _get_client(self) -> SeldonClient:
        """Get the Seldon Core client from the active Seldon Core model deployer.

        Returns:
            The Seldon Core client.
        """
        # Imported locally rather than at module level.
        # NOTE(review): presumably to avoid a circular import -- confirm.
        from zenml.integrations.seldon.model_deployers.seldon_model_deployer import (
            SeldonModelDeployer,
        )

        model_deployer = cast(
            SeldonModelDeployer,
            SeldonModelDeployer.get_active_model_deployer(),
        )
        return model_deployer.seldon_client

    def check_status(self) -> Tuple[ServiceState, str]:
        """Check the current operational state of the Seldon Core deployment.

        Returns:
            The operational state of the Seldon Core deployment and a message
            providing additional information about that state (e.g. a
            description of the error, if one is encountered). A deployment
            that cannot be found is reported as inactive with an empty
            message.
        """
        client = self._get_client()
        name = self.seldon_deployment_name
        try:
            deployment = client.get_deployment(name=name)
        except SeldonDeploymentNotFoundError:
            return (ServiceState.INACTIVE, "")

        if deployment.is_available():
            return (
                ServiceState.ACTIVE,
                f"Seldon Core deployment '{name}' is available",
            )

        if deployment.is_failed():
            return (
                ServiceState.ERROR,
                f"Seldon Core deployment '{name}' failed: "
                f"{deployment.get_error()}",
            )

        # Neither available nor failed: report it as still starting up.
        pending_message = deployment.get_pending_message() or ""
        return (
            ServiceState.PENDING_STARTUP,
            "Seldon Core deployment is being created: " + pending_message,
        )

    @property
    def seldon_deployment_name(self) -> str:
        """Get the name of the Seldon Core deployment.

        The name is derived from the service UUID, so it uniquely
        corresponds to this service instance.

        Returns:
            The name of the Seldon Core deployment.
        """
        return f"zenml-{str(self.uuid)}"

    def _get_seldon_deployment_labels(self) -> Dict[str, str]:
        """Generate the labels for the Seldon Core deployment from the service configuration.

        Returns:
            The labels for the Seldon Core deployment.
        """
        labels = self.config.get_seldon_deployment_labels()
        labels["zenml.service_uuid"] = str(self.uuid)
        # Sanitize again because a label was added after the configuration
        # produced (and sanitized) its own labels.
        SeldonClient.sanitize_labels(labels)
        return labels

    @classmethod
    def create_from_deployment(
        cls, deployment: SeldonDeployment
    ) -> "SeldonDeploymentService":
        """Recreate a Seldon Core service from a Seldon Core deployment resource.

        The operational status of the recreated service is updated before
        it is returned.

        Args:
            deployment: the Seldon Core deployment resource.

        Returns:
            The Seldon Core service corresponding to the given
            Seldon Core deployment resource.

        Raises:
            ValueError: if the given deployment resource does not contain
                the expected service_uuid label.
        """
        config = SeldonDeploymentConfig.create_from_deployment(deployment)
        uuid = deployment.metadata.labels.get("zenml.service_uuid")
        if not uuid:
            raise ValueError(
                f"The given deployment resource does not contain a valid "
                f"'zenml.service_uuid' label: {deployment}"
            )
        service = cls(uuid=UUID(uuid), config=config)
        service.update_status()
        return service

    def provision(self) -> None:
        """Provision or update the remote Seldon Core deployment instance.

        The deployment is built from the current configuration; an existing
        deployment with the same name is updated, otherwise a new one is
        created.
        """
        client = self._get_client()

        name = self.seldon_deployment_name

        deployment = SeldonDeployment.build(
            name=name,
            model_uri=self.config.model_uri,
            model_name=self.config.model_name,
            implementation=self.config.implementation,
            parameters=self.config.parameters,
            engineResources=self.config.resources,
            secret_name=self.config.secret_name,
            labels=self._get_seldon_deployment_labels(),
            annotations=self.config.get_seldon_deployment_annotations(),
            is_custom_deployment=self.config.is_custom_deployment,
            spec=self.config.spec,
        )
        # The replica count is set both on the deployment spec and on its
        # first predictor.
        deployment.spec.replicas = self.config.replicas
        deployment.spec.predictors[0].replicas = self.config.replicas

        # check if the Seldon deployment already exists
        try:
            client.get_deployment(name=name)
            # update the existing deployment
            client.update_deployment(deployment)
        except SeldonDeploymentNotFoundError:
            # create the deployment
            client.create_deployment(deployment=deployment)

    def deprovision(self, force: bool = False) -> None:
        """Deprovision the remote Seldon Core deployment instance.

        A deployment that no longer exists is silently ignored.

        Args:
            force: if True, the remote deployment instance will be
                forcefully deprovisioned.
        """
        client = self._get_client()
        name = self.seldon_deployment_name
        try:
            client.delete_deployment(name=name, force=force)
        except SeldonDeploymentNotFoundError:
            # Nothing to delete: the deployment is already gone.
            pass

    def get_logs(
        self,
        follow: bool = False,
        tail: Optional[int] = None,
    ) -> Generator[str, bool, None]:
        """Get the logs of a Seldon Core model deployment.

        Args:
            follow: if True, the logs will be streamed as they are written
            tail: only retrieve the last NUM lines of log output.

        Returns:
            A generator that can be accessed to get the service logs.
        """
        return self._get_client().get_deployment_logs(
            self.seldon_deployment_name,
            follow=follow,
            tail=tail,
        )

    @property
    def prediction_url(self) -> Optional[str]:
        """The prediction URI exposed by the prediction service.

        Returns:
            The prediction URI exposed by the prediction service, or None if
            the service is not yet ready.
        """
        from zenml.integrations.seldon.model_deployers.seldon_model_deployer import (
            SeldonModelDeployer,
        )

        if not self.is_running:
            return None
        namespace = self._get_client().namespace
        model_deployer = cast(
            SeldonModelDeployer,
            SeldonModelDeployer.get_active_model_deployer(),
        )
        # Join with explicit "/" separators: os.path.join is meant for
        # filesystem paths and uses a backslash separator on Windows, which
        # would produce an invalid URL there.
        return "/".join(
            (
                model_deployer.config.base_url.rstrip("/"),
                "seldon",
                namespace,
                self.seldon_deployment_name,
                "api/v0.1/predictions",
            )
        )

    def predict(self, request: str) -> Any:
        """Make a prediction using the service.

        Args:
            request: a JSON-encoded string; it is decoded and sent to the
                service as the `ndarray` field of the request `data`.

        Returns:
            The decoded JSON body of the response returned by the service.

        Raises:
            Exception: if the service is not yet ready.
            ValueError: if the prediction_url is not set, if the request is
                not a string, or if it is not valid JSON (`json.loads`
                raises `json.JSONDecodeError`, a `ValueError` subclass).
        """
        if not self.is_running:
            raise Exception(
                "Seldon prediction service is not running. "
                "Please start the service before making predictions."
            )

        # Read the property once: it is recomputed on every access and
        # returns None when the service is not running, so a second read
        # could differ from the value that was just checked.
        prediction_url = self.prediction_url
        if prediction_url is None:
            raise ValueError("`self.prediction_url` is not set, cannot post.")

        if not isinstance(request, str):
            raise ValueError("Request must be a json string.")
        payload = json.loads(request)

        response = requests.post(
            prediction_url,
            json={"data": {"ndarray": payload}},
        )
        # Surface HTTP error statuses as exceptions instead of returning
        # an error body.
        response.raise_for_status()
        return response.json()
prediction_url: Optional[str] property readonly

The prediction URI exposed by the prediction service.

Returns:

Type Description
Optional[str]

The prediction URI exposed by the prediction service, or None if the service is not yet ready.

seldon_deployment_name: str property readonly

Get the name of the Seldon Core deployment.

It should return the one that uniquely corresponds to this service instance.

Returns:

Type Description
str

The name of the Seldon Core deployment.

check_status(self)

Check the current operational state of the Seldon Core deployment.

Returns:

Type Description
Tuple[zenml.services.service_status.ServiceState, str]

The operational state of the Seldon Core deployment and a message providing additional information about that state (e.g. a description of the error, if one is encountered).

Source code in zenml/integrations/seldon/services/seldon_deployment.py
def check_status(self) -> Tuple[ServiceState, str]:
    """Report the current operational state of the Seldon Core deployment.

    Returns:
        The operational state of the Seldon Core deployment and a message
        with additional information about that state: inactive (with an
        empty message) when the deployment is not found, active when it
        is available, error (with the deployment error) when it failed,
        and pending startup (with any pending message) otherwise.
    """
    seldon_client = self._get_client()
    name = self.seldon_deployment_name
    try:
        deployment = seldon_client.get_deployment(name=name)
    except SeldonDeploymentNotFoundError:
        return ServiceState.INACTIVE, ""

    if deployment.is_available():
        state = ServiceState.ACTIVE
        message = f"Seldon Core deployment '{name}' is available"
    elif deployment.is_failed():
        state = ServiceState.ERROR
        message = (
            f"Seldon Core deployment '{name}' failed: "
            f"{deployment.get_error()}"
        )
    else:
        # Neither available nor failed: still being created.
        state = ServiceState.PENDING_STARTUP
        message = "Seldon Core deployment is being created: " + (
            deployment.get_pending_message() or ""
        )
    return state, message
create_from_deployment(deployment) classmethod

Recreate a Seldon Core service from a Seldon Core deployment resource.

It should then update their operational status.

Parameters:

Name Type Description Default
deployment SeldonDeployment

the Seldon Core deployment resource.

required

Returns:

Type Description
SeldonDeploymentService

The Seldon Core service corresponding to the given Seldon Core deployment resource.

Exceptions:

Type Description
ValueError

if the given deployment resource does not contain the expected service_uuid label.

Source code in zenml/integrations/seldon/services/seldon_deployment.py
@classmethod
def create_from_deployment(
    cls, deployment: SeldonDeployment
) -> "SeldonDeploymentService":
    """Rebuild a Seldon Core service from a Seldon Core deployment resource.

    The configuration is recreated from the deployment, the service UUID
    is read from its `zenml.service_uuid` label, and the operational
    status of the new service is updated before it is returned.

    Args:
        deployment: the Seldon Core deployment resource.

    Returns:
        The Seldon Core service corresponding to the given
        Seldon Core deployment resource.

    Raises:
        ValueError: if the given deployment resource does not contain
            the expected service_uuid label.
    """
    # Recreate the configuration first, as the original does, so its
    # errors take precedence over a missing label.
    recreated_config = SeldonDeploymentConfig.create_from_deployment(
        deployment
    )
    service_uuid = deployment.metadata.labels.get("zenml.service_uuid")
    if service_uuid:
        recreated = cls(uuid=UUID(service_uuid), config=recreated_config)
        recreated.update_status()
        return recreated

    raise ValueError(
        f"The given deployment resource does not contain a valid "
        f"'zenml.service_uuid' label: {deployment}"
    )
deprovision(self, force=False)

Deprovision the remote Seldon Core deployment instance.

Parameters:

Name Type Description Default
force bool

if True, the remote deployment instance will be forcefully deprovisioned.

False
Source code in zenml/integrations/seldon/services/seldon_deployment.py
def deprovision(self, force: bool = False) -> None:
    """Delete the remote Seldon Core deployment instance.

    A deployment that is not found is silently ignored.

    Args:
        force: if True, the remote deployment instance will be
            forcefully deprovisioned.
    """
    seldon_client = self._get_client()
    try:
        seldon_client.delete_deployment(
            name=self.seldon_deployment_name, force=force
        )
    except SeldonDeploymentNotFoundError:
        # Already gone: nothing left to deprovision.
        return
get_logs(self, follow=False, tail=None)

Get the logs of a Seldon Core model deployment.

Parameters:

Name Type Description Default
follow bool

if True, the logs will be streamed as they are written

False
tail Optional[int]

only retrieve the last NUM lines of log output.

None

Returns:

Type Description
Generator[str, bool, NoneType]

A generator that can be accessed to get the service logs.

Source code in zenml/integrations/seldon/services/seldon_deployment.py
def get_logs(
    self,
    follow: bool = False,
    tail: Optional[int] = None,
) -> Generator[str, bool, None]:
    """Fetch the logs of this Seldon Core model deployment.

    The call is delegated to the Seldon Core client, using the name of
    the deployment that corresponds to this service.

    Args:
        follow: if True, the logs will be streamed as they are written
        tail: only retrieve the last NUM lines of log output.

    Returns:
        A generator that can be accessed to get the service logs.
    """
    seldon_client = self._get_client()
    deployment_name = self.seldon_deployment_name
    return seldon_client.get_deployment_logs(
        deployment_name, follow=follow, tail=tail
    )
predict(self, request)

Make a prediction using the service.

Parameters:

Name Type Description Default
request str

a JSON-encoded string representing the request; it is decoded and sent to the service as the `ndarray` data

required

Returns:

Type Description
Any

The decoded JSON response returned by the prediction service.

Exceptions:

Type Description
Exception

if the service is not yet ready.

ValueError

if the prediction_url is not set.

Source code in zenml/integrations/seldon/services/seldon_deployment.py
def predict(self, request: str) -> Any:
    """Make a prediction using the service.

    Args:
        request: a JSON-encoded string; it is decoded and sent to the
            service as the `ndarray` field of the request `data`.

    Returns:
        The decoded JSON body of the response returned by the service.

    Raises:
        Exception: if the service is not yet ready.
        ValueError: if the prediction_url is not set, if the request is
            not a string, or if it is not valid JSON (`json.loads`
            raises `json.JSONDecodeError`, a `ValueError` subclass).
    """
    if not self.is_running:
        raise Exception(
            "Seldon prediction service is not running. "
            "Please start the service before making predictions."
        )

    # Read the attribute once so that the value that is checked is the
    # value that is posted to; reading it twice allows the second read to
    # differ (e.g. become None) from the one that was validated.
    prediction_url = self.prediction_url
    if prediction_url is None:
        raise ValueError("`self.prediction_url` is not set, cannot post.")

    if not isinstance(request, str):
        raise ValueError("Request must be a json string.")
    payload = json.loads(request)

    response = requests.post(
        prediction_url,
        json={"data": {"ndarray": payload}},
    )
    # Surface HTTP error statuses as exceptions instead of returning an
    # error body.
    response.raise_for_status()
    return response.json()
provision(self)

Provision or update remote Seldon Core deployment instance.

This should then match the current configuration.

Source code in zenml/integrations/seldon/services/seldon_deployment.py
def provision(self) -> None:
    """Create or update the remote Seldon Core deployment instance.

    A deployment resource is built from the current configuration. If a
    deployment with the same name is found it is updated, otherwise a
    new deployment is created.
    """
    seldon_client = self._get_client()
    deployment_name = self.seldon_deployment_name
    service_config = self.config

    resource = SeldonDeployment.build(
        name=deployment_name,
        model_uri=service_config.model_uri,
        model_name=service_config.model_name,
        implementation=service_config.implementation,
        parameters=service_config.parameters,
        engineResources=service_config.resources,
        secret_name=service_config.secret_name,
        labels=self._get_seldon_deployment_labels(),
        annotations=service_config.get_seldon_deployment_annotations(),
        is_custom_deployment=service_config.is_custom_deployment,
        spec=service_config.spec,
    )
    # The replica count goes on the spec itself and on its first predictor.
    resource.spec.replicas = service_config.replicas
    resource.spec.predictors[0].replicas = service_config.replicas

    try:
        # The lookup only probes for existence; its result is not needed.
        seldon_client.get_deployment(name=deployment_name)
        seldon_client.update_deployment(resource)
    except SeldonDeploymentNotFoundError:
        # No deployment found: create a new one.
        seldon_client.create_deployment(deployment=resource)
SeldonDeploymentServiceStatus (ServiceStatus) pydantic-model

Seldon Core deployment service status.

Source code in zenml/integrations/seldon/services/seldon_deployment.py
class SeldonDeploymentServiceStatus(ServiceStatus):
    """Seldon Core deployment service status.

    Declares no fields or methods of its own beyond those of
    `ServiceStatus`.
    """

steps special

Initialization for Seldon steps.

seldon_deployer

Implementation of the Seldon Deployer step.

CustomDeployParameters (BaseModel) pydantic-model

Custom model deployer step extra parameters.

Attributes:

Name Type Description
predict_function str

Path to Python file containing predict function.

Exceptions:

Type Description
ValueError

If predict_function is not specified.

TypeError

If predict_function is not a callable function.

Returns:

Type Description
predict_function

Path to Python file containing predict function.

Source code in zenml/integrations/seldon/steps/seldon_deployer.py
class CustomDeployParameters(BaseModel):
    """Custom model deployer step extra parameters.

    Attributes:
        predict_function: Path of the custom predict function. The path is
            resolved with `import_class_by_path` when the model is
            validated.

    Raises:
        ValueError: If predict_function cannot be found.
        TypeError: If predict_function is not a callable function.
    """

    predict_function: str

    @validator("predict_function")
    def predict_function_validate(cls, predict_func_path: str) -> str:
        """Validate predict function.

        Args:
            predict_func_path: predict function path

        Returns:
            predict function path

        Raises:
            ValueError: if predict function path is not valid
            TypeError: if predict function path is not a callable function
        """
        try:
            predict_function = import_class_by_path(predict_func_path)
        except (AttributeError, ImportError) as e:
            # An unimportable module is reported the same way as a missing
            # attribute, and the original error is chained so the reason
            # the lookup failed is not lost.
            raise ValueError("Predict function can't be found.") from e
        if not callable(predict_function):
            raise TypeError("Predict function must be callable.")
        # The path itself (not the resolved object) is the stored value.
        return predict_func_path
predict_function_validate(predict_func_path) classmethod

Validate predict function.

Parameters:

Name Type Description Default
predict_func_path str

predict function path

required

Returns:

Type Description
str

predict function path

Exceptions:

Type Description
ValueError

if predict function path is not valid

TypeError

if predict function path is not a callable function

Source code in zenml/integrations/seldon/steps/seldon_deployer.py
@validator("predict_function")
def predict_function_validate(cls, predict_func_path: str) -> str:
    """Validate predict function.

    The path is resolved with `import_class_by_path` and the resolved
    object must be callable.

    Args:
        predict_func_path: predict function path

    Returns:
        predict function path

    Raises:
        ValueError: if predict function path is not valid
        TypeError: if predict function path is not a callable function
    """
    try:
        predict_function = import_class_by_path(predict_func_path)
    except (AttributeError, ImportError) as e:
        # An unimportable module is reported the same way as a missing
        # attribute, and the original error is chained so the reason the
        # lookup failed is not lost.
        raise ValueError("Predict function can't be found.") from e
    if not callable(predict_function):
        raise TypeError("Predict function must be callable.")
    # The path itself (not the resolved object) is the stored value.
    return predict_func_path
SeldonDeployerStepParameters (BaseParameters) pydantic-model

Seldon model deployer step parameters.

Attributes:

Name Type Description
service_config SeldonDeploymentConfig

Seldon Core deployment service configuration.

secrets

a list of ZenML secrets containing additional configuration parameters for the Seldon Core deployment (e.g. credentials to access the Artifact Store where the models are stored). If supplied, the information fetched from these secrets is passed to the Seldon Core deployment server as a list of environment variables.

custom_deploy_parameters Optional[zenml.integrations.seldon.steps.seldon_deployer.CustomDeployParameters]

custom deployment parameters

registry_model_name Optional[str]

name of the model in the model registry

registry_model_version Optional[str]

version of the model in the model registry

registry_model_stage Optional[zenml.model_registries.base_model_registry.ModelVersionStage]

stage of the model in the model registry

replace_existing bool

whether to replace an existing deployment of the model with the same name, this is used only when the model is deployed from a model registry stored model.

Source code in zenml/integrations/seldon/steps/seldon_deployer.py
class SeldonDeployerStepParameters(BaseParameters):
    """Seldon model deployer step parameters.

    Attributes:
        service_config: Seldon Core deployment service configuration.
        custom_deploy_parameters: custom deployment parameters
        registry_model_name: name of the model in the model registry
        registry_model_version: version of the model in the model registry
        registry_model_stage: stage of the model in the model registry
        replace_existing: whether to replace an existing deployment of the
            model with the same name, this is used only when the model is
            deployed from a model registry stored model.
        timeout: defaults to `DEFAULT_SELDON_DEPLOYMENT_START_STOP_TIMEOUT`
            (presumably the time to wait for the deployment to start or
            stop -- TODO confirm meaning and unit against the step code).
    """

    # NOTE(review): the previous docstring also described a `secrets`
    # attribute, but no such field is declared on this class -- confirm
    # whether it is inherited from BaseParameters or was removed.
    service_config: SeldonDeploymentConfig
    custom_deploy_parameters: Optional[CustomDeployParameters] = None
    registry_model_name: Optional[str] = None
    registry_model_version: Optional[str] = None
    registry_model_stage: Optional[ModelVersionStage] = None
    replace_existing: bool = True
    timeout: int = DEFAULT_SELDON_DEPLOYMENT_START_STOP_TIMEOUT
seldon_custom_model_deployer_step (BaseStep)

Seldon Core custom model deployer pipeline step.

This step can be used in a pipeline to implement the process required to deploy a custom model with Seldon Core.

Parameters:

Name Type Description Default
deploy_decision

whether to deploy the model or not

required
params

parameters for the deployer step

required
model

the model artifact to deploy

required
context

the step context

required

Exceptions:

Type Description
ValueError

if the custom deployer is not defined

DoesNotExistException

if an entity does not exist raise an exception

Returns:

Type Description

Seldon Core deployment service

PARAMETERS_CLASS (BaseParameters) pydantic-model

Seldon model deployer step parameters.

Attributes:

Name Type Description
service_config SeldonDeploymentConfig

Seldon Core deployment service configuration.

secrets

a list of ZenML secrets containing additional configuration parameters for the Seldon Core deployment (e.g. credentials to access the Artifact Store where the models are stored). If supplied, the information fetched from these secrets is passed to the Seldon Core deployment server as a list of environment variables.

custom_deploy_parameters Optional[zenml.integrations.seldon.steps.seldon_deployer.CustomDeployParameters]

custom deployment parameters

registry_model_name Optional[str]

name of the model in the model registry

registry_model_version Optional[str]

version of the model in the model registry

registry_model_stage Optional[zenml.model_registries.base_model_registry.ModelVersionStage]

stage of the model in the model registry

replace_existing bool

whether to replace an existing deployment of the model with the same name, this is used only when the model is deployed from a model registry stored model.

Source code in zenml/integrations/seldon/steps/seldon_deployer.py
class SeldonDeployerStepParameters(BaseParameters):
    """Seldon model deployer step parameters.

    Attributes:
        service_config: Seldon Core deployment service configuration.
        custom_deploy_parameters: custom deployment parameters
        registry_model_name: name of the model in the model registry
        registry_model_version: version of the model in the model registry
        registry_model_stage: stage of the model in the model registry
        replace_existing: whether to replace an existing deployment of the
            model with the same name, this is used only when the model is
            deployed from a model registry stored model.
        timeout: defaults to `DEFAULT_SELDON_DEPLOYMENT_START_STOP_TIMEOUT`
            (presumably the time to wait for the deployment to start or
            stop -- TODO confirm meaning and unit against the step code).
    """

    # NOTE(review): the previous docstring also described a `secrets`
    # attribute, but no such field is declared on this class -- confirm
    # whether it is inherited from BaseParameters or was removed.
    service_config: SeldonDeploymentConfig
    custom_deploy_parameters: Optional[CustomDeployParameters] = None
    registry_model_name: Optional[str] = None
    registry_model_version: Optional[str] = None
    registry_model_stage: Optional[ModelVersionStage] = None
    replace_existing: bool = True
    timeout: int = DEFAULT_SELDON_DEPLOYMENT_START_STOP_TIMEOUT
entrypoint(deploy_decision, params, context, model) staticmethod

Seldon Core custom model deployer pipeline step.

This step can be used in a pipeline to implement the process required to deploy a custom model with Seldon Core.

Parameters:

Name Type Description Default
deploy_decision bool

whether to deploy the model or not

required
params SeldonDeployerStepParameters

parameters for the deployer step

required
model UnmaterializedArtifact

the model artifact to deploy

required
context StepContext

the step context

required

Exceptions:

Type Description
ValueError

if the custom deployer is not defined

DoesNotExistException

if an entity does not exist raise an exception

Returns:

Type Description
SeldonDeploymentService

Seldon Core deployment service

Source code in zenml/integrations/seldon/steps/seldon_deployer.py
@step(enable_cache=False, extra={SELDON_CUSTOM_DEPLOYMENT: True})
def seldon_custom_model_deployer_step(
    deploy_decision: bool,
    params: SeldonDeployerStepParameters,
    context: StepContext,
    model: UnmaterializedArtifact,
) -> SeldonDeploymentService:
    """Seldon Core custom model deployer pipeline step.

    This step can be used in a pipeline to implement the process required
    to deploy a custom model with Seldon Core.

    Args:
        deploy_decision: whether to deploy the model or not
        params: parameters for the deployer step
        model: the model artifact to deploy
        context: the step context

    Raises:
        ValueError: if the custom deployer is not defined
        DoesNotExistException: if an entity does not exist raise an exception

    Returns:
        Seldon Core deployment service
    """
    # verify that a custom deployer is defined; the message is built as a
    # single string so that `str(error)` is one readable sentence instead of
    # a tuple of exception arguments
    if not params.custom_deploy_parameters:
        raise ValueError(
            "Custom deploy parameter is required as part of the step "
            "configuration. This parameter is the path of the custom "
            "predict function."
        )
    # get the active model deployer
    model_deployer = cast(
        SeldonModelDeployer, SeldonModelDeployer.get_active_model_deployer()
    )

    # get pipeline name, step name, run id
    step_env = cast(StepEnvironment, Environment()[STEP_ENVIRONMENT_NAME])
    pipeline_name = step_env.pipeline_name
    run_name = step_env.run_name
    step_name = step_env.step_name

    # update the step configuration with the real pipeline runtime information
    params.service_config.pipeline_name = pipeline_name
    params.service_config.pipeline_run_id = run_name
    params.service_config.pipeline_step_name = step_name
    params.service_config.is_custom_deployment = True

    # fetch existing services with the same pipeline name, step name and
    # model name
    existing_services = model_deployer.find_model_server(
        pipeline_name=pipeline_name,
        pipeline_step_name=step_name,
        model_name=params.service_config.model_name,
    )
    # a negative deploy decision only short-circuits when a previous model
    # server exists; otherwise the current model is still deployed below so
    # that a model server is available at all times
    if not deploy_decision and existing_services:
        logger.info(
            f"Skipping model deployment because the model quality does not"
            f" meet the criteria. Reusing the last model server deployed by step "
            f"'{step_name}' and pipeline '{pipeline_name}' for model "
            f"'{params.service_config.model_name}'..."
        )
        service = cast(SeldonDeploymentService, existing_services[0])
        # restart the previous model server if it is no longer running
        if not service.is_running:
            service.start(timeout=params.timeout)
        return service

    # entrypoint for starting Seldon microservice deployment for custom model
    entrypoint_command = [
        "python",
        "-m",
        "zenml.integrations.seldon.custom_deployer.zenml_custom_model",
        "--model_name",
        params.service_config.model_name,
        "--predict_func",
        params.custom_deploy_parameters.predict_function,
    ]

    # verify if there is an active stack before starting the service
    if not context.stack:
        raise DoesNotExistException(
            "No active stack is available. "
            "Please make sure that you have registered and set a stack."
        )

    image_name = step_env.step_run_info.get_image(key=SELDON_DOCKER_IMAGE_KEY)

    # copy the model files to new specific directory for the deployment
    served_model_uri = os.path.join(
        context.get_output_artifact_uri(), "seldon"
    )
    fileio.makedirs(served_model_uri)
    io_utils.copy_dir(model.uri, served_model_uri)

    # save the model artifact metadata to the YAML file and copy it to the
    # deployment directory
    model_metadata_file = save_model_metadata(model)
    fileio.copy(
        model_metadata_file,
        os.path.join(served_model_uri, MODEL_METADATA_YAML_FILE_NAME),
    )

    # prepare the service configuration for the deployment; work on a copy so
    # the model URI override does not leak back into the step parameters
    service_config = params.service_config.copy()
    service_config.model_uri = served_model_uri

    # create the specification for the custom deployment
    service_config.spec = create_seldon_core_custom_spec(
        model_uri=service_config.model_uri,
        custom_docker_image=image_name,
        secret_name=model_deployer.kubernetes_secret_name,
        command=entrypoint_command,
    )

    # deploy the service
    service = cast(
        SeldonDeploymentService,
        model_deployer.deploy_model(
            service_config, replace=True, timeout=params.timeout
        ),
    )

    logger.info(
        f"Seldon Core deployment service started and reachable at:\n"
        f"    {service.prediction_url}\n"
    )

    return service
seldon_mlflow_registry_deployer_step (BaseStep)

Seldon Core model deployer pipeline step.

This step can be used in a pipeline to implement continuous deployment for a MLflow model with Seldon Core.

Parameters:

Name Type Description Default
params

parameters for the deployer step

required

Returns:

Type Description

Seldon Core deployment service

Exceptions:

Type Description
ValueError

if registry_model_name is not provided

ValueError

if neither registry_model_version nor registry_model_stage is provided

ValueError

if the MLflow experiment tracker is not available in the active stack

LookupError

if no model version is found in the MLflow model registry.

PARAMETERS_CLASS (BaseParameters) pydantic-model

Seldon model deployer step parameters.

Attributes:

Name Type Description
service_config SeldonDeploymentConfig

Seldon Core deployment service configuration.

secrets

a list of ZenML secrets containing additional configuration parameters for the Seldon Core deployment (e.g. credentials to access the Artifact Store where the models are stored). If supplied, the information fetched from these secrets is passed to the Seldon Core deployment server as a list of environment variables.

custom_deploy_parameters Optional[zenml.integrations.seldon.steps.seldon_deployer.CustomDeployParameters]

custom deployment parameters

registry_model_name Optional[str]

name of the model in the model registry

registry_model_version Optional[str]

version of the model in the model registry

registry_model_stage Optional[zenml.model_registries.base_model_registry.ModelVersionStage]

stage of the model in the model registry

replace_existing bool

whether to replace an existing deployment of the model with the same name, this is used only when the model is deployed from a model registry stored model.

Source code in zenml/integrations/seldon/steps/seldon_deployer.py
class SeldonDeployerStepParameters(BaseParameters):
    """Seldon model deployer step parameters.

    Attributes:
        service_config: Seldon Core deployment service configuration.
        secrets: a list of ZenML secrets containing additional configuration
            parameters for the Seldon Core deployment (e.g. credentials to
            access the Artifact Store where the models are stored). If supplied,
            the information fetched from these secrets is passed to the Seldon
            Core deployment server as a list of environment variables.
        custom_deploy_parameters: custom deployment parameters
        registry_model_name: name of the model in the model registry
        registry_model_version: version of the model in the model registry
        registry_model_stage: stage of the model in the model registry
        replace_existing: whether to replace an existing deployment of the
            model with the same name, this is used only when the model is
            deployed from a model registry stored model.
        timeout: the timeout that the deployer steps pass on when starting
            the Seldon Core deployment service or deploying the model.
    """

    # NOTE(review): `secrets` is documented above but not declared in this
    # class body — confirm whether it is inherited or a stale entry.
    service_config: SeldonDeploymentConfig
    custom_deploy_parameters: Optional[CustomDeployParameters] = None
    registry_model_name: Optional[str] = None
    registry_model_version: Optional[str] = None
    registry_model_stage: Optional[ModelVersionStage] = None
    replace_existing: bool = True
    # presumably expressed in seconds — TODO confirm against the constant
    timeout: int = DEFAULT_SELDON_DEPLOYMENT_START_STOP_TIMEOUT
entrypoint(params) staticmethod

Seldon Core model deployer pipeline step.

This step can be used in a pipeline to implement continuous deployment for a MLflow model with Seldon Core.

Parameters:

Name Type Description Default
params SeldonDeployerStepParameters

parameters for the deployer step

required

Returns:

Type Description
SeldonDeploymentService

Seldon Core deployment service

Exceptions:

Type Description
ValueError

if registry_model_name is not provided

ValueError

if neither registry_model_version nor registry_model_stage is provided

ValueError

if the MLflow experiment tracker is not available in the active stack

LookupError

if no model version is found in the MLflow model registry.

Source code in zenml/integrations/seldon/steps/seldon_deployer.py
@step(enable_cache=False)
def seldon_mlflow_registry_deployer_step(
    params: SeldonDeployerStepParameters,
) -> SeldonDeploymentService:
    """Seldon Core model deployer pipeline step.

    This step can be used in a pipeline to implement continuous
    deployment for a MLflow model with Seldon Core.

    Args:
        params: parameters for the deployer step

    Returns:
        Seldon Core deployment service

    Raises:
        ValueError: if registry_model_name is not provided
        ValueError: if neither registry_model_version nor
            registry_model_stage is provided
        ValueError: if the service configuration does not use the
            MLFLOW_SERVER implementation
        ValueError: if the model version found in the registry is not an
            MLflow model
        LookupError: if no model version is found in the MLflow model registry.
    """
    # validate the registry-related step parameters up front
    if not params.registry_model_name:
        raise ValueError(
            "registry_model_name must be provided to the MLflow "
            "model registry deployer step."
        )
    elif not params.registry_model_version and not params.registry_model_stage:
        raise ValueError(
            "Either registry_model_version or registry_model_stage must "
            "be provided in addition to registry_model_name to the MLflow "
            "model registry deployer step. Since the "
            "mlflow_model_registry_deployer_step is used in conjunction with "
            "the mlflow_model_registry."
        )
    if params.service_config.implementation != "MLFLOW_SERVER":
        raise ValueError(
            "This step only allows MLFLOW_SERVER implementation with seldon"
        )
    # Get the active model deployer
    model_deployer = cast(
        SeldonModelDeployer, SeldonModelDeployer.get_active_model_deployer()
    )

    # fetch the model registry of the active stack
    model_registry = Client().active_stack.model_registry
    assert model_registry is not None

    # fetch the model version: an explicit version takes precedence over the
    # stage. The validation above guarantees that a stage is set whenever no
    # version is given, so `else` covers exactly the former `elif` case while
    # making sure `model_version` is always bound.
    if params.registry_model_version:
        try:
            model_version = model_registry.get_model_version(
                name=params.registry_model_name,
                version=params.registry_model_version,
            )
        except KeyError:
            model_version = None
    else:
        model_version = model_registry.get_latest_model_version(
            name=params.registry_model_name,
            stage=params.registry_model_stage,
        )
    if not model_version:
        raise LookupError(
            f"No Model Version found for model name "
            f"{params.registry_model_name} and version "
            f"{params.registry_model_version} or stage "
            f"{params.registry_model_stage}"
        )
    if model_version.model_format != MLFLOW_MODEL_FORMAT:
        raise ValueError(
            f"Model version {model_version.version} of model "
            f"{model_version.registered_model.name} is not an MLflow model. "
            f"Only MLflow models can be deployed with Seldon Core using "
            f"this step."
        )
    # Prepare the service configuration
    # NOTE(review): the trailing commas below make each stored value a
    # one-element tuple rather than a plain string. This is kept unchanged
    # because the consumers of `extra_args` are not visible here — confirm
    # whether the tuples are intended.
    service_config = params.service_config
    service_config.extra_args["registry_model_name"] = (
        model_version.registered_model.name,
    )
    service_config.extra_args["registry_model_version"] = (
        model_version.version,
    )
    service_config.extra_args["registry_model_stage"] = (
        model_version.stage.value,
    )
    service_config.model_uri = model_registry.get_model_uri_artifact_store(
        model_version=model_version,
    )
    # fetch existing services for the same model name, but only when the
    # step is allowed to replace an existing deployment
    existing_services = (
        model_deployer.find_model_server(
            model_name=model_version.registered_model.name,
        )
        if params.replace_existing
        else []
    )
    if existing_services:
        service = cast(SeldonDeploymentService, existing_services[0])
        if service.config.model_uri == service_config.model_uri:
            # the existing server already serves this exact model: reuse it
            # and make sure it is running, honoring the configured timeout
            # like the other deployer steps do
            if not service.is_running:
                service.start(timeout=params.timeout)
            return service
        else:
            # a different model is being served: stop the existing service
            # before deploying the new one
            service.stop()

    # invoke the Seldon Core model deployer to create a new service
    # or update an existing one that was previously deployed for the same
    # model
    service = cast(
        SeldonDeploymentService,
        model_deployer.deploy_model(
            service_config, replace=True, timeout=params.timeout
        ),
    )

    logger.info(
        f"Seldon deployment service started and reachable at:\n"
        f"    {service.prediction_url}\n"
    )

    return service
seldon_model_deployer_step (BaseStep)

Seldon Core model deployer pipeline step.

This step can be used in a pipeline to implement continuous deployment for a ML model with Seldon Core.

Parameters:

Name Type Description Default
deploy_decision

whether to deploy the model or not

required
params

parameters for the deployer step

required
model

the model artifact to deploy

required
context

the step context

required

Returns:

Type Description

Seldon Core deployment service

PARAMETERS_CLASS (BaseParameters) pydantic-model

Seldon model deployer step parameters.

Attributes:

Name Type Description
service_config SeldonDeploymentConfig

Seldon Core deployment service configuration.

secrets

a list of ZenML secrets containing additional configuration parameters for the Seldon Core deployment (e.g. credentials to access the Artifact Store where the models are stored). If supplied, the information fetched from these secrets is passed to the Seldon Core deployment server as a list of environment variables.

custom_deploy_parameters Optional[zenml.integrations.seldon.steps.seldon_deployer.CustomDeployParameters]

custom deployment parameters

registry_model_name Optional[str]

name of the model in the model registry

registry_model_version Optional[str]

version of the model in the model registry

registry_model_stage Optional[zenml.model_registries.base_model_registry.ModelVersionStage]

stage of the model in the model registry

replace_existing bool

whether to replace an existing deployment of the model with the same name, this is used only when the model is deployed from a model registry stored model.

Source code in zenml/integrations/seldon/steps/seldon_deployer.py
class SeldonDeployerStepParameters(BaseParameters):
    """Seldon model deployer step parameters.

    Attributes:
        service_config: Seldon Core deployment service configuration.
        secrets: a list of ZenML secrets containing additional configuration
            parameters for the Seldon Core deployment (e.g. credentials to
            access the Artifact Store where the models are stored). If supplied,
            the information fetched from these secrets is passed to the Seldon
            Core deployment server as a list of environment variables.
        custom_deploy_parameters: custom deployment parameters
        registry_model_name: name of the model in the model registry
        registry_model_version: version of the model in the model registry
        registry_model_stage: stage of the model in the model registry
        replace_existing: whether to replace an existing deployment of the
            model with the same name, this is used only when the model is
            deployed from a model registry stored model.
        timeout: the timeout that the deployer steps pass on when starting
            the Seldon Core deployment service or deploying the model.
    """

    # NOTE(review): `secrets` is documented above but not declared in this
    # class body — confirm whether it is inherited or a stale entry.
    service_config: SeldonDeploymentConfig
    custom_deploy_parameters: Optional[CustomDeployParameters] = None
    registry_model_name: Optional[str] = None
    registry_model_version: Optional[str] = None
    registry_model_stage: Optional[ModelVersionStage] = None
    replace_existing: bool = True
    # presumably expressed in seconds — TODO confirm against the constant
    timeout: int = DEFAULT_SELDON_DEPLOYMENT_START_STOP_TIMEOUT
entrypoint(deploy_decision, params, context, model) staticmethod

Seldon Core model deployer pipeline step.

This step can be used in a pipeline to implement continuous deployment for a ML model with Seldon Core.

Parameters:

Name Type Description Default
deploy_decision bool

whether to deploy the model or not

required
params SeldonDeployerStepParameters

parameters for the deployer step

required
model UnmaterializedArtifact

the model artifact to deploy

required
context StepContext

the step context

required

Returns:

Type Description
SeldonDeploymentService

Seldon Core deployment service

Source code in zenml/integrations/seldon/steps/seldon_deployer.py
@step(enable_cache=False)
def seldon_model_deployer_step(
    deploy_decision: bool,
    params: SeldonDeployerStepParameters,
    context: StepContext,
    model: UnmaterializedArtifact,
) -> SeldonDeploymentService:
    """Seldon Core model deployer pipeline step.

    This step can be used in a pipeline to implement continuous
    deployment for a ML model with Seldon Core.

    Args:
        deploy_decision: whether to deploy the model or not
        params: parameters for the deployer step
        model: the model artifact to deploy
        context: the step context

    Returns:
        Seldon Core deployment service
    """
    model_deployer = cast(
        SeldonModelDeployer, SeldonModelDeployer.get_active_model_deployer()
    )

    # get pipeline name, step name and run id
    step_env = cast(StepEnvironment, Environment()[STEP_ENVIRONMENT_NAME])
    pipeline_name = step_env.pipeline_name
    run_name = step_env.run_name
    step_name = step_env.step_name

    # update the step configuration with the real pipeline runtime information
    params.service_config.pipeline_name = pipeline_name
    params.service_config.pipeline_run_id = run_name
    params.service_config.pipeline_step_name = step_name

    def prepare_service_config(model_uri: str) -> SeldonDeploymentConfig:
        """Prepare the model files for model serving.

        This creates and returns a Seldon service configuration for the model.

        This function ensures that the model files are in the correct format
        and file structure required by the Seldon Core server implementation
        used for model serving.

        Args:
            model_uri: the URI of the model artifact being served

        Returns:
            A copy of the step's service configuration whose model URI
            points to the model files prepared for serving.

        Raises:
            RuntimeError: if the model files were not found
        """
        served_model_uri = os.path.join(
            context.get_output_artifact_uri(), "seldon"
        )
        fileio.makedirs(served_model_uri)

        # TODO [ENG-773]: determine how to formalize how models are organized into
        #   folders and sub-folders depending on the model type/format and the
        #   Seldon Core protocol used to serve the model.

        # TODO [ENG-791]: auto-detect built-in Seldon server implementation
        #   from the model artifact type

        # TODO [ENG-792]: validate the model artifact type against the
        #   supported built-in Seldon server implementations
        if params.service_config.implementation == "TENSORFLOW_SERVER":
            # the TensorFlow server expects model artifacts to be
            # stored in numbered subdirectories, each representing a model
            # version
            io_utils.copy_dir(model_uri, os.path.join(served_model_uri, "1"))
        elif params.service_config.implementation == "SKLEARN_SERVER":
            # the sklearn server expects model artifacts to be
            # stored in a file called model.joblib
            sklearn_model_path = os.path.join(model_uri, "model")
            # check the model file itself (not just the artifact directory),
            # so a missing file is reported with the path named in the error
            if not fileio.exists(sklearn_model_path):
                raise RuntimeError(
                    f"Expected sklearn model artifact was not found at "
                    f"{sklearn_model_path}"
                )
            fileio.copy(
                sklearn_model_path,
                os.path.join(served_model_uri, "model.joblib"),
            )
        else:
            # default treatment for all other server implementations is to
            # simply reuse the model from the artifact store path where it
            # is originally stored
            served_model_uri = model_uri

        # work on a copy so the model URI override does not leak back into
        # the step parameters
        service_config = params.service_config.copy()
        service_config.model_uri = served_model_uri
        return service_config

    # fetch existing services with same pipeline name, step name and
    # model name
    existing_services = model_deployer.find_model_server(
        pipeline_name=pipeline_name,
        pipeline_step_name=step_name,
        model_name=params.service_config.model_name,
    )

    # a negative deploy decision only short-circuits when a previous model
    # server exists; otherwise the current model is still deployed below so
    # that a model server is available at all times
    if not deploy_decision and existing_services:
        logger.info(
            f"Skipping model deployment because the model quality does not "
            f"meet the criteria. Reusing last model server deployed by step "
            f"'{step_name}' and pipeline '{pipeline_name}' for model "
            f"'{params.service_config.model_name}'..."
        )
        service = cast(SeldonDeploymentService, existing_services[0])
        # restart the previous model server if it is no longer running
        if not service.is_running:
            service.start(timeout=params.timeout)
        return service

    # invoke the Seldon Core model deployer to create a new service
    # or update an existing one that was previously deployed for the same
    # model
    service_config = prepare_service_config(model.uri)
    service = cast(
        SeldonDeploymentService,
        model_deployer.deploy_model(
            service_config, replace=True, timeout=params.timeout
        ),
    )

    logger.info(
        f"Seldon deployment service started and reachable at:\n"
        f"    {service.prediction_url}\n"
    )

    return service