KServe

zenml.integrations.kserve special

Initialization of the KServe integration for ZenML.

The KServe integration allows you to use the KServe model serving platform to implement continuous model deployment.
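To make this concrete, here is a hedged sketch of how the integration class documented below is driven: activating it imports and registers the model deployer, secret schemas, services and steps, and flavors() exposes the model deployer flavor. It assumes the integration requirements are installed beforehand (for example via the zenml integration install kserve CLI command).

from zenml.integrations.kserve import KServeIntegration

# Activating the integration imports the model deployer, secret schemas,
# services and steps so they register themselves with ZenML.
KServeIntegration.activate()

# The integration contributes a single stack component flavor.
for flavor_class in KServeIntegration.flavors():
    print(flavor_class().name)  # the KServe model deployer flavor name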

KServeIntegration (Integration)

Definition of KServe integration for ZenML.

Source code in zenml/integrations/kserve/__init__.py
class KServeIntegration(Integration):
    """Definition of KServe integration for ZenML."""

    NAME = KSERVE
    REQUIREMENTS = [
        "kserve==0.9.0",
        "torch-model-archiver",
    ]

    @classmethod
    def activate(cls) -> None:
        """Activate the Seldon Core integration."""
        from zenml.integrations.kserve import model_deployers  # noqa
        from zenml.integrations.kserve import secret_schemas  # noqa
        from zenml.integrations.kserve import services  # noqa
        from zenml.integrations.kserve import steps  # noqa

    @classmethod
    def flavors(cls) -> List[Type[Flavor]]:
        """Declare the stack component flavors for KServe.

        Returns:
            List of stack component flavors for this integration.
        """
        from zenml.integrations.kserve.flavors import KServeModelDeployerFlavor

        return [KServeModelDeployerFlavor]

activate() classmethod

Activate the KServe integration.

Source code in zenml/integrations/kserve/__init__.py
@classmethod
def activate(cls) -> None:
    """Activate the Seldon Core integration."""
    from zenml.integrations.kserve import model_deployers  # noqa
    from zenml.integrations.kserve import secret_schemas  # noqa
    from zenml.integrations.kserve import services  # noqa
    from zenml.integrations.kserve import steps  # noqa

flavors() classmethod

Declare the stack component flavors for KServe.

Returns:

Type Description
List[Type[zenml.stack.flavor.Flavor]]

List of stack component flavors for this integration.

Source code in zenml/integrations/kserve/__init__.py
@classmethod
def flavors(cls) -> List[Type[Flavor]]:
    """Declare the stack component flavors for KServe.

    Returns:
        List of stack component flavors for this integration.
    """
    from zenml.integrations.kserve.flavors import KServeModelDeployerFlavor

    return [KServeModelDeployerFlavor]

constants

KServe constants.

custom_deployer special

Initialization of ZenML custom deployer.

zenml_custom_model

Implements a custom model for the KServe integration.

ZenMLCustomModel (Model)

Custom model class for ZenML and KServe.

This class is used to implement a custom model for the KServe integration, which is used as the main entry point for custom code execution.

Attributes:

Name Type Description
model_name

The name of the model.

model_uri

The URI of the model.

predict_func

The predict function of the model.

Source code in zenml/integrations/kserve/custom_deployer/zenml_custom_model.py
class ZenMLCustomModel(kserve.Model):  # type: ignore[misc]
    """Custom model class for ZenML and Kserve.

    This class is used to implement a custom model for the Kserve integration,
    which is used as the main entry point for custom code execution.

    Attributes:
        model_name: The name of the model.
        model_uri: The URI of the model.
        predict_func: The predict function of the model.
    """

    def __init__(
        self,
        model_name: str,
        model_uri: str,
        predict_func: str,
    ):
        """Initializes a ZenMLCustomModel object.

        Args:
            model_name: The name of the model.
            model_uri: The URI of the model.
            predict_func: The predict function of the model.
        """
        super().__init__(model_name)
        self.name = model_name
        self.model_uri = model_uri
        self.predict_func = import_class_by_path(predict_func)
        self.model = None
        self.ready = False

    def load(self) -> bool:
        """Load the model.

        This function loads the model into memory and sets the ready flag to True.

        The model is loaded using the materializer: the artifact information
        is saved to a YAML file next to the model artifacts at preparation
        time and loaded again by the materializer at prediction time.

        Returns:
            True if the model was loaded successfully, False otherwise.

        """
        try:
            from zenml.utils.materializer_utils import load_model_from_metadata

            self.model = load_model_from_metadata(self.model_uri)
        except Exception as e:
            logger.error("Failed to load model: {}".format(e))
            return False
        self.ready = True
        return self.ready

    def predict(self, request: Dict[str, Any]) -> Dict[str, Any]:
        """Predict the given request.

        The main predict function of the model. This function is called by the
        KServe server when a request is received. Then inside this function,
        the user-defined predict function is called.

        Args:
            request: The prediction request, as a dictionary, e.g. {"instances": []}.

        Returns:
            The prediction dictionary.

        Raises:
            RuntimeError: If the predict function could not be called.
            NotImplementedError: If the model is not ready.
            TypeError: If the request is not a dictionary.
        """
        if self.predict_func is not None:
            try:
                prediction = {
                    "predictions": self.predict_func(
                        self.model, request["instances"]
                    )
                }
            except RuntimeError as err:
                raise RuntimeError("Failed to predict: {}".format(err))
            if isinstance(prediction, dict):
                return prediction
            else:
                raise TypeError(
                    f"Prediction is not a dictionary. Expecting a dictionary but got {type(prediction)}"
                )
        else:
            raise NotImplementedError("Predict function is not implemented")
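As a rough, hypothetical usage sketch (in a real deployment ZenML generates the entry point that performs these steps), the class plugs into a KServe model server as follows; the model name, URI and dotted function path are placeholders:

import kserve

from zenml.integrations.kserve.custom_deployer.zenml_custom_model import (
    ZenMLCustomModel,
)

model = ZenMLCustomModel(
    model_name="my-model",                               # placeholder name
    model_uri="s3://my-bucket/model-artifacts",          # placeholder URI
    predict_func="my_package.inference.custom_predict",  # dotted import path
)
model.load()  # loads the model via the materializer and sets `ready`
kserve.ModelServer().start([model])  # assumed KServe 0.9 server entry point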
__init__(self, model_name, model_uri, predict_func) special

Initializes a ZenMLCustomModel object.

Parameters:

Name Type Description Default
model_name str

The name of the model.

required
model_uri str

The URI of the model.

required
predict_func str

The predict function of the model.

required
Source code in zenml/integrations/kserve/custom_deployer/zenml_custom_model.py
def __init__(
    self,
    model_name: str,
    model_uri: str,
    predict_func: str,
):
    """Initializes a ZenMLCustomModel object.

    Args:
        model_name: The name of the model.
        model_uri: The URI of the model.
        predict_func: The predict function of the model.
    """
    super().__init__(model_name)
    self.name = model_name
    self.model_uri = model_uri
    self.predict_func = import_class_by_path(predict_func)
    self.model = None
    self.ready = False
load(self)

Load the model.

This function loads the model into memory and sets the ready flag to True.

The model is loaded using the materializer: the artifact information is saved to a YAML file next to the model artifacts at preparation time and loaded again by the materializer at prediction time.

Returns:

Type Description
bool

True if the model was loaded successfully, False otherwise.

Source code in zenml/integrations/kserve/custom_deployer/zenml_custom_model.py
def load(self) -> bool:
    """Load the model.

    This function loads the model into memory and sets the ready flag to True.

    The model is loaded using the materializer: the artifact information
    is saved to a YAML file next to the model artifacts at preparation
    time and loaded again by the materializer at prediction time.

    Returns:
        True if the model was loaded successfully, False otherwise.

    """
    try:
        from zenml.utils.materializer_utils import load_model_from_metadata

        self.model = load_model_from_metadata(self.model_uri)
    except Exception as e:
        logger.error("Failed to load model: {}".format(e))
        return False
    self.ready = True
    return self.ready
predict(self, request)

Predict the given request.

The main predict function of the model. This function is called by the KServe server when a request is received. Then inside this function, the user-defined predict function is called.

Parameters:

Name Type Description Default
request Dict[str, Any]

The prediction request, as a dictionary, e.g. {"instances": []}.

required

Returns:

Type Description
Dict[str, Any]

The prediction dictionary.

Exceptions:

Type Description
RuntimeError

If the predict function could not be called.

NotImplementedError

If the model is not ready.

TypeError

If the request is not a dictionary.

Source code in zenml/integrations/kserve/custom_deployer/zenml_custom_model.py
def predict(self, request: Dict[str, Any]) -> Dict[str, Any]:
    """Predict the given request.

    The main predict function of the model. This function is called by the
    KServe server when a request is received. Then inside this function,
    the user-defined predict function is called.

    Args:
        request: The prediction request, as a dictionary, e.g. {"instances": []}.

    Returns:
        The prediction dictionary.

    Raises:
        RuntimeError: If the predict function could not be called.
        NotImplementedError: If the model is not ready.
        TypeError: If the request is not a dictionary.
    """
    if self.predict_func is not None:
        try:
            prediction = {
                "predictions": self.predict_func(
                    self.model, request["instances"]
                )
            }
        except RuntimeError as err:
            raise RuntimeError("Failed to predict: {}".format(err))
        if isinstance(prediction, dict):
            return prediction
        else:
            raise TypeError(
                f"Prediction is not a dictionary. Expecting a dictionary but got {type(prediction)}"
            )
    else:
        raise NotImplementedError("Predict function is not implemented")
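The function referenced by predict_func receives the loaded model and the raw instances list, and must return a JSON-serializable value, which ends up under the "predictions" key of the response. A minimal, hypothetical example assuming a scikit-learn style model object:

from typing import Any, List


def custom_predict(model: Any, request: List[Any]) -> List[Any]:
    """Hypothetical predict function.

    `model` is the object returned by `load()`, `request` is the
    `instances` list extracted from the request payload.
    """
    return model.predict(request).tolist()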

flavors special

KServe integration flavors.

kserve_model_deployer_flavor

KServe model deployer flavor.

KServeModelDeployerConfig (BaseModelDeployerConfig) pydantic-model

Configuration for the KServeModelDeployer.

Attributes:

Name Type Description
kubernetes_context Optional[str]

the Kubernetes context to use to contact the remote KServe installation. If not specified, the current configuration is used. Depending on where the KServe model deployer is being used, this can be either a locally active context or an in-cluster Kubernetes configuration (if running inside a pod).

kubernetes_namespace Optional[str]

the Kubernetes namespace where the KServe inference service CRDs are provisioned and managed by ZenML. If not specified, the namespace set in the current configuration is used. Depending on where the KServe model deployer is being used, this can be either the current namespace configured in the locally active context or the namespace in the context of which the pod is running (if running inside a pod).

base_url str

the base URL of the Kubernetes ingress used to expose the KServe inference services.

secret Optional[str]

the name of the secret containing the credentials for the KServe inference services.

Source code in zenml/integrations/kserve/flavors/kserve_model_deployer_flavor.py
class KServeModelDeployerConfig(BaseModelDeployerConfig):
    """Configuration for the KServeModelDeployer.

    Attributes:
        kubernetes_context: the Kubernetes context to use to contact the remote
            KServe installation. If not specified, the current
            configuration is used. Depending on where the KServe model deployer
            is being used, this can be either a locally active context or an
            in-cluster Kubernetes configuration (if running inside a pod).
        kubernetes_namespace: the Kubernetes namespace where the KServe
            inference service CRDs are provisioned and managed by ZenML. If not
            specified, the namespace set in the current configuration is used.
            Depending on where the KServe model deployer is being used, this can
            be either the current namespace configured in the locally active
            context or the namespace in the context of which the pod is running
            (if running inside a pod).
        base_url: the base URL of the Kubernetes ingress used to expose the
            KServe inference services.
        secret: the name of the secret containing the credentials for the
            KServe inference services.
    """

    kubernetes_context: Optional[str]  # TODO: Potential setting
    kubernetes_namespace: Optional[str]
    base_url: str  # TODO: unused?
    secret: Optional[str]
    custom_domain: Optional[str]  # TODO: unused?
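A minimal sketch of such a configuration with placeholder values; in practice the component is usually registered through the ZenML CLI rather than constructed by hand:

config = KServeModelDeployerConfig(
    kubernetes_context="my-kube-context",     # placeholder kubeconfig context
    kubernetes_namespace="kserve-workloads",  # placeholder namespace
    base_url="http://ingress.example.com",    # ingress exposing KServe
    secret="kserve_secret",                   # ZenML secret holding credentials
)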
KServeModelDeployerFlavor (BaseModelDeployerFlavor)

Flavor for the KServe model deployer.

Source code in zenml/integrations/kserve/flavors/kserve_model_deployer_flavor.py
class KServeModelDeployerFlavor(BaseModelDeployerFlavor):
    """Flavor for the KServe model deployer."""

    @property
    def name(self) -> str:
        """Name of the flavor.

        Returns:
            Name of the flavor.
        """
        return KSERVE_MODEL_DEPLOYER_FLAVOR

    @property
    def config_class(self) -> Type[KServeModelDeployerConfig]:
        """Returns `KServeModelDeployerConfig` config class.

        Returns:
                The config class.
        """
        return KServeModelDeployerConfig

    @property
    def implementation_class(self) -> Type["KServeModelDeployer"]:
        """Implementation class for this flavor.

        Returns:
            The implementation class.
        """
        from zenml.integrations.kserve.model_deployers import (
            KServeModelDeployer,
        )

        return KServeModelDeployer
config_class: Type[zenml.integrations.kserve.flavors.kserve_model_deployer_flavor.KServeModelDeployerConfig] property readonly

Returns KServeModelDeployerConfig config class.

Returns:

Type Description
Type[zenml.integrations.kserve.flavors.kserve_model_deployer_flavor.KServeModelDeployerConfig]

The config class.

implementation_class: Type[KServeModelDeployer] property readonly

Implementation class for this flavor.

Returns:

Type Description
Type[KServeModelDeployer]

The implementation class.

name: str property readonly

Name of the flavor.

Returns:

Type Description
str

Name of the flavor.

model_deployers special

Initialization of the KServe Model Deployer.

kserve_model_deployer

Implementation of the KServe Model Deployer.

KServeModelDeployer (BaseModelDeployer)

KServe model deployer stack component implementation.

Source code in zenml/integrations/kserve/model_deployers/kserve_model_deployer.py
class KServeModelDeployer(BaseModelDeployer):
    """KServe model deployer stack component implementation."""

    NAME: ClassVar[str] = "KServe"
    FLAVOR: ClassVar[Type[BaseModelDeployerFlavor]] = KServeModelDeployerFlavor

    _client: Optional[KServeClient] = None

    @property
    def config(self) -> KServeModelDeployerConfig:
        """Returns the `KServeModelDeployerConfig` config.

        Returns:
            The configuration.
        """
        return cast(KServeModelDeployerConfig, self._config)

    @staticmethod
    def get_model_server_info(  # type: ignore[override]
        service_instance: "KServeDeploymentService",
    ) -> Dict[str, Optional[str]]:
        """Return implementation specific information on the model server.

        Args:
            service_instance: KServe deployment service object

        Returns:
            A dictionary containing the model server information.
        """
        return {
            "PREDICTION_URL": service_instance.prediction_url,
            "PREDICTION_HOSTNAME": service_instance.prediction_hostname,
            "MODEL_URI": service_instance.config.model_uri,
            "MODEL_NAME": service_instance.config.model_name,
            "KSERVE_INFERENCE_SERVICE": service_instance.crd_name,
        }

    @property
    def kserve_client(self) -> KServeClient:
        """Get the KServe client associated with this model deployer.

        Returns:
            The KServe client.
        """
        if not self._client:
            self._client = KServeClient(
                context=self.config.kubernetes_context,
            )
        return self._client

    def prepare_pipeline_deployment(
        self,
        deployment: "PipelineDeployment",
        stack: "Stack",
    ) -> None:
        """Build a Docker image and push it to the container registry.

        Args:
            deployment: The pipeline deployment configuration.
            stack: The stack on which the pipeline will be deployed.
        """
        needs_docker_image = False
        for step in deployment.steps.values():
            if step.config.extra.get(KSERVE_CUSTOM_DEPLOYMENT, False) is True:
                needs_docker_image = True

        if needs_docker_image:
            docker_image_builder = PipelineDockerImageBuilder()
            repo_digest = docker_image_builder.build_and_push_docker_image(
                deployment=deployment, stack=stack
            )
            deployment.add_extra(KSERVE_DOCKER_IMAGE_KEY, repo_digest)

    def _set_credentials(self) -> None:
        """Set the credentials for the given service instance.

        Raises:
            RuntimeError: if the credentials are not available.
        """
        secret = self._get_kserve_secret()
        if secret:
            secret_folder = Path(
                GlobalConfiguration().config_directory,
                "kserve-storage",
                str(self.id),
            )
            kserve_credentials = {}
            # Handle the secrets attributes
            for key in secret.content.keys():
                content = getattr(secret, key)
                if key == "credentials" and content:
                    fileio.makedirs(str(secret_folder))
                    file_path = Path(secret_folder, f"{key}.json")
                    kserve_credentials["credentials_file"] = str(file_path)
                    with open(file_path, "w") as f:
                        f.write(content)
                    file_path.chmod(0o600)
                # Handle additional params
                else:
                    kserve_credentials[key] = content

            # We need to add the namespace to the kserve_credentials
            kserve_credentials["namespace"] = (
                self.config.kubernetes_namespace
                or utils.get_default_target_namespace()
            )

            try:
                self.kserve_client.set_credentials(**kserve_credentials)
            except Exception as e:
                raise RuntimeError(
                    f"Failed to set credentials for KServe model deployer: {e}"
                )
            finally:
                if file_path.exists():
                    file_path.unlink()

    def deploy_model(
        self,
        config: ServiceConfig,
        replace: bool = False,
        timeout: int = DEFAULT_KSERVE_DEPLOYMENT_START_STOP_TIMEOUT,
    ) -> BaseService:
        """Create a new KServe deployment or update an existing one.

        This method has two modes of operation, depending on the `replace`
        argument value:

          * if `replace` is False, calling this method will create a new KServe
            deployment server to reflect the model and other configuration
            parameters specified in the supplied KServe deployment `config`.

          * if `replace` is True, this method will first attempt to find an
            existing KServe deployment that is *equivalent* to the supplied
            configuration parameters. Two or more KServe deployments are
            considered equivalent if they have the same `pipeline_name`,
            `pipeline_step_name` and `model_name` configuration parameters. To
            put it differently, two KServe deployments are equivalent if
            they serve versions of the same model deployed by the same pipeline
            step. If an equivalent KServe deployment is found, it will be
            updated in place to reflect the new configuration parameters. This
            allows an existing KServe deployment to retain its prediction
            URL while performing a rolling update to serve a new model version.

        Callers should set `replace` to True if they want a continuous model
        deployment workflow that doesn't spin up a new KServe deployment
        server for each new model version. If multiple equivalent KServe
        deployments are found, the most recently created deployment is selected
        to be updated and the others are deleted.

        Args:
            config: the configuration of the model to be deployed with KServe.
            replace: set this flag to True to find and update an equivalent
                KServeDeployment server with the new model instead of
                starting a new deployment server.
            timeout: the timeout in seconds to wait for the KServe server
                to be provisioned and successfully started or updated. If set
                to 0, the method will return immediately after the KServe
                server is provisioned, without waiting for it to fully start.

        Returns:
            The ZenML KServe deployment service object that can be used to
            interact with the remote KServe server.

        Raises:
            RuntimeError: if the KServe deployment server could not be stopped.
        """
        config = cast(KServeDeploymentConfig, config)
        service = None

        # if the secret is passed in the config, use it to set the credentials
        if config.secret_name:
            self.config.secret = config.secret_name or self.config.secret
        self._set_credentials()

        # if replace is True, find equivalent KServe deployments
        if replace is True:
            equivalent_services = self.find_model_server(
                running=False,
                pipeline_name=config.pipeline_name,
                pipeline_step_name=config.pipeline_step_name,
                model_name=config.model_name,
            )

            for equivalent_service in equivalent_services:
                if service is None:
                    # keep the most recently created service
                    service = equivalent_service
                else:
                    try:
                        # delete the older services and don't wait for them to
                        # be deprovisioned
                        service.stop()
                    except RuntimeError as e:
                        raise RuntimeError(
                            "Failed to stop the KServe deployment server:\n",
                            f"{e}\n",
                            "Please stop it manually and try again.",
                        )
        if service:
            # update an equivalent service in place
            service.update(config)
            logger.info(
                f"Updating an existing KServe deployment service: {service}"
            )
        else:
            # create a new service
            service = KServeDeploymentService(config=config)
            logger.info(f"Creating a new KServe deployment service: {service}")

        # start the service which in turn provisions the KServe
        # deployment server and waits for it to reach a ready state
        service.start(timeout=timeout)

        # Add telemetry with metadata that gets the stack metadata and
        # differentiates between pure model and custom code deployments
        stack = Client().active_stack
        stack_metadata = {
            component_type.value: component.flavor
            for component_type, component in stack.components.items()
        }
        metadata = {
            "store_type": Client().zen_store.type.value,
            **stack_metadata,
            "is_custom_code_deployment": config.container is not None,
        }
        track_event(AnalyticsEvent.MODEL_DEPLOYED, metadata=metadata)

        return service

    def get_kserve_deployments(
        self, labels: Dict[str, str]
    ) -> List[V1beta1InferenceService]:
        """Get a list of KServe deployments that match the supplied labels.

        Args:
            labels: a dictionary of labels to match against KServe deployments.

        Returns:
            A list of KServe deployments that match the supplied labels.

        Raises:
            RuntimeError: if an operational failure is encountered while
                retrieving the KServe inference services from the cluster.
        """
        label_selector = (
            ",".join(f"{k}={v}" for k, v in labels.items()) if labels else None
        )

        namespace = (
            self.config.kubernetes_namespace
            or utils.get_default_target_namespace()
        )

        try:
            response = (
                self.kserve_client.api_instance.list_namespaced_custom_object(
                    constants.KSERVE_GROUP,
                    constants.KSERVE_V1BETA1_VERSION,
                    namespace,
                    constants.KSERVE_PLURAL,
                    label_selector=label_selector,
                )
            )
        except client.rest.ApiException as e:
            raise RuntimeError(
                "Exception when retrieving KServe inference services\
                %s\n"
                % e
            )

        # TODO[CRITICAL]: de-serialize each item into a complete
        #   V1beta1InferenceService object recursively using the OpenApi
        #   schema (this doesn't work right now)
        inference_services: List[V1beta1InferenceService] = []
        for item in response.get("items", []):
            snake_case_item = self._camel_to_snake(item)
            inference_service = V1beta1InferenceService(**snake_case_item)
            inference_services.append(inference_service)
        return inference_services

    def _camel_to_snake(self, obj: Dict[str, Any]) -> Dict[str, Any]:
        """Convert a camelCase dictionary to snake_case.

        Args:
            obj: a dictionary with camelCase keys

        Returns:
            a dictionary with snake_case keys
        """
        if isinstance(obj, (str, int, float)):
            return obj
        if isinstance(obj, dict):
            assert obj is not None
            new = obj.__class__()
            for k, v in obj.items():
                new[self._convert_to_snake(k)] = self._camel_to_snake(v)
        elif isinstance(obj, (list, set, tuple)):
            assert obj is not None
            new = obj.__class__(self._camel_to_snake(v) for v in obj)
        else:
            return obj
        return new

    def _convert_to_snake(self, k: str) -> str:
        return re.sub(r"(?<!^)(?=[A-Z])", "_", k).lower()

    def find_model_server(
        self,
        running: bool = False,
        service_uuid: Optional[UUID] = None,
        pipeline_name: Optional[str] = None,
        pipeline_run_id: Optional[str] = None,
        pipeline_step_name: Optional[str] = None,
        model_name: Optional[str] = None,
        model_uri: Optional[str] = None,
        predictor: Optional[str] = None,
    ) -> List[BaseService]:
        """Find one or more KServe model services that match the given criteria.

        Args:
            running: If true, only running services will be returned.
            service_uuid: The UUID of the service that was originally used
                to deploy the model.
            pipeline_name: name of the pipeline that the deployed model was part
                of.
            pipeline_run_id: ID of the pipeline run which the deployed model was
                part of.
            pipeline_step_name: the name of the pipeline model deployment step
                that deployed the model.
            model_name: the name of the deployed model.
            model_uri: URI of the deployed model.
            predictor: the name of the predictor that was used to deploy the model.

        Returns:
            One or more Service objects representing model servers that match
            the input search criteria.
        """
        config = KServeDeploymentConfig(
            pipeline_name=pipeline_name or "",
            pipeline_run_id=pipeline_run_id or "",
            pipeline_step_name=pipeline_step_name or "",
            model_uri=model_uri or "",
            model_name=model_name or "",
            predictor=predictor or "",
            resources={},
        )
        labels = config.get_kubernetes_labels()

        if service_uuid:
            labels["zenml.service_uuid"] = str(service_uuid)

        deployments = self.get_kserve_deployments(labels=labels)

        services: List[BaseService] = []
        for deployment in deployments:
            # recreate the KServe deployment service object from the KServe
            # deployment resource
            service = KServeDeploymentService.create_from_deployment(
                deployment=deployment
            )
            if running and not service.is_running:
                # skip non-running services
                continue
            services.append(service)

        return services

    def stop_model_server(
        self,
        uuid: UUID,
        timeout: int = DEFAULT_KSERVE_DEPLOYMENT_START_STOP_TIMEOUT,
        force: bool = False,
    ) -> None:
        """Stop a KServe model server.

        Args:
            uuid: UUID of the model server to stop.
            timeout: timeout in seconds to wait for the service to stop.
            force: if True, force the service to stop.

        Raises:
            NotImplementedError: stopping KServe model servers is not
                supported.
        """
        raise NotImplementedError(
            "Stopping KServe model servers is not implemented. Try "
            "deleting the KServe model server instead."
        )

    def start_model_server(
        self,
        uuid: UUID,
        timeout: int = DEFAULT_KSERVE_DEPLOYMENT_START_STOP_TIMEOUT,
    ) -> None:
        """Start a KServe model deployment server.

        Args:
            uuid: UUID of the model server to start.
            timeout: timeout in seconds to wait for the service to become
                active. If set to 0, the method will return immediately after
                provisioning the service, without waiting for it to become
                active.

        Raises:
            NotImplementedError: since we don't support starting KServe
                model servers
        """
        raise NotImplementedError(
            "Starting KServe model servers is not implemented"
        )

    def delete_model_server(
        self,
        uuid: UUID,
        timeout: int = DEFAULT_KSERVE_DEPLOYMENT_START_STOP_TIMEOUT,
        force: bool = False,
    ) -> None:
        """Delete a KServe model deployment server.

        Args:
            uuid: UUID of the model server to delete.
            timeout: timeout in seconds to wait for the service to stop. If
                set to 0, the method will return immediately after
                deprovisioning the service, without waiting for it to stop.
            force: if True, force the service to stop.
        """
        services = self.find_model_server(service_uuid=uuid)
        if len(services) == 0:
            return
        services[0].stop(timeout=timeout, force=force)

    def _get_kserve_secret(self) -> Any:
        """Get the secret object for the KServe deployment.

        Returns:
            The secret object for the KServe deployment.

        Raises:
            RuntimeError: if the secret object is not found or secrets_manager is not set.
        """
        if self.config.secret:

            secret_manager = Client().active_stack.secrets_manager

            if not secret_manager or not isinstance(
                secret_manager, BaseSecretsManager
            ):
                raise RuntimeError(
                    f"The active stack doesn't have a secret manager component. "
                    f"The ZenML secret specified in the KServe Model "
                    f"Deployer configuration cannot be fetched: {self.config.secret}."
                )
            try:
                secret = secret_manager.get_secret(self.config.secret)
                return secret
            except KeyError:
                raise RuntimeError(
                    f"The secret `{self.config.secret}` used for your KServe Model"
                    f"Deployer configuration does not exist in your secrets "
                    f"manager `{secret_manager.name}`."
                )
        return None
config: KServeModelDeployerConfig property readonly

Returns the KServeModelDeployerConfig config.

Returns:

Type Description
KServeModelDeployerConfig

The configuration.

kserve_client: KServeClient property readonly

Get the KServe client associated with this model deployer.

Returns:

Type Description
KServeClient

The KServe client.

FLAVOR (BaseModelDeployerFlavor)

Flavor for the KServe model deployer.

Source code in zenml/integrations/kserve/model_deployers/kserve_model_deployer.py
class KServeModelDeployerFlavor(BaseModelDeployerFlavor):
    """Flavor for the KServe model deployer."""

    @property
    def name(self) -> str:
        """Name of the flavor.

        Returns:
            Name of the flavor.
        """
        return KSERVE_MODEL_DEPLOYER_FLAVOR

    @property
    def config_class(self) -> Type[KServeModelDeployerConfig]:
        """Returns `KServeModelDeployerConfig` config class.

        Returns:
                The config class.
        """
        return KServeModelDeployerConfig

    @property
    def implementation_class(self) -> Type["KServeModelDeployer"]:
        """Implementation class for this flavor.

        Returns:
            The implementation class.
        """
        from zenml.integrations.kserve.model_deployers import (
            KServeModelDeployer,
        )

        return KServeModelDeployer
config_class: Type[zenml.integrations.kserve.flavors.kserve_model_deployer_flavor.KServeModelDeployerConfig] property readonly

Returns KServeModelDeployerConfig config class.

Returns:

Type Description
Type[zenml.integrations.kserve.flavors.kserve_model_deployer_flavor.KServeModelDeployerConfig]

The config class.

implementation_class: Type[KServeModelDeployer] property readonly

Implementation class for this flavor.

Returns:

Type Description
Type[KServeModelDeployer]

The implementation class.

name: str property readonly

Name of the flavor.

Returns:

Type Description
str

Name of the flavor.

delete_model_server(self, uuid, timeout=300, force=False)

Delete a KServe model deployment server.

Parameters:

Name Type Description Default
uuid UUID

UUID of the model server to delete.

required
timeout int

timeout in seconds to wait for the service to stop. If set to 0, the method will return immediately after deprovisioning the service, without waiting for it to stop.

300
force bool

if True, force the service to stop.

False
Source code in zenml/integrations/kserve/model_deployers/kserve_model_deployer.py
def delete_model_server(
    self,
    uuid: UUID,
    timeout: int = DEFAULT_KSERVE_DEPLOYMENT_START_STOP_TIMEOUT,
    force: bool = False,
) -> None:
    """Delete a KServe model deployment server.

    Args:
        uuid: UUID of the model server to delete.
        timeout: timeout in seconds to wait for the service to stop. If
            set to 0, the method will return immediately after
            deprovisioning the service, without waiting for it to stop.
        force: if True, force the service to stop.
    """
    services = self.find_model_server(service_uuid=uuid)
    if len(services) == 0:
        return
    services[0].stop(timeout=timeout, force=force)
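A short teardown sketch; `model_deployer` is assumed to be the KServe model deployer of the active stack, and the UUID is a placeholder for one obtained from find_model_server:

from uuid import UUID

model_deployer.delete_model_server(
    uuid=UUID("00000000-0000-0000-0000-000000000000"),  # placeholder UUID
    timeout=300,
    force=False,
)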
deploy_model(self, config, replace=False, timeout=300)

Create a new KServe deployment or update an existing one.

This method has two modes of operation, depending on the replace argument value:

  • if replace is False, calling this method will create a new KServe deployment server to reflect the model and other configuration parameters specified in the supplied KServe deployment config.

  • if replace is True, this method will first attempt to find an existing KServe deployment that is equivalent to the supplied configuration parameters. Two or more KServe deployments are considered equivalent if they have the same pipeline_name, pipeline_step_name and model_name configuration parameters. To put it differently, two KServe deployments are equivalent if they serve versions of the same model deployed by the same pipeline step. If an equivalent KServe deployment is found, it will be updated in place to reflect the new configuration parameters. This allows an existing KServe deployment to retain its prediction URL while performing a rolling update to serve a new model version.

Callers should set replace to True if they want a continuous model deployment workflow that doesn't spin up a new KServe deployment server for each new model version. If multiple equivalent KServe deployments are found, the most recently created deployment is selected to be updated and the others are deleted.

Parameters:

Name Type Description Default
config ServiceConfig

the configuration of the model to be deployed with KServe.

required
replace bool

set this flag to True to find and update an equivalent KServeDeployment server with the new model instead of starting a new deployment server.

False
timeout int

the timeout in seconds to wait for the KServe server to be provisioned and successfully started or updated. If set to 0, the method will return immediately after the KServe server is provisioned, without waiting for it to fully start.

300

Returns:

Type Description
BaseService

The ZenML KServe deployment service object that can be used to interact with the remote KServe server.

Exceptions:

Type Description
RuntimeError

if the KServe deployment server could not be stopped.

Source code in zenml/integrations/kserve/model_deployers/kserve_model_deployer.py
def deploy_model(
    self,
    config: ServiceConfig,
    replace: bool = False,
    timeout: int = DEFAULT_KSERVE_DEPLOYMENT_START_STOP_TIMEOUT,
) -> BaseService:
    """Create a new KServe deployment or update an existing one.

    This method has two modes of operation, depending on the `replace`
    argument value:

      * if `replace` is False, calling this method will create a new KServe
        deployment server to reflect the model and other configuration
        parameters specified in the supplied KServe deployment `config`.

      * if `replace` is True, this method will first attempt to find an
        existing KServe deployment that is *equivalent* to the supplied
        configuration parameters. Two or more KServe deployments are
        considered equivalent if they have the same `pipeline_name`,
        `pipeline_step_name` and `model_name` configuration parameters. To
        put it differently, two KServe deployments are equivalent if
        they serve versions of the same model deployed by the same pipeline
        step. If an equivalent KServe deployment is found, it will be
        updated in place to reflect the new configuration parameters. This
        allows an existing KServe deployment to retain its prediction
        URL while performing a rolling update to serve a new model version.

    Callers should set `replace` to True if they want a continuous model
    deployment workflow that doesn't spin up a new KServe deployment
    server for each new model version. If multiple equivalent KServe
    deployments are found, the most recently created deployment is selected
    to be updated and the others are deleted.

    Args:
        config: the configuration of the model to be deployed with KServe.
        replace: set this flag to True to find and update an equivalent
            KServeDeployment server with the new model instead of
            starting a new deployment server.
        timeout: the timeout in seconds to wait for the KServe server
            to be provisioned and successfully started or updated. If set
            to 0, the method will return immediately after the KServe
            server is provisioned, without waiting for it to fully start.

    Returns:
        The ZenML KServe deployment service object that can be used to
        interact with the remote KServe server.

    Raises:
        RuntimeError: if the KServe deployment server could not be stopped.
    """
    config = cast(KServeDeploymentConfig, config)
    service = None

    # if the secret is passed in the config, use it to set the credentials
    if config.secret_name:
        self.config.secret = config.secret_name or self.config.secret
    self._set_credentials()

    # if replace is True, find equivalent KServe deployments
    if replace is True:
        equivalent_services = self.find_model_server(
            running=False,
            pipeline_name=config.pipeline_name,
            pipeline_step_name=config.pipeline_step_name,
            model_name=config.model_name,
        )

        for equivalent_service in equivalent_services:
            if service is None:
                # keep the most recently created service
                service = equivalent_service
            else:
                try:
                    # delete the older services and don't wait for them to
                    # be deprovisioned
                    service.stop()
                except RuntimeError as e:
                    raise RuntimeError(
                        "Failed to stop the KServe deployment server:\n",
                        f"{e}\n",
                        "Please stop it manually and try again.",
                    )
    if service:
        # update an equivalent service in place
        service.update(config)
        logger.info(
            f"Updating an existing KServe deployment service: {service}"
        )
    else:
        # create a new service
        service = KServeDeploymentService(config=config)
        logger.info(f"Creating a new KServe deployment service: {service}")

    # start the service which in turn provisions the KServe
    # deployment server and waits for it to reach a ready state
    service.start(timeout=timeout)

    # Add telemetry with metadata that gets the stack metadata and
    # differentiates between pure model and custom code deployments
    stack = Client().active_stack
    stack_metadata = {
        component_type.value: component.flavor
        for component_type, component in stack.components.items()
    }
    metadata = {
        "store_type": Client().zen_store.type.value,
        **stack_metadata,
        "is_custom_code_deployment": config.container is not None,
    }
    track_event(AnalyticsEvent.MODEL_DEPLOYED, metadata=metadata)

    return service
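Putting both modes together, a hedged end-to-end sketch; the model URI, names and resource values are placeholders, and the config import path follows this integration's module layout:

from zenml.client import Client
from zenml.integrations.kserve.services.kserve_deployment import (
    KServeDeploymentConfig,
)

model_deployer = Client().active_stack.model_deployer
service = model_deployer.deploy_model(
    config=KServeDeploymentConfig(
        model_uri="s3://my-bucket/model-artifacts",  # placeholder URI
        model_name="sklearn-model",                  # placeholder name
        predictor="sklearn",
        resources={"requests": {"cpu": "100m", "memory": "128Mi"}},
    ),
    replace=True,  # update an equivalent deployment instead of creating one
    timeout=300,
)
print(service.prediction_url)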
find_model_server(self, running=False, service_uuid=None, pipeline_name=None, pipeline_run_id=None, pipeline_step_name=None, model_name=None, model_uri=None, predictor=None)

Find one or more KServe model services that match the given criteria.

Parameters:

Name Type Description Default
running bool

If true, only running services will be returned.

False
service_uuid Optional[uuid.UUID]

The UUID of the service that was originally used to deploy the model.

None
pipeline_name Optional[str]

name of the pipeline that the deployed model was part of.

None
pipeline_run_id Optional[str]

ID of the pipeline run which the deployed model was part of.

None
pipeline_step_name Optional[str]

the name of the pipeline model deployment step that deployed the model.

None
model_name Optional[str]

the name of the deployed model.

None
model_uri Optional[str]

URI of the deployed model.

None
predictor Optional[str]

the name of the predictor that was used to deploy the model.

None

Returns:

Type Description
List[zenml.services.service.BaseService]

One or more Service objects representing model servers that match the input search criteria.

Source code in zenml/integrations/kserve/model_deployers/kserve_model_deployer.py
def find_model_server(
    self,
    running: bool = False,
    service_uuid: Optional[UUID] = None,
    pipeline_name: Optional[str] = None,
    pipeline_run_id: Optional[str] = None,
    pipeline_step_name: Optional[str] = None,
    model_name: Optional[str] = None,
    model_uri: Optional[str] = None,
    predictor: Optional[str] = None,
) -> List[BaseService]:
    """Find one or more KServe model services that match the given criteria.

    Args:
        running: If true, only running services will be returned.
        service_uuid: The UUID of the service that was originally used
            to deploy the model.
        pipeline_name: name of the pipeline that the deployed model was part
            of.
        pipeline_run_id: ID of the pipeline run which the deployed model was
            part of.
        pipeline_step_name: the name of the pipeline model deployment step
            that deployed the model.
        model_name: the name of the deployed model.
        model_uri: URI of the deployed model.
        predictor: the name of the predictor that was used to deploy the model.

    Returns:
        One or more Service objects representing model servers that match
        the input search criteria.
    """
    config = KServeDeploymentConfig(
        pipeline_name=pipeline_name or "",
        pipeline_run_id=pipeline_run_id or "",
        pipeline_step_name=pipeline_step_name or "",
        model_uri=model_uri or "",
        model_name=model_name or "",
        predictor=predictor or "",
        resources={},
    )
    labels = config.get_kubernetes_labels()

    if service_uuid:
        labels["zenml.service_uuid"] = str(service_uuid)

    deployments = self.get_kserve_deployments(labels=labels)

    services: List[BaseService] = []
    for deployment in deployments:
        # recreate the KServe deployment service object from the KServe
        # deployment resource
        service = KServeDeploymentService.create_from_deployment(
            deployment=deployment
        )
        if running and not service.is_running:
            # skip non-running services
            continue
        services.append(service)

    return services
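Continuing the sketch above, the running server deployed by a given pipeline step can be located and inspected like this (pipeline, step and model names are placeholders):

services = model_deployer.find_model_server(
    running=True,
    pipeline_name="continuous_deployment_pipeline",   # placeholder
    pipeline_step_name="kserve_model_deployer_step",  # placeholder
    model_name="sklearn-model",                       # placeholder
)
if services:
    info = KServeModelDeployer.get_model_server_info(services[0])
    print(info["PREDICTION_URL"])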
get_kserve_deployments(self, labels)

Get a list of KServe deployments that match the supplied labels.

Parameters:

Name Type Description Default
labels Dict[str, str]

a dictionary of labels to match against KServe deployments.

required

Returns:

Type Description
List[kserve.models.v1beta1_inference_service.V1beta1InferenceService]

A list of KServe deployments that match the supplied labels.

Exceptions:

Type Description
RuntimeError

if an operational failure is encountered while retrieving the KServe inference services from the cluster.

Source code in zenml/integrations/kserve/model_deployers/kserve_model_deployer.py
def get_kserve_deployments(
    self, labels: Dict[str, str]
) -> List[V1beta1InferenceService]:
    """Get a list of KServe deployments that match the supplied labels.

    Args:
        labels: a dictionary of labels to match against KServe deployments.

    Returns:
        A list of KServe deployments that match the supplied labels.

    Raises:
        RuntimeError: if an operational failure is encountered while
            retrieving the KServe inference services from the cluster.
    """
    label_selector = (
        ",".join(f"{k}={v}" for k, v in labels.items()) if labels else None
    )

    namespace = (
        self.config.kubernetes_namespace
        or utils.get_default_target_namespace()
    )

    try:
        response = (
            self.kserve_client.api_instance.list_namespaced_custom_object(
                constants.KSERVE_GROUP,
                constants.KSERVE_V1BETA1_VERSION,
                namespace,
                constants.KSERVE_PLURAL,
                label_selector=label_selector,
            )
        )
    except client.rest.ApiException as e:
        raise RuntimeError(
            "Exception when retrieving KServe inference services\
            %s\n"
            % e
        )

    # TODO[CRITICAL]: de-serialize each item into a complete
    #   V1beta1InferenceService object recursively using the OpenApi
    #   schema (this doesn't work right now)
    inference_services: List[V1beta1InferenceService] = []
    for item in response.get("items", []):
        snake_case_item = self._camel_to_snake(item)
        inference_service = V1beta1InferenceService(**snake_case_item)
        inference_services.append(inference_service)
    return inference_services
get_model_server_info(service_instance) staticmethod

Return implementation specific information on the model server.

Parameters:

Name Type Description Default
service_instance KServeDeploymentService

KServe deployment service object

required

Returns:

Type Description
Dict[str, Optional[str]]

A dictionary containing the model server information.

Source code in zenml/integrations/kserve/model_deployers/kserve_model_deployer.py
@staticmethod
def get_model_server_info(  # type: ignore[override]
    service_instance: "KServeDeploymentService",
) -> Dict[str, Optional[str]]:
    """Return implementation specific information on the model server.

    Args:
        service_instance: KServe deployment service object

    Returns:
        A dictionary containing the model server information.
    """
    return {
        "PREDICTION_URL": service_instance.prediction_url,
        "PREDICTION_HOSTNAME": service_instance.prediction_hostname,
        "MODEL_URI": service_instance.config.model_uri,
        "MODEL_NAME": service_instance.config.model_name,
        "KSERVE_INFERENCE_SERVICE": service_instance.crd_name,
    }
prepare_pipeline_deployment(self, deployment, stack)

Build a Docker image and push it to the container registry.

Parameters:

Name Type Description Default
deployment PipelineDeployment

The pipeline deployment configuration.

required
stack Stack

The stack on which the pipeline will be deployed.

required
Source code in zenml/integrations/kserve/model_deployers/kserve_model_deployer.py
def prepare_pipeline_deployment(
    self,
    deployment: "PipelineDeployment",
    stack: "Stack",
) -> None:
    """Build a Docker image and push it to the container registry.

    Args:
        deployment: The pipeline deployment configuration.
        stack: The stack on which the pipeline will be deployed.
    """
    needs_docker_image = False
    for step in deployment.steps.values():
        if step.config.extra.get(KSERVE_CUSTOM_DEPLOYMENT, False) is True:
            needs_docker_image = True

    if needs_docker_image:
        docker_image_builder = PipelineDockerImageBuilder()
        repo_digest = docker_image_builder.build_and_push_docker_image(
            deployment=deployment, stack=stack
        )
        deployment.add_extra(KSERVE_DOCKER_IMAGE_KEY, repo_digest)
start_model_server(self, uuid, timeout=300)

Start a KServe model deployment server.

Parameters:

Name Type Description Default
uuid UUID

UUID of the model server to start.

required
timeout int

timeout in seconds to wait for the service to become active. If set to 0, the method will return immediately after provisioning the service, without waiting for it to become active.

300

Exceptions:

Type Description
NotImplementedError

since we don't support starting KServe model servers

Source code in zenml/integrations/kserve/model_deployers/kserve_model_deployer.py
def start_model_server(
    self,
    uuid: UUID,
    timeout: int = DEFAULT_KSERVE_DEPLOYMENT_START_STOP_TIMEOUT,
) -> None:
    """Start a KServe model deployment server.

    Args:
        uuid: UUID of the model server to start.
        timeout: timeout in seconds to wait for the service to become
            active. If set to 0, the method will return immediately after
            provisioning the service, without waiting for it to become
            active.

    Raises:
        NotImplementedError: since we don't support starting KServe
            model servers
    """
    raise NotImplementedError(
        "Starting KServe model servers is not implemented"
    )
stop_model_server(self, uuid, timeout=300, force=False)

Stop a KServe model server.

Parameters:

Name Type Description Default
uuid UUID

UUID of the model server to stop.

required
timeout int

timeout in seconds to wait for the service to stop.

300
force bool

if True, force the service to stop.

False

Exceptions:

Type Description
NotImplementedError

stopping KServe model servers is not supported.

Source code in zenml/integrations/kserve/model_deployers/kserve_model_deployer.py
def stop_model_server(
    self,
    uuid: UUID,
    timeout: int = DEFAULT_KSERVE_DEPLOYMENT_START_STOP_TIMEOUT,
    force: bool = False,
) -> None:
    """Stop a KServe model server.

    Args:
        uuid: UUID of the model server to stop.
        timeout: timeout in seconds to wait for the service to stop.
        force: if True, force the service to stop.

    Raises:
        NotImplementedError: stopping KServe model servers is not
            supported.
    """
    raise NotImplementedError(
        "Stopping KServe model servers is not implemented. Try "
        "deleting the KServe model server instead."
    )

secret_schemas special

Initialization of KServe Secret Schemas.

These are secret schemas that can be used to authenticate KServe to the Artifact Store used to store served ML models.

secret_schemas

Implementation for KServe secret schemas.

KServeAzureSecretSchema (BaseSecretSchema) pydantic-model

KServe Azure Blob Storage credentials.

Attributes:

Name Type Description
storage_type Literal['Azure']

the storage type. Must be set to "GCS" for this schema.

credentials Optional[str]

the credentials to use.

Source code in zenml/integrations/kserve/secret_schemas/secret_schemas.py
class KServeAzureSecretSchema(BaseSecretSchema):
    """KServe Azure Blob Storage credentials.

    Attributes:
        storage_type: the storage type. Must be set to "GCS" for this schema.
        credentials: the credentials to use.
    """

    TYPE: ClassVar[str] = KSERVE_AZUREBLOB_SECRET_SCHEMA_TYPE

    storage_type: Literal["Azure"] = "Azure"
    credentials: Optional[str]
KServeGSSecretSchema (BaseSecretSchema) pydantic-model

KServe GCS credentials.

Attributes:

Name Type Description
storage_type Literal['GCS']

the storage type. Must be set to "GCS" for this schema.

credentials Optional[str]

the credentials to use.

service_account Optional[str]

the service account.

Source code in zenml/integrations/kserve/secret_schemas/secret_schemas.py
class KServeGSSecretSchema(BaseSecretSchema):
    """KServe GCS credentials.

    Attributes:
        storage_type: the storage type. Must be set to "GCS" for this schema.
        credentials: the credentials to use.
        service_account: the service account.
    """

    TYPE: ClassVar[str] = KSERVE_GS_SECRET_SCHEMA_TYPE

    storage_type: Literal["GCS"] = "GCS"
    credentials: Optional[str]
    service_account: Optional[str]
KServeS3SecretSchema (BaseSecretSchema) pydantic-model

KServe S3 credentials.

Attributes:

Name Type Description
storage_type Literal['S3']

the storage type. Must be set to "s3" for this schema.

credentials Optional[str]

the credentials to use.

service_account Optional[str]

the name of the service account.

s3_endpoint Optional[str]

the S3 endpoint.

s3_region Optional[str]

the S3 region.

s3_use_https Optional[str]

whether to use HTTPS.

s3_verify_ssl Optional[str]

whether to verify SSL.

Source code in zenml/integrations/kserve/secret_schemas/secret_schemas.py
class KServeS3SecretSchema(BaseSecretSchema):
    """KServe S3 credentials.

    Attributes:
        storage_type: the storage type. Must be set to "s3" for this schema.
        credentials: the credentials to use.
        service_account: the name of the service account.
        s3_endpoint: the S3 endpoint.
        s3_region: the S3 region.
        s3_use_https: whether to use HTTPS.
        s3_verify_ssl: whether to verify SSL.
    """

    TYPE: ClassVar[str] = KSERVE_S3_SECRET_SCHEMA_TYPE

    storage_type: Literal["S3"] = "S3"
    credentials: Optional[str]
    service_account: Optional[str]
    s3_endpoint: Optional[str]
    s3_region: Optional[str]
    s3_use_https: Optional[str]
    s3_verify_ssl: Optional[str]
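As an illustration, an S3 secret could be built directly from the schema; the values are placeholders, and the name field is assumed to be inherited from the base secret schema. In practice the secret is registered with the stack's secrets manager and referenced by name in the model deployer configuration:

secret = KServeS3SecretSchema(
    name="kserve_secret",  # assumed `name` field on the base secret schema
    credentials="<contents of an AWS credentials file>",  # placeholder
    s3_region="us-east-1",
    s3_use_https="1",
    s3_verify_ssl="0",
)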

services special

Initialization for KServe services.

kserve_deployment

Implementation for the KServe inference service.

KServeDeploymentConfig (ServiceConfig) pydantic-model

KServe deployment service configuration.

Attributes:

Name Type Description
model_uri str

URI of the model (or models) to serve.

model_name str

the name of the model. Multiple versions of the same model should use the same model name.

secret_name Optional[str]

the name of the secret containing the model storage credentials.

predictor str

the KServe predictor used to serve the model. The predictor type can be one of the following: tensorflow, pytorch, sklearn, xgboost, custom.

replicas int

number of replicas to use for the prediction service.

resources Optional[Dict[str, Any]]

the Kubernetes resources to allocate for the prediction service.

container Optional[Dict[str, Any]]

the container to use for the custom prediction services.

Source code in zenml/integrations/kserve/services/kserve_deployment.py
class KServeDeploymentConfig(ServiceConfig):
    """KServe deployment service configuration.

    Attributes:
        model_uri: URI of the model (or models) to serve.
        model_name: the name of the model. Multiple versions of the same model
            should use the same model name.
        secret_name: the name of the secret containing the model storage
            credentials.
        predictor: the KServe predictor used to serve the model. The
            predictor type can be one of the following: `tensorflow`,
            `pytorch`, `sklearn`, `xgboost`, `custom`.
        replicas: number of replicas to use for the prediction service.
        resources: the Kubernetes resources to allocate for the prediction service.
        container: the container to use for the custom prediction services.
    """

    model_uri: str = ""
    model_name: str
    secret_name: Optional[str]
    predictor: str
    replicas: int = 1
    container: Optional[Dict[str, Any]]
    resources: Optional[Dict[str, Any]]

    @staticmethod
    def sanitize_labels(labels: Dict[str, str]) -> None:
        """Update the label values to be valid Kubernetes labels.

        See: https://kubernetes.io/docs/concepts/overview/working-with-objects/labels/#syntax-and-character-set

        Args:
            labels: The labels to sanitize.
        """
        # TODO[MEDIUM]: Move k8s label sanitization to a common module with all K8s utils.
        for key, value in labels.items():
            # Kubernetes labels must be alphanumeric, no longer than
            # 63 characters, and must begin and end with an alphanumeric
            # character ([a-z0-9A-Z])
            labels[key] = re.sub(r"[^0-9a-zA-Z-_\.]+", "_", value)[:63].strip(
                "-_."
            )

    def get_kubernetes_labels(self) -> Dict[str, str]:
        """Generate the labels for the KServe inference CRD from the service configuration.

        These labels are attached to the KServe inference service CRD
        and may be used as label selectors in lookup operations.

        Returns:
            The labels for the KServe inference service CRD.
        """
        labels = {"app": "zenml"}
        if self.pipeline_name:
            labels["zenml.pipeline_name"] = self.pipeline_name
        if self.pipeline_run_id:
            labels["zenml.pipeline_run_id"] = self.pipeline_run_id
        if self.pipeline_step_name:
            labels["zenml.pipeline_step_name"] = self.pipeline_step_name
        if self.model_name:
            labels["zenml.model_name"] = self.model_name
        if self.model_uri:
            labels["zenml.model_uri"] = self.model_uri
        if self.predictor:
            labels["zenml.model_type"] = self.predictor
        self.sanitize_labels(labels)
        return labels

    def get_kubernetes_annotations(self) -> Dict[str, str]:
        """Generate the annotations for the KServe inference CRD the service configuration.

        The annotations are used to store additional information about the
        KServe ZenML service associated with the deployment that is
        not available on the labels. One annotation is particularly important
        is the serialized Service configuration itself, which is used to
        recreate the service configuration from a remote KServe inference
        service CRD.

        Returns:
            The annotations for the KServe inference service CRD.
        """
        annotations = {
            "zenml.service_config": self.json(),
            "zenml.version": __version__,
        }
        return annotations

    @classmethod
    def create_from_deployment(
        cls, deployment: V1beta1InferenceService
    ) -> "KServeDeploymentConfig":
        """Recreate a KServe service from a KServe deployment resource.

        Args:
            deployment: the KServe inference service CRD.

        Returns:
            The KServe ZenML service configuration corresponding to the given
            KServe inference service CRD.

        Raises:
            ValueError: if the given deployment resource does not contain
                the expected annotations or it contains an invalid or
                incompatible KServe ZenML service configuration.
        """
        config_data = deployment.metadata.get("annotations").get(
            "zenml.service_config"
        )
        if not config_data:
            raise ValueError(
                f"The given deployment resource does not contain a "
                f"'zenml.service_config' annotation: {deployment}"
            )
        try:
            service_config = cls.parse_raw(config_data)
        except ValidationError as e:
            raise ValueError(
                f"The loaded KServe Inference Service resource contains an "
                f"invalid or incompatible KServe ZenML service configuration: "
                f"{config_data}"
            ) from e
        return service_config
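For orientation, here is a minimal sketch of constructing such a configuration for a scikit-learn model. The model name, URI, and resource values are placeholders, not recommended defaults.

from zenml.integrations.kserve.services.kserve_deployment import (
    KServeDeploymentConfig,
)

# A minimal sketch with placeholder values.
config = KServeDeploymentConfig(
    model_name="my-model",             # hypothetical model name
    model_uri="s3://my-bucket/model",  # placeholder artifact location
    predictor="sklearn",               # tensorflow, pytorch, sklearn, xgboost or custom
    replicas=1,
    resources={"requests": {"cpu": "200m", "memory": "500Mi"}},
)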
create_from_deployment(deployment) classmethod

Recreate a KServe service from a KServe deployment resource.

Parameters:

Name Type Description Default
deployment V1beta1InferenceService

the KServe inference service CRD.

required

Returns:

Type Description
KServeDeploymentConfig

The KServe ZenML service configuration corresponding to the given KServe inference service CRD.

Exceptions:

Type Description
ValueError

if the given deployment resource does not contain the expected annotations or it contains an invalid or incompatible KServe ZenML service configuration.

Source code in zenml/integrations/kserve/services/kserve_deployment.py
@classmethod
def create_from_deployment(
    cls, deployment: V1beta1InferenceService
) -> "KServeDeploymentConfig":
    """Recreate a KServe service from a KServe deployment resource.

    Args:
        deployment: the KServe inference service CRD.

    Returns:
        The KServe ZenML service configuration corresponding to the given
        KServe inference service CRD.

    Raises:
        ValueError: if the given deployment resource does not contain
            the expected annotations or it contains an invalid or
            incompatible KServe ZenML service configuration.
    """
    config_data = deployment.metadata.get("annotations").get(
        "zenml.service_config"
    )
    if not config_data:
        raise ValueError(
            f"The given deployment resource does not contain a "
            f"'zenml.service_config' annotation: {deployment}"
        )
    try:
        service_config = cls.parse_raw(config_data)
    except ValidationError as e:
        raise ValueError(
            f"The loaded KServe Inference Service resource contains an "
            f"invalid or incompatible KServe ZenML service configuration: "
            f"{config_data}"
        ) from e
    return service_config
get_kubernetes_annotations(self)

Generate the annotations for the KServe inference CRD from the service configuration.

The annotations are used to store additional information about the KServe ZenML service associated with the deployment that is not available on the labels. One particularly important annotation is the serialized service configuration itself, which is used to recreate the service configuration from a remote KServe inference service CRD.

Returns:

Type Description
Dict[str, str]

The annotations for the KServe inference service CRD.

Source code in zenml/integrations/kserve/services/kserve_deployment.py
def get_kubernetes_annotations(self) -> Dict[str, str]:
    """Generate the annotations for the KServe inference CRD the service configuration.

    The annotations are used to store additional information about the
    KServe ZenML service associated with the deployment that is
    not available on the labels. One annotation is particularly important
    is the serialized Service configuration itself, which is used to
    recreate the service configuration from a remote KServe inference
    service CRD.

    Returns:
        The annotations for the KServe inference service CRD.
    """
    annotations = {
        "zenml.service_config": self.json(),
        "zenml.version": __version__,
    }
    return annotations
get_kubernetes_labels(self)

Generate the labels for the KServe inference CRD from the service configuration.

These labels are attached to the KServe inference service CRD and may be used as label selectors in lookup operations.

Returns:

Type Description
Dict[str, str]

The labels for the KServe inference service CRD.

Source code in zenml/integrations/kserve/services/kserve_deployment.py
def get_kubernetes_labels(self) -> Dict[str, str]:
    """Generate the labels for the KServe inference CRD from the service configuration.

    These labels are attached to the KServe inference service CRD
    and may be used as label selectors in lookup operations.

    Returns:
        The labels for the KServe inference service CRD.
    """
    labels = {"app": "zenml"}
    if self.pipeline_name:
        labels["zenml.pipeline_name"] = self.pipeline_name
    if self.pipeline_run_id:
        labels["zenml.pipeline_run_id"] = self.pipeline_run_id
    if self.pipeline_step_name:
        labels["zenml.pipeline_step_name"] = self.pipeline_step_name
    if self.model_name:
        labels["zenml.model_name"] = self.model_name
    if self.model_uri:
        labels["zenml.model_uri"] = self.model_uri
    if self.predictor:
        labels["zenml.model_type"] = self.predictor
    self.sanitize_labels(labels)
    return labels
sanitize_labels(labels) staticmethod

Update the label values to be valid Kubernetes labels.

See: https://kubernetes.io/docs/concepts/overview/working-with-objects/labels/#syntax-and-character-set

Parameters:

Name Type Description Default
labels Dict[str, str]

The labels to sanitize.

required
Source code in zenml/integrations/kserve/services/kserve_deployment.py
@staticmethod
def sanitize_labels(labels: Dict[str, str]) -> None:
    """Update the label values to be valid Kubernetes labels.

    See: https://kubernetes.io/docs/concepts/overview/working-with-objects/labels/#syntax-and-character-set

    Args:
        labels: The labels to sanitize.
    """
    # TODO[MEDIUM]: Move k8s label sanitization to a common module with all K8s utils.
    for key, value in labels.items():
        # Kubernetes labels must be alphanumeric, no longer than
        # 63 characters, and must begin and end with an alphanumeric
        # character ([a-z0-9A-Z])
        labels[key] = re.sub(r"[^0-9a-zA-Z-_\.]+", "_", value)[:63].strip(
            "-_."
        )
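To make the sanitization behavior concrete, the following sketch shows how a label value containing characters that are invalid in Kubernetes labels is rewritten in place:

from zenml.integrations.kserve.services.kserve_deployment import (
    KServeDeploymentConfig,
)

# Characters outside [0-9a-zA-Z-_.] are collapsed to "_", the value is
# truncated to 63 characters, and leading/trailing "-", "_" and "." are
# stripped. The dict is mutated in place.
labels = {"zenml.model_uri": "s3://my-bucket/model/v1"}
KServeDeploymentConfig.sanitize_labels(labels)
print(labels)  # {'zenml.model_uri': 's3_my-bucket_model_v1'}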
KServeDeploymentService (BaseService) pydantic-model

A ZenML service that represents a KServe inference service CRD.

Attributes:

Name Type Description
config KServeDeploymentConfig

service configuration.

status ServiceStatus

service status.

Source code in zenml/integrations/kserve/services/kserve_deployment.py
class KServeDeploymentService(BaseService):
    """A ZenML service that represents a KServe inference service CRD.

    Attributes:
        config: service configuration.
        status: service status.
    """

    SERVICE_TYPE = ServiceType(
        name="kserve-deployment",
        type="model-serving",
        flavor="kserve",
        description="KServe inference service",
    )

    config: KServeDeploymentConfig = Field(
        default_factory=KServeDeploymentConfig
    )
    status: ServiceStatus = Field(default_factory=ServiceStatus)

    def _get_model_deployer(self) -> "KServeModelDeployer":
        """Get the active KServe model deployer.

        Returns:
            The active KServeModelDeployer.
        """
        from zenml.integrations.kserve.model_deployers.kserve_model_deployer import (
            KServeModelDeployer,
        )

        return cast(
            KServeModelDeployer, KServeModelDeployer.get_active_model_deployer()
        )

    def _get_client(self) -> KServeClient:
        """Get the KServe client from the active KServe model deployer.

        Returns:
            The KServe client.
        """
        return self._get_model_deployer().kserve_client

    def _get_namespace(self) -> Optional[str]:
        """Get the Kubernetes namespace from the active KServe model deployer.

        Returns:
            The Kubernetes namespace, or None, if the default namespace is
            used.
        """
        return self._get_model_deployer().config.kubernetes_namespace

    def check_status(self) -> Tuple[ServiceState, str]:
        """Check the state of the KServe inference service.

        This method checks the current operational state of the external KServe
        inference service and translates it into a `ServiceState` value and a
        printable message.

        This method should be overridden by subclasses that implement concrete service tracking functionality.

        Returns:
            The operational state of the external service and a message
            providing additional information about that state (e.g. a
            description of the error if one is encountered while checking the
            service status).
        """
        client = self._get_client()
        namespace = self._get_namespace()

        name = self.crd_name
        try:
            deployment = client.get(name=name, namespace=namespace)
        except RuntimeError:
            return (ServiceState.INACTIVE, "")

        # TODO[MEDIUM]: Implement better operational status checking that also
        #   cover errors
        if "status" not in deployment:
            return (ServiceState.INACTIVE, "No operational status available")
        status = "Unknown"
        for condition in deployment["status"].get("conditions", {}):
            if condition.get("type", "") == "PredictorReady":
                status = condition.get("status", "Unknown")
                if status.lower() == "true":
                    return (
                        ServiceState.ACTIVE,
                        f"Inference service '{name}' is available",
                    )

                elif status.lower() == "false":
                    return (
                        ServiceState.PENDING_STARTUP,
                        f"Inference service '{name}' is not available: {condition.get('message', 'Unknown')}",
                    )
        return (
            ServiceState.PENDING_STARTUP,
            f"Inference service '{name}' still starting up",
        )

    @property
    def crd_name(self) -> str:
        """Get the name of the KServe inference service CRD that uniquely corresponds to this service instance.

        Returns:
            The name of the KServe inference service CRD.
        """
        return (
            self._get_kubernetes_labels().get("zenml.model_name")
            or f"zenml-{str(self.uuid)[:8]}"
        )

    def _get_kubernetes_labels(self) -> Dict[str, str]:
        """Generate the labels for the KServe inference service CRD from the service configuration.

        Returns:
            The labels for the KServe inference service.
        """
        labels = self.config.get_kubernetes_labels()
        labels["zenml.service_uuid"] = str(self.uuid)
        KServeDeploymentConfig.sanitize_labels(labels)
        return labels

    @classmethod
    def create_from_deployment(
        cls, deployment: V1beta1InferenceService
    ) -> "KServeDeploymentService":
        """Recreate the configuration of a KServe Service from a deployed instance.

        Args:
            deployment: the KServe deployment resource.

        Returns:
            The KServe service configuration corresponding to the given
            KServe deployment resource.

        Raises:
            ValueError: if the given deployment resource does not contain
                the expected annotations or it contains an invalid or
                incompatible KServe service configuration.
        """
        config = KServeDeploymentConfig.create_from_deployment(deployment)
        uuid = deployment.metadata.get("labels").get("zenml.service_uuid")
        if not uuid:
            raise ValueError(
                f"The given deployment resource does not contain a valid "
                f"'zenml.service_uuid' label: {deployment}"
            )
        service = cls(uuid=UUID(uuid), config=config)
        service.update_status()
        return service

    def provision(self) -> None:
        """Provision or update remote KServe deployment instance.

        This should then match the current configuration.
        """
        client = self._get_client()
        namespace = self._get_namespace()

        api_version = constants.KSERVE_GROUP + "/" + "v1beta1"
        name = self.crd_name

        # All supported model specs seem to have the same fields
        # so we can use any one of them (see https://kserve.github.io/website/0.8/reference/api/#serving.kserve.io/v1beta1.PredictorExtensionSpec)
        if self.config.container is not None:
            predictor_kwargs = {
                "containers": [
                    k8s_client.V1Container(
                        name=self.config.container.get("name"),
                        image=self.config.container.get("image"),
                        command=self.config.container.get("command"),
                        args=self.config.container.get("args"),
                        env=[
                            k8s_client.V1EnvVar(
                                name="STORAGE_URI",
                                value=self.config.container.get("storage_uri"),
                            )
                        ],
                    )
                ]
            }
        else:
            predictor_kwargs = {
                self.config.predictor: V1beta1PredictorExtensionSpec(
                    storage_uri=self.config.model_uri,
                    resources=self.config.resources,
                )
            }

        isvc = V1beta1InferenceService(
            api_version=api_version,
            kind=constants.KSERVE_KIND,
            metadata=k8s_client.V1ObjectMeta(
                name=name,
                namespace=namespace,
                labels=self._get_kubernetes_labels(),
                annotations=self.config.get_kubernetes_annotations(),
            ),
            spec=V1beta1InferenceServiceSpec(
                predictor=V1beta1PredictorSpec(**predictor_kwargs)
            ),
        )

        # TODO[HIGH]: better error handling when provisioning KServe instances
        try:
            client.get(name=name, namespace=namespace)
            # update the existing deployment
            client.replace(name, isvc, namespace=namespace)
        except RuntimeError:
            client.create(isvc)

    def deprovision(self, force: bool = False) -> None:
        """Deprovisions all resources used by the service.

        Args:
            force: if True, the service will be deprovisioned even if it is
                still in use.

        Raises:
            ValueError: if the service is still in use and force is False.
        """
        client = self._get_client()
        namespace = self._get_namespace()
        name = self.crd_name

        # TODO[HIGH]: catch errors if deleting a KServe instance that is no
        #   longer available
        try:
            client.delete(name=name, namespace=namespace)
        except RuntimeError:
            raise ValueError(
                f"Could not delete KServe instance '{name}' from namespace: '{namespace}'."
            )

    def _get_deployment_logs(
        self,
        name: str,
        follow: bool = False,
        tail: Optional[int] = None,
    ) -> Generator[str, bool, None]:
        """Get the logs of a KServe deployment resource.

        Args:
            name: the name of the KServe deployment to get logs for.
            follow: if True, the logs will be streamed as they are written
            tail: only retrieve the last NUM lines of log output.

        Returns:
            A generator that can be accessed to get the service logs.

        Raises:
            Exception: if an unknown error occurs while fetching the logs.

        Yields:
            The logs of the given deployment.
        """
        client = self._get_client()
        namespace = self._get_namespace()

        logger.debug(f"Retrieving logs for InferenceService resource: {name}")
        try:
            response = client.core_api.list_namespaced_pod(
                namespace=namespace,
                label_selector=f"zenml.service_uuid={self.uuid}",
            )
            logger.debug("Kubernetes API response: %s", response)
            pods = response.items
            if not pods:
                raise Exception(
                    f"The KServe deployment {name} is not currently "
                    f"running: no Kubernetes pods associated with it were found"
                )
            pod = pods[0]
            pod_name = pod.metadata.name

            containers = [c.name for c in pod.spec.containers]
            init_containers = [c.name for c in pod.spec.init_containers]
            container_statuses = {
                c.name: c.started or c.restart_count
                for c in pod.status.container_statuses
            }

            container = "default"
            if container not in containers:
                container = containers[0]

            if not container_statuses[container]:
                container = init_containers[0]

            logger.info(
                f"Retrieving logs for pod: `{pod_name}` and container "
                f"`{container}` in namespace `{namespace}`"
            )
            response = client.core_api.read_namespaced_pod_log(
                name=pod_name,
                namespace=namespace,
                container=container,
                follow=follow,
                tail_lines=tail,
                _preload_content=False,
            )
        except k8s_client.rest.ApiException as e:
            logger.error(
                "Exception when fetching logs for InferenceService resource "
                "%s: %s",
                name,
                str(e),
            )
            raise Exception(
                f"Unexpected exception when fetching logs for InferenceService "
                f"resource: {name}"
            ) from e

        try:
            while True:
                line = response.readline().decode("utf-8").rstrip("\n")
                if not line:
                    return
                stop = yield line
                if stop:
                    return
        finally:
            response.release_conn()

    def get_logs(
        self, follow: bool = False, tail: Optional[int] = None
    ) -> Generator[str, bool, None]:
        """Retrieve the logs from the remote KServe inference service instance.

        Args:
            follow: if True, the logs will be streamed as they are written.
            tail: only retrieve the last NUM lines of log output.

        Returns:
            A generator that can be accessed to get the service logs.
        """
        return self._get_deployment_logs(
            self.crd_name,
            follow=follow,
            tail=tail,
        )

    @property
    def prediction_url(self) -> Optional[str]:
        """The prediction URI exposed by the prediction service.

        Returns:
            The prediction URI exposed by the prediction service, or None if
            the service is not yet ready.
        """
        if not self.is_running:
            return None

        model_deployer = self._get_model_deployer()
        return os.path.join(
            model_deployer.config.base_url,
            "v1/models",
            f"{self.crd_name}:predict",
        )

    @property
    def prediction_hostname(self) -> Optional[str]:
        """The prediction hostname exposed by the prediction service.

        Returns:
            The prediction hostname exposed by the prediction service status
            that will be used in the headers of the prediction request.
        """
        if not self.is_running:
            return None

        namespace = self._get_namespace()

        model_deployer = self._get_model_deployer()
        custom_domain = model_deployer.config.custom_domain or "example.com"
        return f"{self.crd_name}.{namespace}.{custom_domain}"

    def predict(self, request: str) -> Any:
        """Make a prediction using the service.

        Args:
            request: a JSON string containing the request payload

        Returns:
            The JSON-decoded prediction returned by the service.

        Raises:
            Exception: if the service is not yet ready.
            ValueError: if the prediction_url is not set.
        """
        if not self.is_running:
            raise Exception(
                "KServe prediction service is not running. "
                "Please start the service before making predictions."
            )

        if self.prediction_url is None:
            raise ValueError("`self.prediction_url` is not set, cannot post.")
        if self.prediction_hostname is None:
            raise ValueError(
                "`self.prediction_hostname` is not set, cannot post."
            )
        headers = {"Host": self.prediction_hostname}
        if isinstance(request, str):
            request = json.loads(request)
        else:
            raise ValueError("Request must be a json string.")
        response = requests.post(
            self.prediction_url,
            headers=headers,
            json={"instances": request},
        )
        response.raise_for_status()
        return response.json()
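A hedged usage sketch: look up a previously deployed service through the active model deployer and inspect its state. The pipeline, step and model names are placeholders for your own deployment.

from zenml.integrations.kserve.model_deployers.kserve_model_deployer import (
    KServeModelDeployer,
)

model_deployer = KServeModelDeployer.get_active_model_deployer()
services = model_deployer.find_model_server(
    pipeline_name="my_deployment_pipeline",           # hypothetical
    pipeline_step_name="kserve_model_deployer_step",  # hypothetical
    model_name="my-model",                            # hypothetical
)
if services:
    service = services[0]
    state, message = service.check_status()
    print(state, message)
    print(service.prediction_url)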
crd_name: str property readonly

Get the name of the KServe inference service CRD that uniquely corresponds to this service instance.

Returns:

Type Description
str

The name of the KServe inference service CRD.

prediction_hostname: Optional[str] property readonly

The prediction hostname exposed by the prediction service.

Returns:

Type Description
Optional[str]

The prediction hostname exposed by the prediction service status that will be used in the headers of the prediction request.

prediction_url: Optional[str] property readonly

The prediction URI exposed by the prediction service.

Returns:

Type Description
Optional[str]

The prediction URI exposed by the prediction service, or None if the service is not yet ready.

check_status(self)

Check the state of the KServe inference service.

This method checks the current operational state of the external KServe inference service and translates it into a ServiceState value and a printable message.

This method should be overridden by subclasses that implement concrete service tracking functionality.

Returns:

Type Description
Tuple[zenml.services.service_status.ServiceState, str]

The operational state of the external service and a message providing additional information about that state (e.g. a description of the error if one is encountered while checking the service status).

Source code in zenml/integrations/kserve/services/kserve_deployment.py
def check_status(self) -> Tuple[ServiceState, str]:
    """Check the state of the KServe inference service.

    This method checks the current operational state of the external KServe
    inference service and translates it into a `ServiceState` value and a
    printable message.

    This method should be overridden by subclasses that implement concrete service tracking functionality.

    Returns:
        The operational state of the external service and a message
        providing additional information about that state (e.g. a
        description of the error if one is encountered while checking the
        service status).
    """
    client = self._get_client()
    namespace = self._get_namespace()

    name = self.crd_name
    try:
        deployment = client.get(name=name, namespace=namespace)
    except RuntimeError:
        return (ServiceState.INACTIVE, "")

    # TODO[MEDIUM]: Implement better operational status checking that also
    #   cover errors
    if "status" not in deployment:
        return (ServiceState.INACTIVE, "No operational status available")
    status = "Unknown"
    for condition in deployment["status"].get("conditions", {}):
        if condition.get("type", "") == "PredictorReady":
            status = condition.get("status", "Unknown")
            if status.lower() == "true":
                return (
                    ServiceState.ACTIVE,
                    f"Inference service '{name}' is available",
                )

            elif status.lower() == "false":
                return (
                    ServiceState.PENDING_STARTUP,
                    f"Inference service '{name}' is not available: {condition.get('message', 'Unknown')}",
                )
    return (
        ServiceState.PENDING_STARTUP,
        f"Inference service '{name}' still starting up",
    )
create_from_deployment(deployment) classmethod

Recreate the configuration of a KServe Service from a deployed instance.

Parameters:

Name Type Description Default
deployment V1beta1InferenceService

the KServe deployment resource.

required

Returns:

Type Description
KServeDeploymentService

The KServe service configuration corresponding to the given KServe deployment resource.

Exceptions:

Type Description
ValueError

if the given deployment resource does not contain the expected annotations or it contains an invalid or incompatible KServe service configuration.

Source code in zenml/integrations/kserve/services/kserve_deployment.py
@classmethod
def create_from_deployment(
    cls, deployment: V1beta1InferenceService
) -> "KServeDeploymentService":
    """Recreate the configuration of a KServe Service from a deployed instance.

    Args:
        deployment: the KServe deployment resource.

    Returns:
        The KServe service configuration corresponding to the given
        KServe deployment resource.

    Raises:
        ValueError: if the given deployment resource does not contain
            the expected annotations or it contains an invalid or
            incompatible KServe service configuration.
    """
    config = KServeDeploymentConfig.create_from_deployment(deployment)
    uuid = deployment.metadata.get("labels").get("zenml.service_uuid")
    if not uuid:
        raise ValueError(
            f"The given deployment resource does not contain a valid "
            f"'zenml.service_uuid' label: {deployment}"
        )
    service = cls(uuid=UUID(uuid), config=config)
    service.update_status()
    return service
deprovision(self, force=False)

Deprovisions all resources used by the service.

Parameters:

Name Type Description Default
force bool

if True, the service will be deprovisioned even if it is still in use.

False

Exceptions:

Type Description
ValueError

if the service is still in use and force is False.

Source code in zenml/integrations/kserve/services/kserve_deployment.py
def deprovision(self, force: bool = False) -> None:
    """Deprovisions all resources used by the service.

    Args:
        force: if True, the service will be deprovisioned even if it is
            still in use.

    Raises:
        ValueError: if the service is still in use and force is False.
    """
    client = self._get_client()
    namespace = self._get_namespace()
    name = self.crd_name

    # TODO[HIGH]: catch errors if deleting a KServe instance that is no
    #   longer available
    try:
        client.delete(name=name, namespace=namespace)
    except RuntimeError:
        raise ValueError(
            f"Could not delete KServe instance '{name}' from namespace: '{namespace}'."
        )
get_logs(self, follow=False, tail=None)

Retrieve the logs from the remote KServe inference service instance.

Parameters:

Name Type Description Default
follow bool

if True, the logs will be streamed as they are written.

False
tail Optional[int]

only retrieve the last NUM lines of log output.

None

Returns:

Type Description
Generator[str, bool, NoneType]

A generator that can be accessed to get the service logs.

Source code in zenml/integrations/kserve/services/kserve_deployment.py
def get_logs(
    self, follow: bool = False, tail: Optional[int] = None
) -> Generator[str, bool, None]:
    """Retrieve the logs from the remote KServe inference service instance.

    Args:
        follow: if True, the logs will be streamed as they are written.
        tail: only retrieve the last NUM lines of log output.

    Returns:
        A generator that can be accessed to get the service logs.
    """
    return self._get_deployment_logs(
        self.crd_name,
        follow=follow,
        tail=tail,
    )
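For example, assuming service is a running KServeDeploymentService instance, the most recent log lines could be printed like this:

# A short usage sketch: print the last 50 log lines without streaming.
for line in service.get_logs(follow=False, tail=50):
    print(line)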
predict(self, request)

Make a prediction using the service.

Parameters:

Name Type Description Default
request str

a JSON string containing the request payload

required

Returns:

Type Description
Any

The JSON-decoded prediction returned by the service.

Exceptions:

Type Description
Exception

if the service is not yet ready.

ValueError

if the prediction_url is not set.

Source code in zenml/integrations/kserve/services/kserve_deployment.py
def predict(self, request: str) -> Any:
    """Make a prediction using the service.

    Args:
        request: a JSON string containing the request payload

    Returns:
        The JSON-decoded prediction returned by the service.

    Raises:
        Exception: if the service is not yet ready.
        ValueError: if the prediction_url is not set.
    """
    if not self.is_running:
        raise Exception(
            "KServe prediction service is not running. "
            "Please start the service before making predictions."
        )

    if self.prediction_url is None:
        raise ValueError("`self.prediction_url` is not set, cannot post.")
    if self.prediction_hostname is None:
        raise ValueError(
            "`self.prediction_hostname` is not set, cannot post."
        )
    headers = {"Host": self.prediction_hostname}
    if isinstance(request, str):
        request = json.loads(request)
    else:
        raise ValueError("Request must be a json string.")
    response = requests.post(
        self.prediction_url,
        headers=headers,
        json={"instances": request},
    )
    response.raise_for_status()
    return response.json()
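As the source above shows, predict expects a JSON string, which it decodes and wraps into the KServe v1 {"instances": ...} payload. A hedged usage sketch with placeholder feature values:

import json

# Assuming `service` is a running KServeDeploymentService instance;
# the feature values below are placeholders.
request = json.dumps([[5.1, 3.5, 1.4, 0.2]])
prediction = service.predict(request)
print(prediction)  # e.g. {"predictions": [...]}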
provision(self)

Provision or update the remote KServe deployment instance.

This ensures that the deployed instance matches the current service configuration.

Source code in zenml/integrations/kserve/services/kserve_deployment.py
def provision(self) -> None:
    """Provision or update remote KServe deployment instance.

    This should then match the current configuration.
    """
    client = self._get_client()
    namespace = self._get_namespace()

    api_version = constants.KSERVE_GROUP + "/" + "v1beta1"
    name = self.crd_name

    # All supported model specs seem to have the same fields
    # so we can use any one of them (see https://kserve.github.io/website/0.8/reference/api/#serving.kserve.io/v1beta1.PredictorExtensionSpec)
    if self.config.container is not None:
        predictor_kwargs = {
            "containers": [
                k8s_client.V1Container(
                    name=self.config.container.get("name"),
                    image=self.config.container.get("image"),
                    command=self.config.container.get("command"),
                    args=self.config.container.get("args"),
                    env=[
                        k8s_client.V1EnvVar(
                            name="STORAGE_URI",
                            value=self.config.container.get("storage_uri"),
                        )
                    ],
                )
            ]
        }
    else:
        predictor_kwargs = {
            self.config.predictor: V1beta1PredictorExtensionSpec(
                storage_uri=self.config.model_uri,
                resources=self.config.resources,
            )
        }

    isvc = V1beta1InferenceService(
        api_version=api_version,
        kind=constants.KSERVE_KIND,
        metadata=k8s_client.V1ObjectMeta(
            name=name,
            namespace=namespace,
            labels=self._get_kubernetes_labels(),
            annotations=self.config.get_kubernetes_annotations(),
        ),
        spec=V1beta1InferenceServiceSpec(
            predictor=V1beta1PredictorSpec(**predictor_kwargs)
        ),
    )

    # TODO[HIGH]: better error handling when provisioning KServe instances
    try:
        client.get(name=name, namespace=namespace)
        # update the existing deployment
        client.replace(name, isvc, namespace=namespace)
    except RuntimeError:
        client.create(isvc)

steps special

Initialization for KServe steps.

kserve_deployer

Implementation of the KServe Deployer step.

CustomDeployParameters (BaseModel) pydantic-model

Custom model deployer step extra parameters.

Attributes:

Name Type Description
predict_function str

Path to Python file containing predict function.

Source code in zenml/integrations/kserve/steps/kserve_deployer.py
class CustomDeployParameters(BaseModel):
    """Custom model deployer step extra parameters.

    Attributes:
        predict_function: Path to Python file containing predict function.
    """

    predict_function: str

    @validator("predict_function")
    def predict_function_validate(cls, predict_func_path: str) -> str:
        """Validate predict function.

        Args:
            predict_func_path: predict function path

        Returns:
            predict function path

        Raises:
            ValueError: if predict function path is not valid
            TypeError: if predict function path is not a callable function
        """
        try:
            predict_function = import_class_by_path(predict_func_path)
        except AttributeError:
            raise ValueError("Predict function can't be found.")
        if not callable(predict_function):
            raise TypeError("Predict function must be callable.")
        return predict_func_path
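A minimal sketch: the dotted path below is hypothetical and must resolve to an importable, callable predict function, because the validator imports it when the parameters are constructed.

from zenml.integrations.kserve.steps.kserve_deployer import (
    CustomDeployParameters,
)

custom_deploy = CustomDeployParameters(
    predict_function="steps.custom_predict.custom_predict",  # hypothetical path
)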
predict_function_validate(predict_func_path) classmethod

Validate predict function.

Parameters:

Name Type Description Default
predict_func_path str

predict function path

required

Returns:

Type Description
str

predict function path

Exceptions:

Type Description
ValueError

if predict function path is not valid

TypeError

if predict function path is not a callable function

Source code in zenml/integrations/kserve/steps/kserve_deployer.py
@validator("predict_function")
def predict_function_validate(cls, predict_func_path: str) -> str:
    """Validate predict function.

    Args:
        predict_func_path: predict function path

    Returns:
        predict function path

    Raises:
        ValueError: if predict function path is not valid
        TypeError: if predict function path is not a callable function
    """
    try:
        predict_function = import_class_by_path(predict_func_path)
    except AttributeError:
        raise ValueError("Predict function can't be found.")
    if not callable(predict_function):
        raise TypeError("Predict function must be callable.")
    return predict_func_path
KServeDeployerStepParameters (BaseParameters) pydantic-model

KServe model deployer step parameters.

Attributes:

Name Type Description
service_config KServeDeploymentConfig

KServe deployment service configuration.

custom_deploy_parameters Optional[CustomDeployParameters]

Custom deployment parameters, including the path to the custom predict function.

torch_serve_parameters Optional[TorchServeParameters]

TorchServe set of parameters to deploy model.

timeout int

Timeout for model deployment.

Source code in zenml/integrations/kserve/steps/kserve_deployer.py
class KServeDeployerStepParameters(BaseParameters):
    """KServe model deployer step parameters.

    Attributes:
        service_config: KServe deployment service configuration.
        custom_deploy_parameters: Custom deployment parameters, including the
            path to the custom predict function.
        torch_serve_parameters: TorchServe set of parameters to deploy model.
        timeout: Timeout for model deployment.
    """

    service_config: KServeDeploymentConfig
    custom_deploy_parameters: Optional[CustomDeployParameters] = None
    torch_serve_parameters: Optional[TorchServeParameters] = None
    timeout: int = DEFAULT_KSERVE_DEPLOYMENT_START_STOP_TIMEOUT
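Putting the pieces together, a hedged sketch of the step parameters for a custom model deployment. All names are placeholders, and the predict function path must be importable when the parameters are validated.

from zenml.integrations.kserve.services.kserve_deployment import (
    KServeDeploymentConfig,
)
from zenml.integrations.kserve.steps.kserve_deployer import (
    CustomDeployParameters,
    KServeDeployerStepParameters,
)

params = KServeDeployerStepParameters(
    service_config=KServeDeploymentConfig(
        model_name="my-model",  # hypothetical
        predictor="custom",
    ),
    custom_deploy_parameters=CustomDeployParameters(
        predict_function="steps.custom_predict.custom_predict",  # hypothetical
    ),
    timeout=300,
)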
TorchServeParameters (BaseModel) pydantic-model

KServe PyTorch model deployer step configuration.

Attributes:

Name Type Description
model_class str

Path to Python file containing model architecture.

handler str

TorchServe's handler file to handle custom TorchServe inference logic.

extra_files Optional[List[str]]

Comma separated path to extra dependency files.

model_version Optional[str]

Model version.

requirements_file Optional[str]

Path to requirements file.

torch_config Optional[str]

TorchServe configuration file path.

Source code in zenml/integrations/kserve/steps/kserve_deployer.py
class TorchServeParameters(BaseModel):
    """KServe PyTorch model deployer step configuration.

    Attributes:
        model_class: Path to Python file containing model architecture.
        handler: TorchServe's handler file to handle custom TorchServe inference
            logic.
        extra_files: Comma separated path to extra dependency files.
        model_version: Model version.
        requirements_file: Path to requirements file.
        torch_config: TorchServe configuration file path.
    """

    model_class: str
    handler: str
    extra_files: Optional[List[str]] = None
    requirements_file: Optional[str] = None
    model_version: Optional[str] = "1.0"
    torch_config: Optional[str] = None

    @validator("model_class")
    def model_class_validate(cls, v: str) -> str:
        """Validate model class file path.

        Args:
            v: model class file path

        Returns:
            model class file path

        Raises:
            ValueError: if model class file path is not valid
        """
        if not v:
            raise ValueError("Model class file path is required.")
        if not is_inside_repository(v):
            raise ValueError(
                "Model class file path must be inside the repository."
            )
        return v

    @validator("handler")
    def handler_validate(cls, v: str) -> str:
        """Validate handler.

        Args:
            v: handler file path

        Returns:
            handler file path

        Raises:
            ValueError: if handler file path is not valid
        """
        if v:
            if v in TORCH_HANDLERS:
                return v
            elif is_inside_repository(v):
                return v
            else:
                raise ValueError(
                    "Handler must be one of the TorchServe handlers "
                    "or a file that exists inside the repository."
                )
        else:
            raise ValueError("Handler is required.")

    @validator("extra_files")
    def extra_files_validate(
        cls, v: Optional[List[str]]
    ) -> Optional[List[str]]:
        """Validate extra files.

        Args:
            v: extra files path

        Returns:
            extra files path

        Raises:
            ValueError: if the extra files path is not valid
        """
        extra_files = []
        if v is not None:
            for file_path in v:
                if is_inside_repository(file_path):
                    extra_files.append(file_path)
                else:
                    raise ValueError(
                        "Extra file path must be inside the repository."
                    )
            return extra_files
        return v

    @validator("torch_config")
    def torch_config_validate(cls, v: Optional[str]) -> Optional[str]:
        """Validate torch config file.

        Args:
            v: torch config file path

        Returns:
            torch config file path

        Raises:
            ValueError: if torch config file path is not valid.
        """
        if v:
            if is_inside_repository(v):
                return v
            else:
                raise ValueError(
                    "Torch config file path must be inside the repository."
                )
        return v
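A minimal sketch for a PyTorch deployment. The model class path is a placeholder that the validators require to exist inside the active ZenML repository, and "image_classifier" is assumed to be one of the accepted built-in TorchServe handler names.

from zenml.integrations.kserve.steps.kserve_deployer import (
    TorchServeParameters,
)

torch_params = TorchServeParameters(
    model_class="steps/pytorch_model.py",  # hypothetical file inside the repository
    handler="image_classifier",            # assumed built-in TorchServe handler
    model_version="1.0",
)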
extra_files_validate(v) classmethod

Validate extra files.

Parameters:

Name Type Description Default
v Optional[List[str]]

extra files path

required

Returns:

Type Description
Optional[List[str]]

extra files path

Exceptions:

Type Description
ValueError

if the extra files path is not valid

Source code in zenml/integrations/kserve/steps/kserve_deployer.py
@validator("extra_files")
def extra_files_validate(
    cls, v: Optional[List[str]]
) -> Optional[List[str]]:
    """Validate extra files.

    Args:
        v: extra files path

    Returns:
        extra files path

    Raises:
        ValueError: if the extra files path is not valid
    """
    extra_files = []
    if v is not None:
        for file_path in v:
            if is_inside_repository(file_path):
                extra_files.append(file_path)
            else:
                raise ValueError(
                    "Extra file path must be inside the repository."
                )
        return extra_files
    return v
handler_validate(v) classmethod

Validate handler.

Parameters:

Name Type Description Default
v str

handler file path

required

Returns:

Type Description
str

handler file path

Exceptions:

Type Description
ValueError

if handler file path is not valid

Source code in zenml/integrations/kserve/steps/kserve_deployer.py
@validator("handler")
def handler_validate(cls, v: str) -> str:
    """Validate handler.

    Args:
        v: handler file path

    Returns:
        handler file path

    Raises:
        ValueError: if handler file path is not valid
    """
    if v:
        if v in TORCH_HANDLERS:
            return v
        elif is_inside_repository(v):
            return v
        else:
            raise ValueError(
                "Handler must be one of the TorchServe handlers "
                "or a file that exists inside the repository."
            )
    else:
        raise ValueError("Handler is required.")
model_class_validate(v) classmethod

Validate model class file path.

Parameters:

Name Type Description Default
v str

model class file path

required

Returns:

Type Description
str

model class file path

Exceptions:

Type Description
ValueError

if model class file path is not valid

Source code in zenml/integrations/kserve/steps/kserve_deployer.py
@validator("model_class")
def model_class_validate(cls, v: str) -> str:
    """Validate model class file path.

    Args:
        v: model class file path

    Returns:
        model class file path

    Raises:
        ValueError: if model class file path is not valid
    """
    if not v:
        raise ValueError("Model class file path is required.")
    if not is_inside_repository(v):
        raise ValueError(
            "Model class file path must be inside the repository."
        )
    return v
torch_config_validate(v) classmethod

Validate torch config file.

Parameters:

Name Type Description Default
v Optional[str]

torch config file path

required

Returns:

Type Description
Optional[str]

torch config file path

Exceptions:

Type Description
ValueError

if torch config file path is not valid.

Source code in zenml/integrations/kserve/steps/kserve_deployer.py
@validator("torch_config")
def torch_config_validate(cls, v: Optional[str]) -> Optional[str]:
    """Validate torch config file.

    Args:
        v: torch config file path

    Returns:
        torch config file path

    Raises:
        ValueError: if torch config file path is not valid.
    """
    if v:
        if is_inside_repository(v):
            return v
        else:
            raise ValueError(
                "Torch config file path must be inside the repository."
            )
    return v
kserve_custom_model_deployer_step (BaseStep)

KServe custom model deployer pipeline step.

This step can be used in a pipeline to implement the process required to deploy a custom model with KServe.

Parameters:

Name Type Description Default
deploy_decision

whether to deploy the model or not

required
params

parameters for the deployer step

required
model

the model artifact to deploy

required
context

the step context

required

Exceptions:

Type Description
ValueError

if the custom deploy parameters are not defined

DoesNotExistException

if no active stack is found

Returns:

Type Description

KServe deployment service

PARAMETERS_CLASS (BaseParameters) pydantic-model

KServe model deployer step parameters.

Attributes:

Name Type Description
service_config KServeDeploymentConfig

KServe deployment service configuration.

custom_deploy_parameters Optional[CustomDeployParameters]

Custom deployment parameters, including the path to the custom predict function.

torch_serve_parameters Optional[TorchServeParameters]

TorchServe set of parameters to deploy model.

timeout int

Timeout for model deployment.

Source code in zenml/integrations/kserve/steps/kserve_deployer.py
class KServeDeployerStepParameters(BaseParameters):
    """KServe model deployer step parameters.

    Attributes:
        service_config: KServe deployment service configuration.
        custom_deploy_parameters: Custom deployment parameters, including the
            path to the custom predict function.
        torch_serve_parameters: TorchServe set of parameters to deploy model.
        timeout: Timeout for model deployment.
    """

    service_config: KServeDeploymentConfig
    custom_deploy_parameters: Optional[CustomDeployParameters] = None
    torch_serve_parameters: Optional[TorchServeParameters] = None
    timeout: int = DEFAULT_KSERVE_DEPLOYMENT_START_STOP_TIMEOUT
entrypoint(deploy_decision, params, context, model) staticmethod

KServe custom model deployer pipeline step.

This step can be used in a pipeline to implement the process required to deploy a custom model with KServe.

Parameters:

Name Type Description Default
deploy_decision bool

whether to deploy the model or not

required
params KServeDeployerStepParameters

parameters for the deployer step

required
model ModelArtifact

the model artifact to deploy

required
context StepContext

the step context

required

Exceptions:

Type Description
ValueError

if the custom deploy parameters are not defined

DoesNotExistException

if no active stack is found

Returns:

Type Description
KServeDeploymentService

KServe deployment service

Source code in zenml/integrations/kserve/steps/kserve_deployer.py
@step(enable_cache=False, extra={KSERVE_CUSTOM_DEPLOYMENT: True})
def kserve_custom_model_deployer_step(
    deploy_decision: bool,
    params: KServeDeployerStepParameters,
    context: StepContext,
    model: ModelArtifact,
) -> KServeDeploymentService:
    """KServe custom model deployer pipeline step.

    This step can be used in a pipeline to implement the
    process required to deploy a custom model with KServe.

    Args:
        deploy_decision: whether to deploy the model or not
        params: parameters for the deployer step
        model: the model artifact to deploy
        context: the step context

    Raises:
        ValueError: if the custom deploy parameters are not defined
        DoesNotExistException: if no active stack is found

    Returns:
        KServe deployment service
    """
    # verify that a custom deployer is defined
    if not params.custom_deploy_parameters:
        raise ValueError(
            "Custom deploy parameter which contains the path of the "
            "custom predict function is required for custom model deployment."
        )

    # get the active model deployer
    model_deployer = cast(
        KServeModelDeployer, KServeModelDeployer.get_active_model_deployer()
    )

    # get pipeline name, step name, run id
    step_env = cast(StepEnvironment, Environment()[STEP_ENVIRONMENT_NAME])
    pipeline_name = step_env.pipeline_name
    pipeline_run_id = step_env.pipeline_run_id
    step_name = step_env.step_name

    # update the step configuration with the real pipeline runtime information
    params.service_config.pipeline_name = pipeline_name
    params.service_config.pipeline_run_id = pipeline_run_id
    params.service_config.pipeline_step_name = step_name

    # fetch existing services with same pipeline name, step name and
    # model name
    existing_services = model_deployer.find_model_server(
        pipeline_name=pipeline_name,
        pipeline_step_name=step_name,
        model_name=params.service_config.model_name,
    )

    # even when the deploy decision is negative, if an existing model server
    # is already deployed for this pipeline/step, we reuse it instead of
    # deploying a new one, to ensure a model server is available at all times
    if not deploy_decision and existing_services:
        logger.info(
            f"Skipping model deployment because the model quality does not "
            f"meet the criteria. Reusing the last model server deployed by step "
            f"'{step_name}' and pipeline '{pipeline_name}' for model "
            f"'{params.service_config.model_name}'..."
        )
        service = cast(KServeDeploymentService, existing_services[0])
        # even when the deploy decision is negative, we still need to start
        # the previous model server if it is no longer running, to ensure that
        # a model server is available at all times
        if not service.is_running:
            service.start(timeout=params.timeout)
        return service

    # entrypoint for starting KServe server deployment for custom model
    entrypoint_command = [
        "python",
        "-m",
        "zenml.integrations.kserve.custom_deployer.zenml_custom_model",
        "--model_name",
        params.service_config.model_name,
        "--predict_func",
        params.custom_deploy_parameters.predict_function,
    ]

    # verify if there is an active stack before starting the service
    if not context.stack:
        raise DoesNotExistException(
            "No active stack is available. "
            "Please make sure that you have registered and set a stack."
        )

    docker_image = step_env.step_run_info.pipeline.extra[
        KSERVE_DOCKER_IMAGE_KEY
    ]

    # copy the model files to a new specific directory for the deployment
    served_model_uri = os.path.join(context.get_output_artifact_uri(), "kserve")
    fileio.makedirs(served_model_uri)
    io_utils.copy_dir(model.uri, served_model_uri)

    # Get the model artifact to extract information about the model
    # and how it can be loaded again later in the deployment environment.
    artifact = Client().zen_store.list_artifacts(artifact_uri=model.uri)
    if not artifact:
        raise DoesNotExistException(f"No artifact found at {model.uri}.")

    # save the model artifact metadata to the YAML file and copy it to the
    # deployment directory
    model_metadata_file = save_model_metadata(artifact[0])
    fileio.copy(
        model_metadata_file,
        os.path.join(served_model_uri, MODEL_METADATA_YAML_FILE_NAME),
    )

    # prepare the service configuration for the deployment
    service_config = params.service_config.copy()
    service_config.model_uri = served_model_uri

    # Prepare container config for custom model deployment
    service_config.container = {
        "name": service_config.model_name,
        "image": docker_image,
        "command": entrypoint_command,
        "storage_uri": service_config.model_uri,
    }

    # deploy the service
    service = cast(
        KServeDeploymentService,
        model_deployer.deploy_model(
            service_config, replace=True, timeout=params.timeout
        ),
    )

    logger.info(
        f"KServe deployment service started and reachable at:\n"
        f"    {service.prediction_url}\n"
        f"    With the hostname: {service.prediction_hostname}."
    )

    return service
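For context, a hedged sketch of wiring this step into a continuous deployment pipeline. The importer, trainer and deployment_trigger steps are hypothetical and would be defined by you; all parameter values are placeholders.

from zenml.pipelines import pipeline
from zenml.integrations.kserve.services.kserve_deployment import (
    KServeDeploymentConfig,
)
from zenml.integrations.kserve.steps.kserve_deployer import (
    CustomDeployParameters,
    KServeDeployerStepParameters,
    kserve_custom_model_deployer_step,
)

# instantiate the deployer step with placeholder parameters
deployer = kserve_custom_model_deployer_step(
    params=KServeDeployerStepParameters(
        service_config=KServeDeploymentConfig(
            model_name="my-model",  # hypothetical
            predictor="custom",
        ),
        custom_deploy_parameters=CustomDeployParameters(
            predict_function="steps.custom_predict.custom_predict",  # hypothetical
        ),
    )
)

@pipeline(enable_cache=False)
def custom_deployment_pipeline(importer, trainer, deployment_trigger, model_deployer):
    data = importer()
    model = trainer(data)
    decision = deployment_trigger(model)
    model_deployer(deploy_decision=decision, model=model)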
kserve_model_deployer_step (BaseStep)

KServe model deployer pipeline step.

This step can be used in a pipeline to implement continuous deployment for an ML model with KServe.

Parameters:

Name Type Description Default
deploy_decision

whether to deploy the model or not

required
params

parameters for the deployer step

required
model

the model artifact to deploy

required
context

the step context

required

Returns:

Type Description

KServe deployment service

PARAMETERS_CLASS (BaseParameters) pydantic-model

KServe model deployer step parameters.

Attributes:

Name Type Description
service_config KServeDeploymentConfig

KServe deployment service configuration.

custom_deploy_parameters Optional[CustomDeployParameters]

Custom deployment parameters, including the path to the custom predict function.

torch_serve_parameters Optional[TorchServeParameters]

TorchServe set of parameters to deploy model.

timeout int

Timeout for model deployment.

Source code in zenml/integrations/kserve/steps/kserve_deployer.py
class KServeDeployerStepParameters(BaseParameters):
    """KServe model deployer step parameters.

    Attributes:
        service_config: KServe deployment service configuration.
        custom_deploy_parameters: custom deployment parameters (optional).
        torch_serve_parameters: TorchServe parameters used to deploy the model (optional).
        timeout: Timeout for model deployment.
    """

    service_config: KServeDeploymentConfig
    custom_deploy_parameters: Optional[CustomDeployParameters] = None
    torch_serve_parameters: Optional[TorchServeParameters] = None
    timeout: int = DEFAULT_KSERVE_DEPLOYMENT_START_STOP_TIMEOUT
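For illustration, a PyTorch-flavored parameter object might look like the sketch below. It assumes TorchServeParameters (shown in use further down) is importable from this module and accepts model_class and handler file paths; every path and resource value is a placeholder.

from zenml.integrations.kserve.services import KServeDeploymentConfig
from zenml.integrations.kserve.steps.kserve_deployer import (
    KServeDeployerStepParameters,
    TorchServeParameters,
)

# All file paths and resource values below are hypothetical placeholders.
params = KServeDeployerStepParameters(
    service_config=KServeDeploymentConfig(
        model_name="mnist",
        predictor="pytorch",
        resources={"requests": {"cpu": "200m", "memory": "500Mi"}},
    ),
    torch_serve_parameters=TorchServeParameters(
        model_class="steps/pytorch_steps/mnist.py",
        handler="steps/pytorch_steps/mnist_handler.py",
    ),
    timeout=120,
)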
entrypoint(deploy_decision, params, context, model) staticmethod

KServe model deployer pipeline step.

This step can be used in a pipeline to implement continuous deployment for an ML model with KServe.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| deploy_decision | bool | whether to deploy the model or not | required |
| params | KServeDeployerStepParameters | parameters for the deployer step | required |
| model | ModelArtifact | the model artifact to deploy | required |
| context | StepContext | the step context | required |

Returns:

| Type | Description |
| --- | --- |
| KServeDeploymentService | KServe deployment service |

Source code in zenml/integrations/kserve/steps/kserve_deployer.py
@step(enable_cache=False)
def kserve_model_deployer_step(
    deploy_decision: bool,
    params: KServeDeployerStepParameters,
    context: StepContext,
    model: ModelArtifact,
) -> KServeDeploymentService:
    """KServe model deployer pipeline step.

    This step can be used in a pipeline to implement continuous
    deployment for an ML model with KServe.

    Args:
        deploy_decision: whether to deploy the model or not
        params: parameters for the deployer step
        model: the model artifact to deploy
        context: the step context

    Returns:
        KServe deployment service
    """
    model_deployer = cast(
        KServeModelDeployer, KServeModelDeployer.get_active_model_deployer()
    )

    # get pipeline name, step name and run id
    step_env = cast(StepEnvironment, Environment()[STEP_ENVIRONMENT_NAME])
    pipeline_name = step_env.pipeline_name
    pipeline_run_id = step_env.pipeline_run_id
    step_name = step_env.step_name

    # update the step configuration with the real pipeline runtime information
    params.service_config.pipeline_name = pipeline_name
    params.service_config.pipeline_run_id = pipeline_run_id
    params.service_config.pipeline_step_name = step_name

    # fetch existing services with same pipeline name, step name and
    # model name
    existing_services = model_deployer.find_model_server(
        pipeline_name=pipeline_name,
        pipeline_step_name=step_name,
        model_name=params.service_config.model_name,
    )

    # even when the deploy decision is negative, if a model server was
    # previously deployed for this pipeline/step, we reuse it instead of
    # deploying the current model, to ensure that a model server is
    # available at all times
    if not deploy_decision and existing_services:
        logger.info(
            f"Skipping model deployment because the model quality does not "
            f"meet the criteria. Reusing the last model server deployed by step "
            f"'{step_name}' and pipeline '{pipeline_name}' for model "
            f"'{params.service_config.model_name}'..."
        )
        service = cast(KServeDeploymentService, existing_services[0])
        # even when the deploy decision is negative, we still need to start
        # the previous model server if it is no longer running, to ensure that
        # a model server is available at all times
        if not service.is_running:
            service.start(timeout=params.timeout)
        return service

    # invoke the KServe model deployer to create a new service
    # or update an existing one that was previously deployed for the same
    # model
    if params.service_config.predictor == "pytorch":
        # import the prepare function from the step utils
        from zenml.integrations.kserve.steps.kserve_step_utils import (
            prepare_torch_service_config,
        )

        # prepare the service config
        service_config = prepare_torch_service_config(
            model_uri=model.uri,
            output_artifact_uri=context.get_output_artifact_uri(),
            params=params,
        )
    else:
        # import the prepare function from the step utils
        from zenml.integrations.kserve.steps.kserve_step_utils import (
            prepare_service_config,
        )

        # prepare the service config
        service_config = prepare_service_config(
            model_uri=model.uri,
            output_artifact_uri=context.get_output_artifact_uri(),
            params=params,
        )
    service = cast(
        KServeDeploymentService,
        model_deployer.deploy_model(
            service_config, replace=True, timeout=params.timeout
        ),
    )

    logger.info(
        f"KServe deployment service started and reachable at:\n"
        f"    {service.prediction_url}\n"
        f"    With the hostname: {service.prediction_hostname}."
    )

    return service
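In a continuous-deployment pipeline, the deploy decision typically comes from an upstream evaluation step. The sketch below uses the step-and-pipeline API reflected in this documentation; the importer, trainer, evaluator, and deployment-trigger steps are hypothetical placeholders.

from zenml.pipelines import pipeline

@pipeline(enable_cache=True)
def continuous_deployment_pipeline(
    importer,
    trainer,
    evaluator,
    deployment_trigger,
    model_deployer,
):
    """Hypothetical pipeline: train, evaluate, then conditionally deploy."""
    train_data, test_data = importer()
    model = trainer(train_data)
    accuracy = evaluator(model, test_data)
    # turn the evaluation metric into a boolean deploy decision
    deploy_decision = deployment_trigger(accuracy)
    model_deployer(deploy_decision, model)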

kserve_step_utils

This module contains the utility functions used by the KServe deployer step.

TorchModelArchiver (BaseModel) pydantic-model

Model Archiver for PyTorch models.

Attributes:

| Name | Type | Description |
| --- | --- | --- |
| model_name | str | Model name. |
| serialized_file | str | Serialized model file. |
| model_file | str | Python file containing the model architecture. |
| handler | str | TorchServe's handler file to handle custom TorchServe inference logic. |
| export_path | str | Path to export model. |
| extra_files | Optional[List[str]] | Comma separated path to extra dependency files. |
| version | Optional[str] | Model version. |
| requirements_file | Optional[str] | Path to requirements file. |
| runtime | Optional[str] | Runtime of the model. |
| force | Optional[bool] | Force export of the model. |
| archive_format | Optional[str] | Archive format. |

Source code in zenml/integrations/kserve/steps/kserve_step_utils.py
class TorchModelArchiver(BaseModel):
    """Model Archiver for PyTorch models.

    Attributes:
        model_name: Model name.
        serialized_file: Serialized model file.
        model_file: Python file containing the model architecture.
        handler: TorchServe's handler file to handle custom TorchServe inference logic.
        export_path: Path to export model.
        extra_files: Comma separated path to extra dependency files.
        version: Model version.
        requirements_file: Path to requirements file.
        runtime: Runtime of the model.
        force: Force export of the model.
        archive_format: Archive format.
    """

    model_name: str
    serialized_file: str
    model_file: str
    handler: str
    export_path: str
    extra_files: Optional[List[str]] = None
    version: Optional[str] = None
    requirements_file: Optional[str] = None
    runtime: Optional[str] = "python"
    force: Optional[bool] = None
    archive_format: Optional[str] = "default"
generate_model_deployer_config(model_name, directory)

Generate a model deployer config.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| model_name | str | the name of the model | required |
| directory | str | the directory where the model is stored | required |

Returns:

| Type | Description |
| --- | --- |
| str | The path to the generated TorchServe config file. |

Source code in zenml/integrations/kserve/steps/kserve_step_utils.py
def generate_model_deployer_config(
    model_name: str,
    directory: str,
) -> str:
    """Generate a model deployer config.

    Args:
        model_name: the name of the model
        directory: the directory where the model is stored

    Returns:
        The path to the generated TorchServe config file.
    """
    config_lines = [
        "inference_address=http://0.0.0.0:8085",
        "management_address=http://0.0.0.0:8085",
        "metrics_address=http://0.0.0.0:8082",
        "grpc_inference_port=7070",
        "grpc_management_port=7071",
        "enable_metrics_api=true",
        "metrics_format=prometheus",
        "number_of_netty_threads=4",
        "job_queue_size=10",
        "enable_envvars_config=true",
        "install_py_dep_per_model=true",
        "model_store=/mnt/models/model-store",
    ]

    with tempfile.NamedTemporaryFile(
        suffix=".properties", mode="w+", dir=directory, delete=False
    ) as f:
        for line in config_lines:
            f.write(line + "\n")
        f.write(
            f'model_snapshot={{"name":"startup.cfg","modelCount":1,"models":{{"{model_name}":{{"1.0":{{"defaultVersion":true,"marName":"{model_name}.mar","minWorkers":1,"maxWorkers":5,"batchSize":1,"maxBatchDelay":10,"responseTimeout":120}}}}}}}}'
        )
    return f.name
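The helper can be exercised on its own as a quick sanity check; the model name and temporary directory below are arbitrary.

import tempfile

# Write a TorchServe config.properties for a hypothetical "mnist" model;
# the file ends with the model_snapshot JSON entry for "mnist.mar".
tmp_dir = tempfile.mkdtemp(prefix="zenml-kserve-example-")
config_path = generate_model_deployer_config(
    model_name="mnist", directory=tmp_dir
)
with open(config_path) as config_file:
    print(config_file.read())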
prepare_service_config(model_uri, output_artifact_uri, params)

Prepare the model files for model serving.

This function ensures that the model files are in the correct format and file structure required by the KServe server implementation used for model serving.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| model_uri | str | the URI of the model artifact being served | required |
| output_artifact_uri | str | the URI of the output artifact | required |
| params | KServeDeployerStepParameters | the KServe deployer step parameters | required |

Returns:

| Type | Description |
| --- | --- |
| KServeDeploymentConfig | The deployment config, updated with the URI of the prepared model. |

Exceptions:

| Type | Description |
| --- | --- |
| RuntimeError | if the model files cannot be prepared. |

Source code in zenml/integrations/kserve/steps/kserve_step_utils.py
def prepare_service_config(
    model_uri: str,
    output_artifact_uri: str,
    params: KServeDeployerStepParameters,
) -> KServeDeploymentConfig:
    """Prepare the model files for model serving.

    This function ensures that the model files are in the correct format
    and file structure required by the KServe server implementation
    used for model serving.

    Args:
        model_uri: the URI of the model artifact being served
        output_artifact_uri: the URI of the output artifact
        params: the KServe deployer step parameters

    Returns:
        The deployment config, updated with the URI of the prepared model.

    Raises:
        RuntimeError: if the model files cannot be prepared.
    """
    served_model_uri = os.path.join(output_artifact_uri, "kserve")
    fileio.makedirs(served_model_uri)

    # TODO [ENG-773]: determine how to formalize how models are organized into
    #   folders and sub-folders depending on the model type/format and the
    #   KServe protocol used to serve the model.

    # TODO [ENG-791]: an auto-detect built-in KServe server implementation
    #   from the model artifact type

    # TODO [ENG-792]: validate the model artifact type against the
    #   supported built-in KServe server implementations
    if params.service_config.predictor == "tensorflow":
        # the TensorFlow server expects model artifacts to be
        # stored in numbered subdirectories, each representing a model
        # version
        served_model_uri = os.path.join(
            served_model_uri,
            params.service_config.predictor,
            params.service_config.model_name,
        )
        fileio.makedirs(served_model_uri)
        io_utils.copy_dir(model_uri, os.path.join(served_model_uri, "1"))
    elif params.service_config.predictor == "sklearn":
        # the sklearn server expects model artifacts to be
        # stored in a file called model.joblib
        model_uri = os.path.join(model_uri, "model")
        if not fileio.exists(model_uri):
            raise RuntimeError(
                f"Expected sklearn model artifact was not found at "
                f"{model_uri}"
            )
        served_model_uri = os.path.join(
            served_model_uri,
            params.service_config.predictor,
            params.service_config.model_name,
        )
        fileio.makedirs(served_model_uri)
        fileio.copy(model_uri, os.path.join(served_model_uri, "model.joblib"))
    else:
        # default treatment for all other server implementations is to
        # simply reuse the model from the artifact store path where it
        # is originally stored
        served_model_uri = os.path.join(
            served_model_uri,
            params.service_config.predictor,
            params.service_config.model_name,
        )
        fileio.makedirs(served_model_uri)
        fileio.copy(model_uri, served_model_uri)

    service_config = params.service_config.copy()
    service_config.model_uri = served_model_uri
    return service_config
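The net effect is that the served model lands under <output_artifact_uri>/kserve/<predictor>/<model_name>, with a numbered version subdirectory for TensorFlow and a model.joblib file for sklearn. A hedged invocation sketch, with placeholder URIs and a params object whose service_config uses the sklearn predictor:

# `params` is assumed to be a KServeDeployerStepParameters whose
# service_config uses the "sklearn" predictor; the URIs are placeholders.
service_config = prepare_service_config(
    model_uri="s3://my-bucket/artifacts/trainer/output",
    output_artifact_uri="s3://my-bucket/artifacts/deployer/output",
    params=params,
)
# service_config.model_uri now ends with "kserve/sklearn/<model_name>"
# and contains the copied "model.joblib" file.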
prepare_torch_service_config(model_uri, output_artifact_uri, params)

Prepare the PyTorch model files for model serving.

This function ensures that the model files are in the correct format and file structure required by the KServe server implementation used for model serving.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| model_uri | str | the URI of the model artifact being served | required |
| output_artifact_uri | str | the URI of the output artifact | required |
| params | KServeDeployerStepParameters | the KServe deployer step parameters | required |

Returns:

| Type | Description |
| --- | --- |
| KServeDeploymentConfig | The deployment config, updated with the URI of the prepared model. |

Exceptions:

| Type | Description |
| --- | --- |
| RuntimeError | if the model files cannot be prepared. |

Source code in zenml/integrations/kserve/steps/kserve_step_utils.py
def prepare_torch_service_config(
    model_uri: str,
    output_artifact_uri: str,
    params: KServeDeployerStepParameters,
) -> KServeDeploymentConfig:
    """Prepare the PyTorch model files for model serving.

    This function ensures that the model files are in the correct format
    and file structure required by the KServe server implementation
    used for model serving.

    Args:
        model_uri: the URI of the model artifact being served
        output_artifact_uri: the URI of the output artifact
        params: the KServe deployer step parameters

    Returns:
        The deployment config, updated with the URI of the prepared model.

    Raises:
        RuntimeError: if the model files cannot be prepared.
    """
    deployment_folder_uri = os.path.join(output_artifact_uri, "kserve")
    served_model_uri = os.path.join(deployment_folder_uri, "model-store")
    config_properties_uri = os.path.join(deployment_folder_uri, "config")
    fileio.makedirs(served_model_uri)
    fileio.makedirs(config_properties_uri)

    if params.torch_serve_parameters is None:
        raise RuntimeError("No torch serve parameters provided")
    else:
        # Create a temporary folder
        temp_dir = tempfile.mkdtemp(prefix="zenml-pytorch-temp-")
        tmp_model_uri = os.path.join(
            str(temp_dir), f"{params.service_config.model_name}.pt"
        )

        # Copy from artifact store to temporary file
        fileio.copy(f"{model_uri}/checkpoint.pt", tmp_model_uri)

        torch_archiver_args = TorchModelArchiver(
            model_name=params.service_config.model_name,
            serialized_file=tmp_model_uri,
            model_file=params.torch_serve_parameters.model_class,
            handler=params.torch_serve_parameters.handler,
            export_path=temp_dir,
            version=params.torch_serve_parameters.model_version,
        )

        manifest = ModelExportUtils.generate_manifest_json(torch_archiver_args)
        package_model(torch_archiver_args, manifest=manifest)

        # Copy from temporary file to artifact store
        archived_model_uri = os.path.join(
            temp_dir, f"{params.service_config.model_name}.mar"
        )
        if not fileio.exists(archived_model_uri):
            raise RuntimeError(
                f"Expected torch archived model artifact was not found at "
                f"{archived_model_uri}"
            )

        # Copy the torch model archive artifact to the model store
        fileio.copy(
            archived_model_uri,
            os.path.join(
                served_model_uri, f"{params.service_config.model_name}.mar"
            ),
        )

        # Get or Generate the config file
        if params.torch_serve_parameters.torch_config:
            # Copy the torch model config to the model store
            fileio.copy(
                params.torch_serve_parameters.torch_config,
                os.path.join(config_properties_uri, "config.properties"),
            )
        else:
            # Generate the config file
            config_file_uri = generate_model_deployer_config(
                model_name=params.service_config.model_name,
                directory=temp_dir,
            )
            # Copy the torch model config to the model store
            fileio.copy(
                config_file_uri,
                os.path.join(config_properties_uri, "config.properties"),
            )

    service_config = params.service_config.copy()
    service_config.model_uri = deployment_folder_uri
    return service_config
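For a model named mnist, the resulting deployment folder holds model-store/mnist.mar and config/config.properties, the layout TorchServe expects once KServe mounts the storage URI. A hedged invocation sketch, with placeholder URIs and a params object carrying torch_serve_parameters like the one sketched earlier; note that the model artifact is expected to contain a checkpoint.pt file:

# `params` is assumed to be a KServeDeployerStepParameters with
# torch_serve_parameters set; the URIs are placeholders and the model
# artifact at model_uri must contain a "checkpoint.pt" file.
service_config = prepare_torch_service_config(
    model_uri="s3://my-bucket/artifacts/trainer/output",
    output_artifact_uri="s3://my-bucket/artifacts/deployer/output",
    params=params,
)
# service_config.model_uri points at ".../deployer/output/kserve", which
# contains "model-store/<model_name>.mar" and "config/config.properties".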