Skip to content

Bentoml

zenml.integrations.bentoml special

Initialization of the BentoML integration for ZenML.

The BentoML integration lets you use BentoML model serving to implement continuous model deployment.

BentoMLIntegration (Integration)

Definition of BentoML integration for ZenML.

Source code in zenml/integrations/bentoml/__init__.py
class BentoMLIntegration(Integration):
    """Definition of BentoML integration for ZenML.

    Declares the integration name, its requirement specifiers and the stack
    component flavors it contributes.
    """

    NAME = BENTOML
    # Requirement specifiers for this integration.
    REQUIREMENTS = [
        "bentoml>=1.0.10",
    ]

    @classmethod
    def activate(cls) -> None:
        """Activate the BentoML integration.

        Imports the integration's submodules. The imported names are not used
        here (hence the `noqa` markers); the imports exist only for whatever
        the modules do at import time.
        """
        from zenml.integrations.bentoml import materializers  # noqa
        from zenml.integrations.bentoml import model_deployers  # noqa
        from zenml.integrations.bentoml import services  # noqa
        from zenml.integrations.bentoml import steps  # noqa

    @classmethod
    def flavors(cls) -> List[Type[Flavor]]:
        """Declare the stack component flavors for BentoML.

        Returns:
            List of stack component flavors for this integration.
        """
        # Imported lazily, inside the method, rather than at module level.
        from zenml.integrations.bentoml.flavors import (
            BentoMLModelDeployerFlavor,
        )

        return [BentoMLModelDeployerFlavor]

activate() classmethod

Activate the BentoML integration.

Source code in zenml/integrations/bentoml/__init__.py
@classmethod
def activate(cls) -> None:
    """Activate the BentoML integration.

    Imports the integration's submodules. The imported names are not used
    here (hence the `noqa` markers); the imports exist only for whatever
    the modules do at import time.
    """
    from zenml.integrations.bentoml import materializers  # noqa
    from zenml.integrations.bentoml import model_deployers  # noqa
    from zenml.integrations.bentoml import services  # noqa
    from zenml.integrations.bentoml import steps  # noqa

flavors() classmethod

Declare the stack component flavors for BentoML.

Returns:

Type Description
List[Type[zenml.stack.flavor.Flavor]]

List of stack component flavors for this integration.

Source code in zenml/integrations/bentoml/__init__.py
@classmethod
def flavors(cls) -> List[Type[Flavor]]:
    """Declare the stack component flavors for BentoML.

    Returns:
        List of stack component flavors for this integration.
    """
    # Imported lazily, inside the method, rather than at module level.
    from zenml.integrations.bentoml.flavors import (
        BentoMLModelDeployerFlavor,
    )

    return [BentoMLModelDeployerFlavor]

constants

BentoML constants.

flavors special

BentoML integration flavors.

bentoml_model_deployer_flavor

BentoML model deployer flavor.

BentoMLModelDeployerConfig (BaseModelDeployerConfig) pydantic-model

Configuration for the BentoMLModelDeployer.

Source code in zenml/integrations/bentoml/flavors/bentoml_model_deployer_flavor.py
class BentoMLModelDeployerConfig(BaseModelDeployerConfig):
    """Configuration for the BentoMLModelDeployer."""

    # Optional root directory for local service files; the empty default
    # means "not set by the user".
    service_path: str = ""
BentoMLModelDeployerFlavor (BaseModelDeployerFlavor)

Flavor for the BentoML model deployer.

Source code in zenml/integrations/bentoml/flavors/bentoml_model_deployer_flavor.py
class BentoMLModelDeployerFlavor(BaseModelDeployerFlavor):
    """Flavor for the BentoML model deployer.

    Ties together the flavor name, its configuration class and the class
    that implements the model deployer.
    """

    @property
    def name(self) -> str:
        """Name of the flavor.

        Returns:
            Name of the flavor.
        """
        return BENTOML_MODEL_DEPLOYER_FLAVOR

    @property
    def config_class(self) -> Type[BentoMLModelDeployerConfig]:
        """Returns `BentoMLModelDeployerConfig` config class.

        Returns:
            The config class.
        """
        return BentoMLModelDeployerConfig

    @property
    def implementation_class(self) -> Type["BentoMLModelDeployer"]:
        """Implementation class for this flavor.

        Returns:
            The implementation class.
        """
        # Imported lazily, inside the property, rather than at module level.
        from zenml.integrations.bentoml.model_deployers import (
            BentoMLModelDeployer,
        )

        return BentoMLModelDeployer
config_class: Type[zenml.integrations.bentoml.flavors.bentoml_model_deployer_flavor.BentoMLModelDeployerConfig] property readonly

Returns BentoMLModelDeployerConfig config class.

Returns:

Type Description
Type[zenml.integrations.bentoml.flavors.bentoml_model_deployer_flavor.BentoMLModelDeployerConfig]

The config class.

implementation_class: Type[BentoMLModelDeployer] property readonly

Implementation class for this flavor.

Returns:

Type Description
Type[BentoMLModelDeployer]

The implementation class.

name: str property readonly

Name of the flavor.

Returns:

Type Description
str

Name of the flavor.

materializers special

Initialization of the BentoML Bento Materializer.

bentoml_bento_materializer

Materializer for BentoML Bento objects.

BentoMaterializer (BaseMaterializer)

Materializer for BentoML Bento objects.

Source code in zenml/integrations/bentoml/materializers/bentoml_bento_materializer.py
class BentoMaterializer(BaseMaterializer):
    """Materializer for BentoML Bento objects.

    A Bento is exported to a single file named `DEFAULT_BENTO_FILENAME`
    inside the artifact URI when written, and imported back from that file
    when read. Both directions stage the file in a temporary local directory.
    """

    ASSOCIATED_TYPES = (bento.Bento,)
    ASSOCIATED_ARTIFACT_TYPES = (DataArtifact,)

    def handle_input(self, data_type: Type[bento.Bento]) -> bento.Bento:
        """Read from artifact store and return a Bento object.

        Args:
            data_type: A bento.Bento type.

        Returns:
            A bento.Bento object.
        """
        super().handle_input(data_type)

        # The context manager removes the scratch directory even when the
        # copy or the import raises.
        with tempfile.TemporaryDirectory() as temp_dir:
            # Copy from artifact store to the scratch directory
            io_utils.copy_dir(self.artifact.uri, temp_dir)

            # Load the Bento from the scratch directory
            imported_bento = Bento.import_from(
                os.path.join(temp_dir, DEFAULT_BENTO_FILENAME)
            )

            # Save the Bento to the local BentoML store only when looking up
            # its tag fails with a BentoMLException.
            try:
                _ = bentoml.get(imported_bento.tag)
            except BentoMLException:
                imported_bento.save()
        return imported_bento

    def handle_return(self, bento: bento.Bento) -> None:
        """Write to artifact store.

        Args:
            bento: A bento.Bento object.
        """
        super().handle_return(bento)

        # The context manager removes the scratch directory on exit, including
        # when the export or the copy raises.
        with tempfile.TemporaryDirectory(prefix="zenml-temp-") as temp_dir:
            temp_bento_path = os.path.join(temp_dir, DEFAULT_BENTO_FILENAME)

            # Export the Bento, identified by its tag, into the scratch
            # directory
            bentoml.export_bento(bento.tag, temp_bento_path)

            # Copy the exported file to the artifact store
            io_utils.copy_dir(temp_dir, self.artifact.uri)
handle_input(self, data_type)

Read from artifact store and return a Bento object.

Parameters:

Name Type Description Default
data_type Type[bentoml._internal.bento.bento.Bento]

A bento.Bento type.

required

Returns:

Type Description
Bento

A bento.Bento object.

Source code in zenml/integrations/bentoml/materializers/bentoml_bento_materializer.py
def handle_input(self, data_type: Type[bento.Bento]) -> bento.Bento:
    """Read from artifact store and return a Bento object.

    Args:
        data_type: A bento.Bento type.

    Returns:
        A bento.Bento object.
    """
    super().handle_input(data_type)

    # The context manager removes the scratch directory even when the copy
    # or the import raises.
    with tempfile.TemporaryDirectory() as temp_dir:
        # Copy from artifact store to the scratch directory
        io_utils.copy_dir(self.artifact.uri, temp_dir)

        # Load the Bento from the scratch directory
        imported_bento = Bento.import_from(
            os.path.join(temp_dir, DEFAULT_BENTO_FILENAME)
        )

        # Save the Bento to the local BentoML store only when looking up its
        # tag fails with a BentoMLException.
        try:
            _ = bentoml.get(imported_bento.tag)
        except BentoMLException:
            imported_bento.save()
    return imported_bento
handle_return(self, bento)

Write to artifact store.

Parameters:

Name Type Description Default
bento Bento

A bento.Bento object.

required
Source code in zenml/integrations/bentoml/materializers/bentoml_bento_materializer.py
def handle_return(self, bento: bento.Bento) -> None:
    """Write to artifact store.

    Args:
        bento: A bento.Bento object.
    """
    super().handle_return(bento)

    # The context manager removes the scratch directory on exit, including
    # when the export or the copy raises.
    with tempfile.TemporaryDirectory(prefix="zenml-temp-") as temp_dir:
        temp_bento_path = os.path.join(temp_dir, DEFAULT_BENTO_FILENAME)

        # Export the Bento, identified by its tag, into the scratch directory
        bentoml.export_bento(bento.tag, temp_bento_path)

        # Copy the exported file to the artifact store
        io_utils.copy_dir(temp_dir, self.artifact.uri)

model_deployers special

Initialization of the BentoML Model Deployer.

bentoml_model_deployer

Implementation of the BentoML Model Deployer.

BentoMLModelDeployer (BaseModelDeployer)

BentoML model deployer stack component implementation.

Source code in zenml/integrations/bentoml/model_deployers/bentoml_model_deployer.py
class BentoMLModelDeployer(BaseModelDeployer):
    """BentoML model deployer stack component implementation.

    Manages BentoML deployment services whose configuration files live under
    a local root directory (see `local_path`). Services are discovered by
    walking that directory for service daemon configuration files.
    """

    NAME: ClassVar[str] = "BentoML"
    FLAVOR: ClassVar[Type[BaseModelDeployerFlavor]] = BentoMLModelDeployerFlavor

    # Cached result of `local_path`; None until the property is first read.
    _service_path: Optional[str] = None

    @property
    def config(self) -> BentoMLModelDeployerConfig:
        """Returns the `BentoMLModelDeployerConfig` config.

        Returns:
            The configuration.
        """
        return cast(BentoMLModelDeployerConfig, self._config)

    @staticmethod
    def get_service_path(id_: UUID) -> str:
        """Get the path where local BentoML service information is stored.

        This is where the deployment service configuration, PID and log
        files are stored. The directory is created if it does not exist.

        Args:
            id_: The ID of the BentoML model deployer.

        Returns:
            The service path.
        """
        service_path = os.path.join(
            GlobalConfiguration().local_stores_path,
            str(id_),
        )
        create_dir_recursive_if_not_exists(service_path)
        return service_path

    @property
    def local_path(self) -> str:
        """Returns the path to the root directory.

        This is where all configurations for BentoML deployment daemon processes
        are stored.

        If the service path is not set in the config by the user, the path is
        set to a local default path according to the component ID. The
        resolved path is cached on the instance after the first access.

        Returns:
            The path to the local service root directory.
        """
        # Fast path: already resolved on a previous access.
        if self._service_path is not None:
            return self._service_path

        # An empty `service_path` in the config means "not configured".
        if self.config.service_path:
            self._service_path = self.config.service_path
        else:
            self._service_path = self.get_service_path(self.id)

        create_dir_recursive_if_not_exists(self._service_path)
        return self._service_path

    @staticmethod
    def get_model_server_info(  # type: ignore[override]
        service_instance: "BentoMLDeploymentService",
    ) -> Dict[str, Optional[str]]:
        """Return implementation specific information on the model server.

        Args:
            service_instance: BentoML deployment service object

        Returns:
            A dictionary containing the model server information.
        """
        # Join the non-None prediction API URLs into one comma-separated
        # string; stays empty when the service reports no URLs.
        predictions_apis_urls = ""
        if service_instance.prediction_apis_urls is not None:
            predictions_apis_urls = ", ".join(
                [
                    api
                    for api in service_instance.prediction_apis_urls
                    if api is not None
                ]
            )

        # NOTE(review): the "PREDICITON_APIS_URLS" key is misspelled; renaming
        # it would change the returned dictionary, so check consumers first.
        return {
            "PREDICTION_URL": service_instance.prediction_url,
            "BENTO_TAG": service_instance.config.bento,
            "MODEL_NAME": service_instance.config.model_name,
            "MODEL_URI": service_instance.config.model_uri,
            "BENTO_URI": service_instance.config.bento_uri,
            "SERVICE_PATH": service_instance.status.runtime_path,
            "DAEMON_PID": str(service_instance.status.pid),
            "PREDICITON_APIS_URLS": predictions_apis_urls,
        }

    def deploy_model(
        self,
        config: ServiceConfig,
        replace: bool = False,
        timeout: int = DEFAULT_SERVICE_START_STOP_TIMEOUT,
    ) -> BaseService:
        """Create a new BentoML deployment service or update an existing one.

        This should serve the supplied model and deployment configuration.

        This method has two modes of operation, depending on the `replace`
        argument value:

          * if `replace` is False, calling this method will create a new BentoML
            deployment server to reflect the model and other configuration
            parameters specified in the supplied BentoML service `config`.

          * if `replace` is True, this method will first attempt to find an
            existing BentoML deployment service that is *equivalent* to the
            supplied configuration parameters. Two or more BentoML deployment
            services are considered equivalent if they have the same
            `pipeline_name`, `pipeline_step_name` and `model_name` configuration
            parameters. To put it differently, two BentoML deployment services
            are equivalent if they serve versions of the same model deployed by
            the same pipeline step. If an equivalent BentoML deployment is found,
            it will be updated in place to reflect the new configuration
            parameters.

        Callers should set `replace` to True if they want a continuous model
        deployment workflow that doesn't spin up a new BentoML deployment
        server for each new model version. If multiple equivalent BentoML
        deployment servers are found, the first one returned by
        `find_model_server` is the one that gets updated.

        Args:
            config: the configuration of the model to be deployed with BentoML.
            replace: set this flag to True to find and update an equivalent
                BentoML deployment server with the new model instead of
                creating and starting a new deployment server.
            timeout: the timeout in seconds to wait for the BentoML server
                to be provisioned and successfully started or updated. If set
                to 0, the method will return immediately after the BentoML
                server is provisioned, without waiting for it to fully start.

        Returns:
            The ZenML BentoML deployment service object that can be used to
            interact with the BentoML model http server.
        """
        config = cast(BentoMLDeploymentConfig, config)
        service = None

        # if replace is True, clean up all existing equivalent services
        if replace is True:
            existing_services = self.find_model_server(
                pipeline_name=config.pipeline_name,
                pipeline_step_name=config.pipeline_step_name,
                model_name=config.model_name,
            )

            for existing_service in existing_services:
                if service is None:
                    # keep the first matching service so it can be updated
                    # below
                    service = cast(BentoMLDeploymentService, existing_service)
                try:
                    # NOTE(review): cleanup runs for every match, including
                    # the one kept above (there is no `continue`), so its
                    # runtime path is removed before it is updated and
                    # restarted below - confirm this is intended.
                    self._clean_up_existing_service(
                        existing_service=cast(
                            BentoMLDeploymentService, existing_service
                        ),
                        timeout=timeout,
                        force=True,
                    )
                except RuntimeError:
                    # ignore errors encountered while stopping old services
                    pass
        if service:
            logger.info(
                f"Updating an existing BentoML deployment service: {service}"
            )

            # point the config at this deployer's local root directory
            config.root_runtime_path = self.local_path
            # stop, apply the new configuration, then start again
            service.stop(timeout=timeout, force=True)
            service.update(config)
            service.start(timeout=timeout)
        else:
            # create a new BentoMLDeploymentService instance
            service = self._create_new_service(timeout, config)
            logger.info(f"Created a new BentoML deployment service: {service}")

        return cast(BaseService, service)

    def _clean_up_existing_service(
        self,
        timeout: int,
        force: bool,
        existing_service: BentoMLDeploymentService,
    ) -> None:
        """Stop a service and remove its runtime directory.

        Args:
            timeout: Timeout in seconds passed to the service's `stop` call.
            force: Passed through to the service's `stop` call.
            existing_service: The service to stop and clean up.
        """
        # stop the older service
        existing_service.stop(timeout=timeout, force=force)

        # delete the service's runtime directory (skipped when the status
        # reports no runtime path)
        if existing_service.status.runtime_path:
            shutil.rmtree(existing_service.status.runtime_path)

    # the step will receive a config from the user that mentions the number
    # of workers etc. The step implementation will create a new config using
    # all values from the user and add values like pipeline name, model_uri
    def _create_new_service(
        self, timeout: int, config: BentoMLDeploymentConfig
    ) -> BentoMLDeploymentService:
        """Creates a new BentoMLDeploymentService.

        Args:
            timeout: the timeout in seconds to wait for the BentoML http server
                to be provisioned and successfully started or updated.
            config: the configuration of the model to be deployed with BentoML.

        Returns:
            The BentoMLDeploymentService object that can be used to interact
            with the BentoML model server.
        """
        # point the config at this deployer's local root directory
        config.root_runtime_path = self.local_path
        # create a new service for the new model
        service = BentoMLDeploymentService(config)
        service.start(timeout=timeout)

        return service

    def find_model_server(
        self,
        running: bool = False,
        service_uuid: Optional[UUID] = None,
        pipeline_name: Optional[str] = None,
        pipeline_run_id: Optional[str] = None,
        pipeline_step_name: Optional[str] = None,
        model_name: Optional[str] = None,
        model_uri: Optional[str] = None,
        model_type: Optional[str] = None,
    ) -> List[BaseService]:
        """Finds one or more model servers that match the given criteria.

        Walks `local_path`, loads every service daemon configuration file
        found and keeps the services that match the criteria.

        Args:
            running: If true, only running services will be returned.
            service_uuid: The UUID of the service that was originally used
                to deploy the model. Only directories whose name equals this
                UUID are inspected.
            pipeline_name: Name of the pipeline that the deployed model was part
                of.
            pipeline_run_id: ID of the pipeline run which the deployed model
                was part of.
            pipeline_step_name: The name of the pipeline model deployment step
                that deployed the model.
            model_name: Name of the deployed model.
            model_uri: URI of the deployed model. It is stored in the search
                config, but `_matches_search_criteria` does not compare it,
                so it currently has no filtering effect.
            model_type: Type/format of the deployed model. Not used in this
                BentoML case.

        Returns:
            One or more Service objects representing model servers that match
            the input search criteria.

        Raises:
            TypeError: if a service loaded from a configuration file is not
                a `BentoMLDeploymentService`.
        """
        services = []
        # Search criteria packed into a config object; unset criteria become
        # empty strings, which the matcher treats as "do not filter".
        config = BentoMLDeploymentConfig(
            model_name=model_name or "",
            bento="",
            port=BENTOML_DEFAULT_PORT,
            model_uri=model_uri or "",
            working_dir="",
            pipeline_name=pipeline_name or "",
            pipeline_run_id=pipeline_run_id or "",
            pipeline_step_name=pipeline_step_name or "",
        )

        # find all services that match the input criteria
        for root, _, files in os.walk(self.local_path):
            # when a UUID is given, only look inside the directory named
            # after it
            if service_uuid and Path(root).name != str(service_uuid):
                continue
            for file in files:
                if file == SERVICE_DAEMON_CONFIG_FILE_NAME:
                    service_config_path = os.path.join(root, file)
                    logger.debug(
                        "Loading service daemon configuration from %s",
                        service_config_path,
                    )
                    existing_service_config = None
                    with open(service_config_path, "r") as f:
                        existing_service_config = f.read()
                    existing_service = ServiceRegistry().load_service_from_json(
                        existing_service_config
                    )
                    if not isinstance(
                        existing_service, BentoMLDeploymentService
                    ):
                        raise TypeError(
                            f"Expected service type BentoMLDeploymentService but got "
                            f"{type(existing_service)} instead"
                        )
                    # refresh the status before `is_running` is consulted
                    existing_service.update_status()
                    if self._matches_search_criteria(existing_service, config):
                        if not running or existing_service.is_running:
                            services.append(cast(BaseService, existing_service))

        return services

    def _matches_search_criteria(
        self,
        existing_service: BentoMLDeploymentService,
        config: BentoMLDeploymentConfig,
    ) -> bool:
        """Returns true if a service matches the input criteria.

        Only `pipeline_name`, `model_name`, `pipeline_step_name` and
        `pipeline_run_id` are compared. Any of these that is empty (falsy)
        in the input criteria is ignored. This allows listing services just
        by common pipeline names or step names, etc.

        Args:
            existing_service: The materialized Service instance derived from
                the config of the older (existing) service
            config: The BentoMLDeploymentConfig object holding the search
                criteria to compare against the existing service's config.

        Returns:
            True if the service matches the input criteria.
        """
        existing_service_config = existing_service.config

        # each criterion passes when it is unset or equal to the existing
        # service's value; all four must pass
        if (
            (
                not config.pipeline_name
                or existing_service_config.pipeline_name == config.pipeline_name
            )
            and (
                not config.model_name
                or existing_service_config.model_name == config.model_name
            )
            and (
                not config.pipeline_step_name
                or existing_service_config.pipeline_step_name
                == config.pipeline_step_name
            )
            and (
                not config.pipeline_run_id
                or existing_service_config.pipeline_run_id
                == config.pipeline_run_id
            )
        ):
            return True

        return False

    def stop_model_server(
        self,
        uuid: UUID,
        timeout: int = DEFAULT_SERVICE_START_STOP_TIMEOUT,
        force: bool = False,
    ) -> None:
        """Method to stop a model server.

        Does nothing when no service with the given UUID is found.

        Args:
            uuid: UUID of the model server to stop.
            timeout: Timeout in seconds to wait for the service to stop.
            force: If True, force the service to stop.
        """
        # look up the services stored under this UUID
        existing_services = self.find_model_server(service_uuid=uuid)

        # if the service exists, stop it
        if existing_services:
            existing_services[0].stop(timeout=timeout, force=force)

    def start_model_server(
        self, uuid: UUID, timeout: int = DEFAULT_SERVICE_START_STOP_TIMEOUT
    ) -> None:
        """Method to start a model server.

        Does nothing when no service with the given UUID is found.

        Args:
            uuid: UUID of the model server to start.
            timeout: Timeout in seconds to wait for the service to start.
        """
        # look up the services stored under this UUID
        existing_services = self.find_model_server(service_uuid=uuid)

        # if the service exists, start it
        if existing_services:
            existing_services[0].start(timeout=timeout)

    def delete_model_server(
        self,
        uuid: UUID,
        timeout: int = DEFAULT_SERVICE_START_STOP_TIMEOUT,
        force: bool = False,
    ) -> None:
        """Method to delete all configuration of a model server.

        Does nothing when no service with the given UUID is found.

        Args:
            uuid: UUID of the model server to delete.
            timeout: Timeout in seconds to wait for the service to stop.
            force: If True, force the service to stop.
        """
        # look up the services stored under this UUID
        existing_services = self.find_model_server(service_uuid=uuid)

        # if the service exists, clean it up
        if existing_services:
            service = cast(BentoMLDeploymentService, existing_services[0])
            self._clean_up_existing_service(
                existing_service=service, timeout=timeout, force=force
            )
config: BentoMLModelDeployerConfig property readonly

Returns the BentoMLModelDeployerConfig config.

Returns:

Type Description
BentoMLModelDeployerConfig

The configuration.

local_path: str property readonly

Returns the path to the root directory.

This is where all configurations for BentoML deployment daemon processes are stored.

If the service path is not set in the config by the user, the path is set to a local default path according to the component ID.

Returns:

Type Description
str

The path to the local service root directory.

FLAVOR (BaseModelDeployerFlavor)

Flavor for the BentoML model deployer.

Source code in zenml/integrations/bentoml/model_deployers/bentoml_model_deployer.py
class BentoMLModelDeployerFlavor(BaseModelDeployerFlavor):
    """Flavor for the BentoML model deployer.

    Ties together the flavor name, its configuration class and the class
    that implements the model deployer.
    """

    @property
    def name(self) -> str:
        """Name of the flavor.

        Returns:
            Name of the flavor.
        """
        return BENTOML_MODEL_DEPLOYER_FLAVOR

    @property
    def config_class(self) -> Type[BentoMLModelDeployerConfig]:
        """Returns `BentoMLModelDeployerConfig` config class.

        Returns:
            The config class.
        """
        return BentoMLModelDeployerConfig

    @property
    def implementation_class(self) -> Type["BentoMLModelDeployer"]:
        """Implementation class for this flavor.

        Returns:
            The implementation class.
        """
        # Imported lazily, inside the property, rather than at module level.
        from zenml.integrations.bentoml.model_deployers import (
            BentoMLModelDeployer,
        )

        return BentoMLModelDeployer
config_class: Type[zenml.integrations.bentoml.flavors.bentoml_model_deployer_flavor.BentoMLModelDeployerConfig] property readonly

Returns BentoMLModelDeployerConfig config class.

Returns:

Type Description
Type[zenml.integrations.bentoml.flavors.bentoml_model_deployer_flavor.BentoMLModelDeployerConfig]

The config class.

implementation_class: Type[BentoMLModelDeployer] property readonly

Implementation class for this flavor.

Returns:

Type Description
Type[BentoMLModelDeployer]

The implementation class.

name: str property readonly

Name of the flavor.

Returns:

Type Description
str

Name of the flavor.

delete_model_server(self, uuid, timeout=10, force=False)

Method to delete all configuration of a model server.

Parameters:

Name Type Description Default
uuid UUID

UUID of the model server to delete.

required
timeout int

Timeout in seconds to wait for the service to stop.

10
force bool

If True, force the service to stop.

False
Source code in zenml/integrations/bentoml/model_deployers/bentoml_model_deployer.py
def delete_model_server(
    self,
    uuid: UUID,
    timeout: int = DEFAULT_SERVICE_START_STOP_TIMEOUT,
    force: bool = False,
) -> None:
    """Stop a model server and remove its stored configuration.

    Nothing happens when no service is found for the given UUID.

    Args:
        uuid: UUID of the model server to delete.
        timeout: Timeout in seconds to wait for the service to stop.
        force: If True, force the service to stop.
    """
    matches = self.find_model_server(service_uuid=uuid)

    # Guard clause: no service stored under this UUID, nothing to clean up.
    if not matches:
        return

    # Only the first match is cleaned up.
    self._clean_up_existing_service(
        existing_service=cast(BentoMLDeploymentService, matches[0]),
        timeout=timeout,
        force=force,
    )
deploy_model(self, config, replace=False, timeout=10)

Create a new BentoML deployment service or update an existing one.

This should serve the supplied model and deployment configuration.

This method has two modes of operation, depending on the replace argument value:

  • if replace is False, calling this method will create a new BentoML deployment server to reflect the model and other configuration parameters specified in the supplied BentoML service config.

  • if replace is True, this method will first attempt to find an existing BentoML deployment service that is equivalent to the supplied configuration parameters. Two or more BentoML deployment services are considered equivalent if they have the same pipeline_name, pipeline_step_name and model_name configuration parameters. To put it differently, two BentoML deployment services are equivalent if they serve versions of the same model deployed by the same pipeline step. If an equivalent BentoML deployment is found, it will be updated in place to reflect the new configuration parameters.

Callers should set replace to True if they want a continuous model deployment workflow that doesn't spin up a new BentoML deployment server for each new model version. If multiple equivalent BentoML deployment servers are found, one is selected at random to be updated and the others are deleted.

Parameters:

Name Type Description Default
config ServiceConfig

the configuration of the model to be deployed with BentoML.

required
replace bool

set this flag to True to find and update an equivalent BentoML deployment server with the new model instead of creating and starting a new deployment server.

False
timeout int

the timeout in seconds to wait for the BentoML server to be provisioned and successfully started or updated. If set to 0, the method will return immediately after the BentoML server is provisioned, without waiting for it to fully start.

10

Returns:

Type Description
BaseService

The ZenML BentoML deployment service object that can be used to interact with the BentoML model http server.

Source code in zenml/integrations/bentoml/model_deployers/bentoml_model_deployer.py
def deploy_model(
    self,
    config: ServiceConfig,
    replace: bool = False,
    timeout: int = DEFAULT_SERVICE_START_STOP_TIMEOUT,
) -> BaseService:
    """Create a new BentoML deployment service or update an existing one.

    This should serve the supplied model and deployment configuration.

    This method has two modes of operation, depending on the `replace`
    argument value:

      * if `replace` is False, calling this method will create a new BentoML
        deployment server to reflect the model and other configuration
        parameters specified in the supplied BentoML service `config`.

      * if `replace` is True, this method will first attempt to find an
        existing BentoML deployment service that is *equivalent* to the
        supplied configuration parameters. Two or more BentoML deployment
        services are considered equivalent if they have the same
        `pipeline_name`, `pipeline_step_name` and `model_name` configuration
        parameters. To put it differently, two BentoML deployment services
        are equivalent if they serve versions of the same model deployed by
        the same pipeline step. If an equivalent BentoML deployment is found,
        it will be updated in place to reflect the new configuration
        parameters.

    Callers should set `replace` to True if they want a continuous model
    deployment workflow that doesn't spin up a new BentoML deployment
    server for each new model version. If multiple equivalent BentoML
    deployment servers are found, the first one returned by the search is
    updated and restarted, and all of them are handed to the clean-up
    routine.

    Args:
        config: the configuration of the model to be deployed with BentoML.
        replace: set this flag to True to find and update an equivalent
            BentoML deployment server with the new model instead of
            creating and starting a new deployment server.
        timeout: the timeout in seconds to wait for the BentoML server
            to be provisioned and successfully started or updated. If set
            to 0, the method will return immediately after the BentoML
            server is provisioned, without waiting for it to fully start.

    Returns:
        The ZenML BentoML deployment service object that can be used to
        interact with the BentoML model http server.
    """
    config = cast(BentoMLDeploymentConfig, config)
    service = None

    # if replace is True, look up every existing service that is equivalent
    # to the supplied configuration
    if replace is True:
        existing_services = self.find_model_server(
            pipeline_name=config.pipeline_name,
            pipeline_step_name=config.pipeline_step_name,
            model_name=config.model_name,
        )

        for existing_service in existing_services:
            if service is None:
                # keep the first matching service; it is updated and
                # restarted further below
                service = cast(BentoMLDeploymentService, existing_service)
            try:
                # NOTE(review): the service kept above is passed to the
                # clean-up as well, not only the remaining ones - confirm
                # that this is intended before changing it.
                self._clean_up_existing_service(
                    existing_service=cast(
                        BentoMLDeploymentService, existing_service
                    ),
                    timeout=timeout,
                    force=True,
                )
            except RuntimeError as e:
                # cleaning up old services is best effort: record the
                # failure for troubleshooting but carry on deploying
                logger.debug(
                    "Ignoring error encountered while cleaning up existing "
                    "BentoML deployment service %s: %s",
                    existing_service,
                    e,
                )
    if service:
        logger.info(
            f"Updating an existing BentoML deployment service: {service}"
        )

        # set the root runtime path with the stack component's UUID
        config.root_runtime_path = self.local_path
        service.stop(timeout=timeout, force=True)
        service.update(config)
        service.start(timeout=timeout)
    else:
        # create a new BentoMLDeploymentService instance
        service = self._create_new_service(timeout, config)
        logger.info(f"Created a new BentoML deployment service: {service}")

    return cast(BaseService, service)
find_model_server(self, running=False, service_uuid=None, pipeline_name=None, pipeline_run_id=None, pipeline_step_name=None, model_name=None, model_uri=None, model_type=None)

Finds one or more model servers that match the given criteria.

Parameters:

Name Type Description Default
running bool

If true, only running services will be returned.

False
service_uuid Optional[uuid.UUID]

The UUID of the service that was originally used to deploy the model.

None
pipeline_name Optional[str]

Name of the pipeline that the deployed model was part of.

None
pipeline_run_id Optional[str]

ID of the pipeline run which the deployed model was part of.

None
pipeline_step_name Optional[str]

The name of the pipeline model deployment step that deployed the model.

None
model_name Optional[str]

Name of the deployed model.

None
model_uri Optional[str]

URI of the deployed model.

None
model_type Optional[str]

Type/format of the deployed model. Not used in this BentoML case.

None

Returns:

Type Description
List[zenml.services.service.BaseService]

One or more Service objects representing model servers that match the input search criteria.

Exceptions:

Type Description
TypeError

if any of the input arguments are of an invalid type.

Source code in zenml/integrations/bentoml/model_deployers/bentoml_model_deployer.py
def find_model_server(
    self,
    running: bool = False,
    service_uuid: Optional[UUID] = None,
    pipeline_name: Optional[str] = None,
    pipeline_run_id: Optional[str] = None,
    pipeline_step_name: Optional[str] = None,
    model_name: Optional[str] = None,
    model_uri: Optional[str] = None,
    model_type: Optional[str] = None,
) -> List[BaseService]:
    """Collect the locally stored model servers matching the given filters.

    The directory tree under `self.local_path` is scanned for service daemon
    configuration files. Every file found is loaded back into a service
    object, its status is refreshed, and it is kept when it satisfies the
    search criteria.

    Args:
        running: If true, only running services will be returned.
        service_uuid: The UUID of the service that was originally used
            to deploy the model. When set, only directories named after
            this UUID are inspected.
        pipeline_name: Name of the pipeline that the deployed model was part
            of.
        pipeline_run_id: ID of the pipeline run which the deployed model
            was part of.
        pipeline_step_name: The name of the pipeline model deployment step
            that deployed the model.
        model_name: Name of the deployed model.
        model_uri: URI of the deployed model.
        model_type: Type/format of the deployed model. Not used in this
            BentoML case.

    Returns:
        The (possibly empty) list of Service objects representing model
        servers that match the input search criteria.

    Raises:
        TypeError: if a stored service configuration does not load as a
            BentoMLDeploymentService.
    """
    # unset filters are passed on as empty strings
    search_config = BentoMLDeploymentConfig(
        model_name=model_name or "",
        bento="",
        port=BENTOML_DEFAULT_PORT,
        model_uri=model_uri or "",
        working_dir="",
        pipeline_name=pipeline_name or "",
        pipeline_run_id=pipeline_run_id or "",
        pipeline_step_name=pipeline_step_name or "",
    )
    matches = []

    for directory, _, filenames in os.walk(self.local_path):
        # with a UUID filter, skip every directory not named after it
        if service_uuid and Path(directory).name != str(service_uuid):
            continue

        for filename in filenames:
            if filename != SERVICE_DAEMON_CONFIG_FILE_NAME:
                continue

            service_config_path = os.path.join(directory, filename)
            logger.debug(
                "Loading service daemon configuration from %s",
                service_config_path,
            )
            with open(service_config_path, "r") as f:
                serialized_service = f.read()

            candidate = ServiceRegistry().load_service_from_json(
                serialized_service
            )
            if not isinstance(candidate, BentoMLDeploymentService):
                raise TypeError(
                    f"Expected service type BentoMLDeploymentService but got "
                    f"{type(candidate)} instead"
                )

            candidate.update_status()
            if not self._matches_search_criteria(candidate, search_config):
                continue
            if running and not candidate.is_running:
                continue
            matches.append(cast(BaseService, candidate))

    return matches
get_model_server_info(service_instance) staticmethod

Return implementation specific information on the model server.

Parameters:

Name Type Description Default
service_instance BentoMLDeploymentService

BentoML deployment service object

required

Returns:

Type Description
Dict[str, Optional[str]]

A dictionary containing the model server information.

Source code in zenml/integrations/bentoml/model_deployers/bentoml_model_deployer.py
@staticmethod
def get_model_server_info(  # type: ignore[override]
    service_instance: "BentoMLDeploymentService",
) -> Dict[str, Optional[str]]:
    """Summarize a BentoML deployment service as a flat dictionary.

    Args:
        service_instance: BentoML deployment service object

    Returns:
        A dictionary containing the model server information: prediction
        URL, Bento tag, model name and URI, Bento URI, runtime path, daemon
        PID and the comma-separated prediction API URLs (an empty string
        when the service reports none).
    """
    api_urls = service_instance.prediction_apis_urls
    if api_urls is None:
        joined_api_urls = ""
    else:
        # entries that are None are left out of the joined string
        joined_api_urls = ", ".join(
            url for url in api_urls if url is not None
        )

    service_config = service_instance.config
    service_status = service_instance.status
    return {
        "PREDICTION_URL": service_instance.prediction_url,
        "BENTO_TAG": service_config.bento,
        "MODEL_NAME": service_config.model_name,
        "MODEL_URI": service_config.model_uri,
        "BENTO_URI": service_config.bento_uri,
        "SERVICE_PATH": service_status.runtime_path,
        "DAEMON_PID": str(service_status.pid),
        "PREDICITON_APIS_URLS": joined_api_urls,
    }
get_service_path(id_) staticmethod

Get the path where local BentoML service information is stored.

This is where the deployment service configuration, PID and log files are stored.

Parameters:

Name Type Description Default
id_ UUID

The ID of the BentoML model deployer.

required

Returns:

Type Description
str

The service path.

Source code in zenml/integrations/bentoml/model_deployers/bentoml_model_deployer.py
@staticmethod
def get_service_path(id_: UUID) -> str:
    """Return the local directory used for a deployer's BentoML services.

    This is where the deployment service configuration, PID and log files
    are stored. The directory is located under the global configuration's
    local stores path, is named after `id_`, and is passed to
    `create_dir_recursive_if_not_exists` before being returned.

    Args:
        id_: The ID of the BentoML model deployer.

    Returns:
        The service path.
    """
    stores_root = GlobalConfiguration().local_stores_path
    deployer_dir = os.path.join(stores_root, str(id_))
    create_dir_recursive_if_not_exists(deployer_dir)
    return deployer_dir
start_model_server(self, uuid, timeout=10)

Method to start a model server.

Parameters:

Name Type Description Default
uuid UUID

UUID of the model server to start.

required
timeout int

Timeout in seconds to wait for the service to start.

10
Source code in zenml/integrations/bentoml/model_deployers/bentoml_model_deployer.py
def start_model_server(
    self, uuid: UUID, timeout: int = DEFAULT_SERVICE_START_STOP_TIMEOUT
) -> None:
    """Start the model server with the given UUID, if it exists.

    Nothing happens when no service matches `uuid`.

    Args:
        uuid: UUID of the model server to start.
        timeout: Timeout in seconds to wait for the service to start.
    """
    matches = self.find_model_server(service_uuid=uuid)
    if not matches:
        return

    # only the first service found for this UUID is started
    matches[0].start(timeout=timeout)
stop_model_server(self, uuid, timeout=10, force=False)

Method to stop a model server.

Parameters:

Name Type Description Default
uuid UUID

UUID of the model server to stop.

required
timeout int

Timeout in seconds to wait for the service to stop.

10
force bool

If True, force the service to stop.

False
Source code in zenml/integrations/bentoml/model_deployers/bentoml_model_deployer.py
def stop_model_server(
    self,
    uuid: UUID,
    timeout: int = DEFAULT_SERVICE_START_STOP_TIMEOUT,
    force: bool = False,
) -> None:
    """Stop the model server with the given UUID, if it exists.

    Nothing happens when no service matches `uuid`.

    Args:
        uuid: UUID of the model server to stop.
        timeout: Timeout in seconds to wait for the service to stop.
        force: If True, force the service to stop.
    """
    matches = self.find_model_server(service_uuid=uuid)
    if not matches:
        return

    # only the first service found for this UUID is stopped
    matches[0].stop(timeout=timeout, force=force)

services special

Initialization for BentoML services.

bentoml_deployment

Implementation for the BentoML inference service.

BentoMLDeploymentConfig (LocalDaemonServiceConfig) pydantic-model

BentoML model deployment configuration.

Attributes:

Name Type Description
model_name str

name of the model to deploy

model_uri str

URI of the model to deploy

port Optional[int]

port to expose the service on

bento str

Bento package to deploy

workers Optional[int]

number of workers to use

backlog Optional[int]

number of requests to queue

production bool

whether to run in production mode

working_dir str

working directory for the service

host Optional[str]

host to expose the service on

ssl_parameters Optional[zenml.integrations.bentoml.services.bentoml_deployment.SSLBentoMLParametersConfig]

SSL parameters for the BentoML deployment

Source code in zenml/integrations/bentoml/services/bentoml_deployment.py
class BentoMLDeploymentConfig(LocalDaemonServiceConfig):
    """BentoML model deployment configuration.

    Attributes:
        model_name: name of the model to deploy
        model_uri: URI of the model to deploy
        port: port to expose the service on
        bento: Bento package to deploy
        bento_uri: optional URI recorded for the Bento package (reported as
            `BENTO_URI` in the model server info)
        apis: names of the service APIs; each one is appended to the
            prediction URL to build the per-API prediction URLs
        workers: number of workers to use
        backlog: number of requests to queue
        production: whether to run in production mode
        working_dir: working directory for the service
        host: host to expose the service on
        ssl_parameters: SSL parameters for the BentoML deployment
    """

    model_name: str
    model_uri: str
    bento: str
    bento_uri: Optional[str] = None
    apis: List[str] = []
    workers: Optional[int] = None
    port: Optional[int] = None
    backlog: Optional[int] = None
    production: bool = False
    working_dir: str
    host: Optional[str] = None
    # defaults to an SSL configuration with every parameter left unset
    ssl_parameters: Optional[SSLBentoMLParametersConfig] = Field(
        default_factory=SSLBentoMLParametersConfig
    )
BentoMLDeploymentEndpoint (LocalDaemonServiceEndpoint) pydantic-model

A service endpoint exposed by the BentoML deployment daemon.

Attributes:

Name Type Description
config BentoMLDeploymentEndpointConfig

service endpoint configuration

Source code in zenml/integrations/bentoml/services/bentoml_deployment.py
class BentoMLDeploymentEndpoint(LocalDaemonServiceEndpoint):
    """A service endpoint exposed by the BentoML deployment daemon.

    Attributes:
        config: service endpoint configuration
        monitor: HTTP health monitor attached to the endpoint
    """

    config: BentoMLDeploymentEndpointConfig
    monitor: HTTPEndpointHealthMonitor

    @property
    def prediction_url(self) -> Optional[str]:
        """Gets the prediction URL for the endpoint.

        Returns:
            the endpoint URI joined with the configured prediction URL
            sub-path, or None if the endpoint has no URI.
        """
        uri = self.status.uri
        if not uri:
            return None
        # URLs always use "/" as separator. os.path.join would insert the
        # OS-specific separator and would drop the base URI altogether if
        # the sub-path started with "/", so the two parts are joined
        # explicitly instead.
        url_path = self.config.prediction_url_path.lstrip("/")
        return f"{uri.rstrip('/')}/{url_path}"
prediction_url: Optional[str] property readonly

Gets the prediction URL for the endpoint.

Returns:

Type Description
Optional[str]

the prediction URL for the endpoint

BentoMLDeploymentEndpointConfig (LocalDaemonServiceEndpointConfig) pydantic-model

BentoML deployment service configuration.

Attributes:

Name Type Description
prediction_url_path str

URI subpath for prediction requests

Source code in zenml/integrations/bentoml/services/bentoml_deployment.py
class BentoMLDeploymentEndpointConfig(LocalDaemonServiceEndpointConfig):
    """BentoML deployment service endpoint configuration.

    Attributes:
        prediction_url_path: URI subpath for prediction requests; it is
            joined onto the endpoint URI to form the prediction URL
    """

    prediction_url_path: str
BentoMLDeploymentService (LocalDaemonService) pydantic-model

BentoML deployment service used to start a local prediction server for BentoML models.

Attributes:

Name Type Description
SERVICE_TYPE ClassVar[zenml.services.service_type.ServiceType]

a service type descriptor with information describing the BentoML deployment service class

config BentoMLDeploymentConfig

service configuration

endpoint BentoMLDeploymentEndpoint

optional service endpoint

Source code in zenml/integrations/bentoml/services/bentoml_deployment.py
class BentoMLDeploymentService(LocalDaemonService):
    """Local daemon service running a prediction server for a BentoML bento.

    Attributes:
        SERVICE_TYPE: a service type descriptor with information describing
            the BentoML deployment service class
        config: service configuration
        endpoint: service endpoint; built from the configuration when the
            caller does not supply one
    """

    SERVICE_TYPE = ServiceType(
        name="bentoml-deployment",
        type="model-serving",
        flavor="bentoml",
        description="BentoML prediction service",
    )

    config: BentoMLDeploymentConfig
    endpoint: BentoMLDeploymentEndpoint

    def __init__(
        self,
        config: Union[BentoMLDeploymentConfig, Dict[str, Any]],
        **attrs: Any,
    ) -> None:
        """Set up the service, creating its HTTP endpoint if none was given.

        Args:
            config: service configuration
            attrs: additional attributes to set on the service
        """
        # the endpoint has to exist before the base class is initialized
        # TODO [ENG-700]: implement a service factory or builder for BentoML
        #   deployment services
        needs_endpoint = (
            isinstance(config, BentoMLDeploymentConfig)
            and "endpoint" not in attrs
        )
        if needs_endpoint:
            endpoint_config = BentoMLDeploymentEndpointConfig(
                protocol=ServiceEndpointProtocol.HTTP,
                port=config.port,
                ip_address=config.host or DEFAULT_LOCAL_SERVICE_IP_ADDRESS,
                prediction_url_path=BENTOML_PREDICTION_URL_PATH,
            )
            health_monitor = HTTPEndpointHealthMonitor(
                config=HTTPEndpointHealthMonitorConfig(
                    healthcheck_uri_path=BENTOML_HEALTHCHECK_URL_PATH,
                )
            )
            attrs["endpoint"] = BentoMLDeploymentEndpoint(
                config=endpoint_config,
                monitor=health_monitor,
            )
        super().__init__(config=config, **attrs)

    def run(self) -> None:
        """Serve the configured bento, blocking until interrupted."""
        logger.info(
            "Starting BentoML prediction service as blocking "
            "process... press CTRL+C once to stop it."
        )

        self.endpoint.prepare_for_start()
        ssl_params = self.config.ssl_parameters or SSLBentoMLParametersConfig()

        # choose the serving function and the arguments that are only
        # passed in that mode
        mode_kwargs: Dict[str, Any]
        if self.config.production:
            logger.info("Running in production mode.")
            from bentoml.serve import serve_http_production as serve_http

            mode_kwargs = {
                "api_workers": self.config.workers,
                "backlog": self.config.backlog,
            }
        else:
            logger.info("Running in development mode.")
            from bentoml.serve import serve_http_development as serve_http

            mode_kwargs = {}

        try:
            serve_http(
                self.config.bento,
                port=self.endpoint.status.port,
                host=self.endpoint.status.hostname,
                working_dir=self.config.working_dir,
                ssl_certfile=ssl_params.ssl_certfile,
                ssl_keyfile=ssl_params.ssl_keyfile,
                ssl_keyfile_password=ssl_params.ssl_keyfile_password,
                ssl_version=ssl_params.ssl_version,
                ssl_cert_reqs=ssl_params.ssl_cert_reqs,
                ssl_ca_certs=ssl_params.ssl_ca_certs,
                ssl_ciphers=ssl_params.ssl_ciphers,
                **mode_kwargs,
            )
        except KeyboardInterrupt:
            logger.info(
                "BentoML prediction service stopped. Resuming normal execution."
            )

    @property
    def prediction_url(self) -> Optional[str]:
        """Get the URI where the http server is running.

        Returns:
            The URI where the http service can be accessed to get more
            information about the service and to make predictions, or None
            if the service isn't running.
        """
        return self.endpoint.prediction_url if self.is_running else None

    @property
    def prediction_apis_urls(self) -> Optional[List[str]]:
        """Get the URIs where the prediction APIs are answering requests.

        Returns:
            One URI per configured API, where the prediction service can be
            contacted to process HTTP/REST inference requests, or None if
            the service isn't running or no APIs are configured.
        """
        if not self.is_running or not self.config.apis:
            return None

        base_url = self.endpoint.prediction_url
        return [f"{base_url}/{api}" for api in self.config.apis]

    def predict(self, api_endpoint: str, data: "Any") -> "Any":
        """Send data to one of the service's APIs and return the result.

        Args:
            api_endpoint: the api endpoint to make the prediction on
            data: data to make a prediction on

        Returns:
            The prediction result.

        Raises:
            Exception: if the service is not running
            ValueError: if the prediction endpoint is unknown.
        """
        if not self.is_running:
            raise Exception(
                "BentoML prediction service is not running. "
                "Please start the service before making predictions."
            )

        url = self.endpoint.prediction_url
        if url is None:
            raise ValueError("No endpoint known for prediction.")
        return Client.from_url(url).call(api_endpoint, data)
prediction_apis_urls: Optional[List[str]] property readonly

Get the URIs where the prediction API services are answering requests.

Returns:

Type Description
Optional[List[str]]

The URI where the prediction service apis can be contacted to process HTTP/REST inference requests, or None, if the service isn't running.

prediction_url: Optional[str] property readonly

Get the URI where the http server is running.

Returns:

Type Description
Optional[str]

The URI where the http service can be accessed to get more information about the service and to make predictions.

__init__(self, config, **attrs) special

Initialize the BentoML deployment service.

Parameters:

Name Type Description Default
config Union[zenml.integrations.bentoml.services.bentoml_deployment.BentoMLDeploymentConfig, Dict[str, Any]]

service configuration

required
attrs Any

additional attributes to set on the service

{}
Source code in zenml/integrations/bentoml/services/bentoml_deployment.py
def __init__(
    self,
    config: Union[BentoMLDeploymentConfig, Dict[str, Any]],
    **attrs: Any,
) -> None:
    """Set up the service, creating its HTTP endpoint if none was given.

    Args:
        config: service configuration
        attrs: additional attributes to set on the service
    """
    # the endpoint has to exist before the base class is initialized
    # TODO [ENG-700]: implement a service factory or builder for BentoML
    #   deployment services
    needs_endpoint = (
        isinstance(config, BentoMLDeploymentConfig)
        and "endpoint" not in attrs
    )
    if needs_endpoint:
        endpoint_config = BentoMLDeploymentEndpointConfig(
            protocol=ServiceEndpointProtocol.HTTP,
            port=config.port,
            ip_address=config.host or DEFAULT_LOCAL_SERVICE_IP_ADDRESS,
            prediction_url_path=BENTOML_PREDICTION_URL_PATH,
        )
        health_monitor = HTTPEndpointHealthMonitor(
            config=HTTPEndpointHealthMonitorConfig(
                healthcheck_uri_path=BENTOML_HEALTHCHECK_URL_PATH,
            )
        )
        attrs["endpoint"] = BentoMLDeploymentEndpoint(
            config=endpoint_config,
            monitor=health_monitor,
        )
    super().__init__(config=config, **attrs)
predict(self, api_endpoint, data)

Make a prediction using the service.

Parameters:

Name Type Description Default
data Any

data to make a prediction on

required
api_endpoint str

the api endpoint to make the prediction on

required

Returns:

Type Description
Any

The prediction result.

Exceptions:

Type Description
Exception

if the service is not running

ValueError

if the prediction endpoint is unknown.

Source code in zenml/integrations/bentoml/services/bentoml_deployment.py
def predict(self, api_endpoint: str, data: "Any") -> "Any":
    """Send data to one of the service's APIs and return the result.

    Args:
        api_endpoint: the api endpoint to make the prediction on
        data: data to make a prediction on

    Returns:
        The prediction result.

    Raises:
        Exception: if the service is not running
        ValueError: if the prediction endpoint is unknown.
    """
    if not self.is_running:
        raise Exception(
            "BentoML prediction service is not running. "
            "Please start the service before making predictions."
        )

    url = self.endpoint.prediction_url
    if url is None:
        raise ValueError("No endpoint known for prediction.")
    return Client.from_url(url).call(api_endpoint, data)
run(self)

Start the service.

Source code in zenml/integrations/bentoml/services/bentoml_deployment.py
def run(self) -> None:
    """Serve the configured bento, blocking until interrupted."""
    logger.info(
        "Starting BentoML prediction service as blocking "
        "process... press CTRL+C once to stop it."
    )

    self.endpoint.prepare_for_start()
    ssl_params = self.config.ssl_parameters or SSLBentoMLParametersConfig()

    # choose the serving function and the arguments that are only passed
    # in that mode
    mode_kwargs: Dict[str, Any]
    if self.config.production:
        logger.info("Running in production mode.")
        from bentoml.serve import serve_http_production as serve_http

        mode_kwargs = {
            "api_workers": self.config.workers,
            "backlog": self.config.backlog,
        }
    else:
        logger.info("Running in development mode.")
        from bentoml.serve import serve_http_development as serve_http

        mode_kwargs = {}

    try:
        serve_http(
            self.config.bento,
            port=self.endpoint.status.port,
            host=self.endpoint.status.hostname,
            working_dir=self.config.working_dir,
            ssl_certfile=ssl_params.ssl_certfile,
            ssl_keyfile=ssl_params.ssl_keyfile,
            ssl_keyfile_password=ssl_params.ssl_keyfile_password,
            ssl_version=ssl_params.ssl_version,
            ssl_cert_reqs=ssl_params.ssl_cert_reqs,
            ssl_ca_certs=ssl_params.ssl_ca_certs,
            ssl_ciphers=ssl_params.ssl_ciphers,
            **mode_kwargs,
        )
    except KeyboardInterrupt:
        logger.info(
            "BentoML prediction service stopped. Resuming normal execution."
        )
SSLBentoMLParametersConfig (BaseModel) pydantic-model

BentoML SSL parameters configuration.

Attributes:

Name Type Description
ssl_certfile Optional[str]

SSL certificate file

ssl_keyfile Optional[str]

SSL key file

ssl_keyfile_password Optional[str]

SSL key file password

ssl_version Optional[str]

SSL version

ssl_cert_reqs Optional[str]

SSL certificate requirements

ssl_ca_certs Optional[str]

SSL CA certificates

ssl_ciphers Optional[str]

SSL ciphers

Source code in zenml/integrations/bentoml/services/bentoml_deployment.py
class SSLBentoMLParametersConfig(BaseModel):
    """BentoML SSL parameters configuration.

    Every parameter is optional and defaults to None.

    Attributes:
        ssl_certfile: SSL certificate file
        ssl_keyfile: SSL key file
        ssl_keyfile_password: SSL key file password
        ssl_version: SSL version
        ssl_cert_reqs: SSL certificate requirements
        ssl_ca_certs: SSL CA certificates
        ssl_ciphers: SSL ciphers
    """

    ssl_certfile: Optional[str] = None
    ssl_keyfile: Optional[str] = None
    ssl_keyfile_password: Optional[str] = None
    ssl_version: Optional[str] = None
    ssl_cert_reqs: Optional[str] = None
    ssl_ca_certs: Optional[str] = None
    ssl_ciphers: Optional[str] = None

steps special

Initialization of the BentoML standard interface steps.

bento_builder

Implementation of the BentoML bento builder step.

BentoMLBuilderParameters (BaseParameters) pydantic-model

BentoML Bento builder step parameters.

Attributes:

Name Type Description
service str

the name of the BentoML service to be deployed.

model_name str

the name of the model to be packaged.

model_type str

the type of the model.

version Optional[str]

the version of the model if given.

labels Optional[Dict[str, str]]

the labels of the model if given.

description Optional[str]

the description of the model if given.

include Optional[List[str]]

the files to be included in the BentoML bundle.

exclude Optional[List[str]]

the files to be excluded from the BentoML bundle.

python Optional[Dict[str, str]]

dictionary for configuring Bento's python dependencies.

docker Optional[Dict[str, str]]

dictionary for configuring Bento's docker image.

Source code in zenml/integrations/bentoml/steps/bento_builder.py
class BentoMLBuilderParameters(BaseParameters):
    """BentoML Bento builder step parameters.

    Only `service`, `model_name` and `model_type` are required; every other
    field defaults to ``None``.

    Attributes:
        service: the name of the BentoML service to be deployed.
        model_name: the name of the model to be packaged.
        model_type: the type of the model.
        version: the version of the model if given.
        labels: the labels of the model if given.
        description: the description of the model if given.
        include: the files to be included in the BentoML bundle.
        exclude: the files to be excluded from the BentoML bundle.
        python: dictionary for configuring Bento's python dependencies.
        docker: dictionary for configuring Bento's docker image.
        working_dir: the directory used as build context for the Bento. The
            builder step falls back to the source root path when not set.
    """

    service: str
    model_name: str
    model_type: str
    version: Optional[str] = None
    labels: Optional[Dict[str, str]] = None
    description: Optional[str] = None
    include: Optional[List[str]] = None
    exclude: Optional[List[str]] = None
    python: Optional[Dict[str, str]] = None
    docker: Optional[Dict[str, str]] = None
    working_dir: Optional[str] = None
bento_builder_step (BaseStep)

Build a BentoML Model and Bento bundle.

This step takes a model artifact of a trained or loaded ML model from a previous step, saves it with BentoML, and then builds a BentoML bundle.

Parameters:

Name Type Description Default
model

the model to be packaged.

required
params

the parameters for the BentoML builder step.

required
context

the step context.

required

Returns:

Type Description

the BentoML Bento object.

PARAMETERS_CLASS (BaseParameters) pydantic-model

BentoML Bento builder step parameters.

Attributes:

Name Type Description
service str

the name of the BentoML service to be deployed.

model_name str

the name of the model to be packaged.

model_type str

the type of the model.

version Optional[str]

the version of the model if given.

labels Optional[Dict[str, str]]

the labels of the model if given.

description Optional[str]

the description of the model if given.

include Optional[List[str]]

the files to be included in the BentoML bundle.

exclude Optional[List[str]]

the files to be excluded from the BentoML bundle.

python Optional[Dict[str, str]]

dictionary for configuring Bento's python dependencies.

docker Optional[Dict[str, str]]

dictionary for configuring Bento's docker image.

Source code in zenml/integrations/bentoml/steps/bento_builder.py
class BentoMLBuilderParameters(BaseParameters):
    """BentoML Bento builder step parameters.

    Only `service`, `model_name` and `model_type` are required; every other
    field defaults to ``None``.

    Attributes:
        service: the name of the BentoML service to be deployed.
        model_name: the name of the model to be packaged.
        model_type: the type of the model.
        version: the version of the model if given.
        labels: the labels of the model if given.
        description: the description of the model if given.
        include: the files to be included in the BentoML bundle.
        exclude: the files to be excluded from the BentoML bundle.
        python: dictionary for configuring Bento's python dependencies.
        docker: dictionary for configuring Bento's docker image.
        working_dir: the directory used as build context for the Bento. The
            builder step falls back to the source root path when not set.
    """

    service: str
    model_name: str
    model_type: str
    version: Optional[str] = None
    labels: Optional[Dict[str, str]] = None
    description: Optional[str] = None
    include: Optional[List[str]] = None
    exclude: Optional[List[str]] = None
    python: Optional[Dict[str, str]] = None
    docker: Optional[Dict[str, str]] = None
    working_dir: Optional[str] = None
entrypoint(model, params, context) staticmethod

Build a BentoML Model and Bento bundle.

This step takes a model artifact of a trained or loaded ML model from a previous step, saves it with BentoML, and then builds a BentoML bundle.

Parameters:

Name Type Description Default
model ModelArtifact

the model to be packaged.

required
params BentoMLBuilderParameters

the parameters for the BentoML builder step.

required
context StepContext

the step context.

required

Returns:

Type Description
Bento

the BentoML Bento object.

Source code in zenml/integrations/bentoml/steps/bento_builder.py
@step
def bento_builder_step(
    model: ModelArtifact,
    params: BentoMLBuilderParameters,
    context: StepContext,
) -> bento.Bento:
    """Build a BentoML Model and Bento bundle.

    This step takes a model artifact of a trained or loaded ML model from a
    previous step, saves it with BentoML, and then builds a BentoML bundle.

    The model URI and the target Bento URI are recorded as the `model_uri`
    and `bento_uri` labels, in addition to any user-supplied labels. The
    same label set is applied to the saved model and to the built Bento.

    Args:
        model: the model to be packaged.
        params: the parameters for the BentoML builder step.
        context: the step context.

    Returns:
        the BentoML Bento object.
    """
    # Save the model and bento uri as part of the labels. Work on a copy so
    # that the dict held by `params` is never mutated by this step.
    labels = dict(params.labels or {})
    labels["model_uri"] = model.uri
    labels["bento_uri"] = os.path.join(
        context.get_output_artifact_uri(), DEFAULT_BENTO_FILENAME
    )

    # Load the model from the model artifact. A separate name is used so the
    # `model` artifact parameter is not rebound to a different kind of object.
    loaded_model = model_from_model_artifact(model)

    # Save the model to a BentoML model based on the model type, falling back
    # to the generic picklable-model saver when the framework package backing
    # that model type is reported as not installed.
    try:
        module = importlib.import_module(f".{params.model_type}", "bentoml")
        module.save_model(params.model_name, loaded_model, labels=labels)
    except importlib.metadata.PackageNotFoundError:
        bentoml.picklable_model.save_model(
            params.model_name,
            loaded_model,
            labels=labels,
        )

    # Build the BentoML bundle. The result is not called `bento` so that it
    # does not shadow the `bento` module used in the return annotation.
    built_bento = bentos.build(
        service=params.service,
        version=params.version,
        labels=labels,
        description=params.description,
        include=params.include,
        exclude=params.exclude,
        python=params.python,
        docker=params.docker,
        build_ctx=params.working_dir or source_utils.get_source_root_path(),
    )

    # Return the BentoML Bento bundle
    return built_bento

bentoml_deployer

Implementation of the BentoML model deployer pipeline step.

BentoMLDeployerParameters (BaseParameters) pydantic-model

Model deployer step parameters for BentoML.

Attributes:

Name Type Description
model_name str

the name of the model to deploy.

port int

the port to use for the prediction service.

workers Optional[int]

number of workers to use for the prediction service

backlog Optional[int]

the number of requests to queue up before rejecting requests.

production bool

whether to deploy the service in production mode.

working_dir Optional[str]

the working directory to use for the prediction service.

host Optional[str]

the host to use for the prediction service.

timeout int

the number of seconds to wait for the service to start/stop.

Source code in zenml/integrations/bentoml/steps/bentoml_deployer.py
class BentoMLDeployerParameters(BaseParameters):
    """Model deployer step parameters for BentoML.

    Attributes:
        model_name: the name of the model to deploy.
        port: the port to use for the prediction service.
        workers: number of workers to use for the prediction service.
        backlog: the number of requests to queue up before rejecting requests.
        production: whether to deploy the service in production mode.
        working_dir: the working directory to use for the prediction service.
            The deployer step falls back to the source root path when not
            set.
        host: the host to use for the prediction service.
        ssl_certfile: SSL certificate file.
        ssl_keyfile: SSL key file.
        ssl_keyfile_password: SSL key file password.
        ssl_version: SSL version.
        ssl_cert_reqs: SSL certificate requirements.
        ssl_ca_certs: SSL CA certificates.
        ssl_ciphers: SSL ciphers.
        timeout: the number of seconds to wait for the service to start/stop.
    """

    # NOTE(review): `bentoml_model_deployer_step`, as listed in this file,
    # does not forward `backlog`, `production` or `host` to the deployment
    # config -- confirm whether that is intentional.
    model_name: str
    port: int
    workers: Optional[int] = None
    backlog: Optional[int] = None
    production: bool = False
    working_dir: Optional[str] = None
    host: Optional[str] = None
    # The `ssl_*` values are bundled into an `SSLBentoMLParametersConfig` by
    # the deployer step.
    ssl_certfile: Optional[str] = None
    ssl_keyfile: Optional[str] = None
    ssl_keyfile_password: Optional[str] = None
    ssl_version: Optional[str] = None
    ssl_cert_reqs: Optional[str] = None
    ssl_ca_certs: Optional[str] = None
    ssl_ciphers: Optional[str] = None
    timeout: int = 30
bentoml_model_deployer_step (BaseStep)

Model deployer pipeline step for BentoML.

This step deploys a given Bento to a local BentoML http prediction server.

Parameters:

Name Type Description Default
deploy_decision

whether to deploy the model or not

required
params

parameters for the deployer step

required
bento

the bento artifact to deploy

required

Returns:

Type Description

BentoML deployment service

PARAMETERS_CLASS (BaseParameters) pydantic-model

Model deployer step parameters for BentoML.

Attributes:

Name Type Description
model_name str

the name of the model to deploy.

port int

the port to use for the prediction service.

workers Optional[int]

number of workers to use for the prediction service

backlog Optional[int]

the number of requests to queue up before rejecting requests.

production bool

whether to deploy the service in production mode.

working_dir Optional[str]

the working directory to use for the prediction service.

host Optional[str]

the host to use for the prediction service.

timeout int

the number of seconds to wait for the service to start/stop.

Source code in zenml/integrations/bentoml/steps/bentoml_deployer.py
class BentoMLDeployerParameters(BaseParameters):
    """Model deployer step parameters for BentoML.

    Attributes:
        model_name: the name of the model to deploy.
        port: the port to use for the prediction service.
        workers: number of workers to use for the prediction service.
        backlog: the number of requests to queue up before rejecting requests.
        production: whether to deploy the service in production mode.
        working_dir: the working directory to use for the prediction service.
            The deployer step falls back to the source root path when not
            set.
        host: the host to use for the prediction service.
        ssl_certfile: SSL certificate file.
        ssl_keyfile: SSL key file.
        ssl_keyfile_password: SSL key file password.
        ssl_version: SSL version.
        ssl_cert_reqs: SSL certificate requirements.
        ssl_ca_certs: SSL CA certificates.
        ssl_ciphers: SSL ciphers.
        timeout: the number of seconds to wait for the service to start/stop.
    """

    # NOTE(review): `bentoml_model_deployer_step`, as listed in this file,
    # does not forward `backlog`, `production` or `host` to the deployment
    # config -- confirm whether that is intentional.
    model_name: str
    port: int
    workers: Optional[int] = None
    backlog: Optional[int] = None
    production: bool = False
    working_dir: Optional[str] = None
    host: Optional[str] = None
    # The `ssl_*` values are bundled into an `SSLBentoMLParametersConfig` by
    # the deployer step.
    ssl_certfile: Optional[str] = None
    ssl_keyfile: Optional[str] = None
    ssl_keyfile_password: Optional[str] = None
    ssl_version: Optional[str] = None
    ssl_cert_reqs: Optional[str] = None
    ssl_ca_certs: Optional[str] = None
    ssl_ciphers: Optional[str] = None
    timeout: int = 30
entrypoint(deploy_decision, bento, params) staticmethod

Model deployer pipeline step for BentoML.

This step deploys a given Bento to a local BentoML http prediction server.

Parameters:

Name Type Description Default
deploy_decision bool

whether to deploy the model or not

required
params BentoMLDeployerParameters

parameters for the deployer step

required
bento Bento

the bento artifact to deploy

required

Returns:

Type Description
BentoMLDeploymentService

BentoML deployment service

Source code in zenml/integrations/bentoml/steps/bentoml_deployer.py
@step(enable_cache=True)
def bentoml_model_deployer_step(
    deploy_decision: bool,
    bento: bento.Bento,
    params: BentoMLDeployerParameters,
) -> BentoMLDeploymentService:
    """Model deployer pipeline step for BentoML.

    This step deploys a given Bento to a local BentoML http prediction server.

    When `deploy_decision` is False and a server previously deployed by the
    same pipeline, step and model name exists, that server is reused (and
    started if it is not running) instead of creating a new deployment. In
    every other case a new deployment replaces any existing one.

    Args:
        deploy_decision: whether to deploy the model or not
        bento: the bento artifact to deploy
        params: parameters for the deployer step

    Returns:
        BentoML deployment service
    """
    # get the current active model deployer
    model_deployer = cast(
        BentoMLModelDeployer, BentoMLModelDeployer.get_active_model_deployer()
    )

    # get pipeline name, step name and run id
    step_env = cast(StepEnvironment, Environment()[STEP_ENVIRONMENT_NAME])
    pipeline_name = step_env.pipeline_name
    run_id = step_env.pipeline_run_id
    step_name = step_env.step_name

    # fetch existing services with same pipeline name, step name and model name
    existing_services = model_deployer.find_model_server(
        pipeline_name=pipeline_name,
        pipeline_step_name=step_name,
        model_name=params.model_name,
    )

    # Decide on reuse first: this path needs neither the loaded Bento nor a
    # new deployment config, so none of that work is done for it.
    if not deploy_decision and existing_services:
        service = cast(BentoMLDeploymentService, existing_services[0])
        logger.info(
            f"Skipping model deployment because the model quality does not "
            f"meet the criteria. Reusing last model server deployed by step "
            f"'{step_name}' and pipeline '{pipeline_name}' for model "
            f"'{params.model_name}'..."
        )
        if not service.is_running:
            service.start(timeout=params.timeout)
        return service

    # Resolve the working directory once; it is needed both to load the Bento
    # and for the deployment config.
    working_dir = params.working_dir or source_utils.get_source_root_path()
    bento_tag = str(bento.tag)

    # Collect the API endpoints of the defined service to use in the predict.
    # This is a workaround to get the endpoints of the service defined as
    # functions from the user code in the BentoML service.
    loaded_service = bentoml.load(
        bento_identifier=bento_tag,
        working_dir=working_dir,
    )
    api_paths = list(loaded_service.apis.keys())

    # create a config for the new model service
    predictor_cfg = BentoMLDeploymentConfig(
        model_name=params.model_name,
        bento=bento_tag,
        model_uri=bento.info.labels.get("model_uri"),
        bento_uri=bento.info.labels.get("bento_uri"),
        apis=api_paths,
        workers=params.workers,
        working_dir=working_dir,
        port=params.port,
        pipeline_name=pipeline_name,
        pipeline_run_id=run_id,
        pipeline_step_name=step_name,
        ssl_parameters=SSLBentoMLParametersConfig(
            ssl_certfile=params.ssl_certfile,
            ssl_keyfile=params.ssl_keyfile,
            ssl_keyfile_password=params.ssl_keyfile_password,
            ssl_version=params.ssl_version,
            ssl_cert_reqs=params.ssl_cert_reqs,
            ssl_ca_certs=params.ssl_ca_certs,
            ssl_ciphers=params.ssl_ciphers,
        ),
    )

    # create a new model deployment and replace an old one if it exists
    new_service = cast(
        BentoMLDeploymentService,
        model_deployer.deploy_model(
            replace=True,
            config=predictor_cfg,
            timeout=params.timeout,
        ),
    )

    logger.info(
        f"BentoML deployment service started and reachable at:\n"
        f"    {new_service.prediction_url}\n"
    )

    return new_service