Bentoml
zenml.integrations.bentoml
special
Initialization of the BentoML integration for ZenML.
The BentoML integration allows you to use BentoML model serving to implement continuous model deployment.
BentoMLIntegration (Integration)
Definition of BentoML integration for ZenML.
Source code in zenml/integrations/bentoml/__init__.py
class BentoMLIntegration(Integration):
    """Definition of the BentoML integration for ZenML."""

    NAME = BENTOML
    REQUIREMENTS = [
        "bentoml>=1.0.10",
    ]

    @classmethod
    def activate(cls) -> None:
        """Activate the BentoML integration.

        Imports the integration's submodules. None of the imported names is
        used directly here; the imports are deferred until activation.
        """
        from zenml.integrations.bentoml import (  # noqa
            materializers,
            model_deployers,
            services,
            steps,
        )

    @classmethod
    def flavors(cls) -> List[Type[Flavor]]:
        """Declare the stack component flavors for BentoML.

        Returns:
            List of stack component flavors for this integration.
        """
        from zenml.integrations.bentoml.flavors import (
            BentoMLModelDeployerFlavor,
        )

        integration_flavors: List[Type[Flavor]] = [BentoMLModelDeployerFlavor]
        return integration_flavors
activate()
classmethod
Activate the BentoML integration.
Source code in zenml/integrations/bentoml/__init__.py
@classmethod
def activate(cls) -> None:
    """Activate the BentoML integration.

    Imports the integration's submodules. None of the imported names is
    used directly here; the imports are deferred until activation.
    """
    from zenml.integrations.bentoml import (  # noqa
        materializers,
        model_deployers,
        services,
        steps,
    )
flavors()
classmethod
Declare the stack component flavors for BentoML.
Returns:
Type | Description |
---|---|
List[Type[zenml.stack.flavor.Flavor]] |
List of stack component flavors for this integration. |
Source code in zenml/integrations/bentoml/__init__.py
@classmethod
def flavors(cls) -> List[Type[Flavor]]:
    """Declare the stack component flavors for BentoML.

    Returns:
        List of stack component flavors for this integration.
    """
    from zenml.integrations.bentoml.flavors import (
        BentoMLModelDeployerFlavor,
    )

    integration_flavors: List[Type[Flavor]] = [BentoMLModelDeployerFlavor]
    return integration_flavors
constants
BentoML constants.
flavors
special
BentoML integration flavors.
bentoml_model_deployer_flavor
BentoML model deployer flavor.
BentoMLModelDeployerConfig (BaseModelDeployerConfig)
pydantic-model
Configuration for the BentoMLModelDeployer.
Source code in zenml/integrations/bentoml/flavors/bentoml_model_deployer_flavor.py
class BentoMLModelDeployerConfig(BaseModelDeployerConfig):
    """Configuration for the BentoMLModelDeployer."""

    # Root directory for the deployer's local service files. When left empty,
    # `BentoMLModelDeployer.local_path` falls back to a default path derived
    # from the component ID.
    service_path: str = ""
BentoMLModelDeployerFlavor (BaseModelDeployerFlavor)
Flavor for the BentoML model deployer.
Source code in zenml/integrations/bentoml/flavors/bentoml_model_deployer_flavor.py
class BentoMLModelDeployerFlavor(BaseModelDeployerFlavor):
    """Flavor for the BentoML model deployer."""

    @property
    def name(self) -> str:
        """Name of the flavor.

        Returns:
            The value of `BENTOML_MODEL_DEPLOYER_FLAVOR`.
        """
        flavor_name = BENTOML_MODEL_DEPLOYER_FLAVOR
        return flavor_name

    @property
    def config_class(self) -> Type[BentoMLModelDeployerConfig]:
        """Config class used by this flavor.

        Returns:
            The `BentoMLModelDeployerConfig` class.
        """
        flavor_config_class = BentoMLModelDeployerConfig
        return flavor_config_class

    @property
    def implementation_class(self) -> Type["BentoMLModelDeployer"]:
        """Implementation class for this flavor.

        The deployer is imported inside the property rather than at module
        import time.

        Returns:
            The `BentoMLModelDeployer` class.
        """
        from zenml.integrations.bentoml import model_deployers

        return model_deployers.BentoMLModelDeployer
config_class: Type[zenml.integrations.bentoml.flavors.bentoml_model_deployer_flavor.BentoMLModelDeployerConfig]
property
readonly
Returns BentoMLModelDeployerConfig
config class.
Returns:
Type | Description |
---|---|
Type[zenml.integrations.bentoml.flavors.bentoml_model_deployer_flavor.BentoMLModelDeployerConfig] |
The config class. |
implementation_class: Type[BentoMLModelDeployer]
property
readonly
Implementation class for this flavor.
Returns:
Type | Description |
---|---|
Type[BentoMLModelDeployer] |
The implementation class. |
name: str
property
readonly
Name of the flavor.
Returns:
Type | Description |
---|---|
str |
Name of the flavor. |
materializers
special
Initialization of the BentoML Bento Materializer.
bentoml_bento_materializer
Materializer for BentoML Bento objects.
BentoMaterializer (BaseMaterializer)
Materializer for BentoML Bento objects.
Source code in zenml/integrations/bentoml/materializers/bentoml_bento_materializer.py
class BentoMaterializer(BaseMaterializer):
    """Materializer for BentoML Bento objects.

    Bentos are moved between the artifact store and the local BentoML store
    as an exported file named `DEFAULT_BENTO_FILENAME`, staged in a temporary
    directory.
    """

    ASSOCIATED_TYPES = (bento.Bento,)
    ASSOCIATED_ARTIFACT_TYPES = (DataArtifact,)

    def handle_input(self, data_type: Type[bento.Bento]) -> bento.Bento:
        """Read from artifact store and return a Bento object.

        Args:
            data_type: A bento.Bento type.

        Returns:
            A bento.Bento object.
        """
        super().handle_input(data_type)

        # The context manager removes the staging directory deterministically,
        # also on errors, instead of leaving it to the garbage collector.
        with tempfile.TemporaryDirectory() as temp_dir:
            # Copy from artifact store to temporary directory
            io_utils.copy_dir(self.artifact.uri, temp_dir)

            # Load the Bento from the temporary directory
            imported_bento = Bento.import_from(
                os.path.join(temp_dir, DEFAULT_BENTO_FILENAME)
            )

            # Save the Bento to the local BentoML store only if its tag is
            # not already there. This stays inside the `with` block so the
            # exported file still exists while the Bento is being saved.
            try:
                bentoml.get(imported_bento.tag)
            except BentoMLException:
                imported_bento.save()
        return imported_bento

    def handle_return(self, bento: bento.Bento) -> None:
        """Write to artifact store.

        Args:
            bento: A bento.Bento object.
        """
        super().handle_return(bento)

        # The temporary directory is removed when the block exits, including
        # when exporting or copying raises.
        with tempfile.TemporaryDirectory(prefix="zenml-temp-") as temp_dir:
            temp_bento_path = os.path.join(temp_dir, DEFAULT_BENTO_FILENAME)

            # Export the Bento into the temporary directory
            bentoml.export_bento(bento.tag, temp_bento_path)

            # Copy the exported Bento to the artifact store
            io_utils.copy_dir(temp_dir, self.artifact.uri)
handle_input(self, data_type)
Read from artifact store and return a Bento object.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
data_type |
Type[bentoml._internal.bento.bento.Bento] |
A bento.Bento type. |
required |
Returns:
Type | Description |
---|---|
Bento |
A bento.Bento object. |
Source code in zenml/integrations/bentoml/materializers/bentoml_bento_materializer.py
def handle_input(self, data_type: Type[bento.Bento]) -> bento.Bento:
    """Read from artifact store and return a Bento object.

    Args:
        data_type: A bento.Bento type.

    Returns:
        A bento.Bento object.
    """
    super().handle_input(data_type)

    # The context manager removes the staging directory deterministically,
    # also on errors, instead of leaving it to the garbage collector.
    with tempfile.TemporaryDirectory() as temp_dir:
        # Copy from artifact store to temporary directory
        io_utils.copy_dir(self.artifact.uri, temp_dir)

        # Load the Bento from the temporary directory
        imported_bento = Bento.import_from(
            os.path.join(temp_dir, DEFAULT_BENTO_FILENAME)
        )

        # Save the Bento to the local BentoML store only if its tag is not
        # already there. This stays inside the `with` block so the exported
        # file still exists while the Bento is being saved.
        try:
            bentoml.get(imported_bento.tag)
        except BentoMLException:
            imported_bento.save()
    return imported_bento
handle_return(self, bento)
Write to artifact store.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
bento |
Bento |
A bento.Bento object. |
required |
Source code in zenml/integrations/bentoml/materializers/bentoml_bento_materializer.py
def handle_return(self, bento: bento.Bento) -> None:
    """Write to artifact store.

    Args:
        bento: A bento.Bento object.
    """
    super().handle_return(bento)

    # The temporary directory is removed when the block exits, including
    # when exporting or copying raises.
    with tempfile.TemporaryDirectory(prefix="zenml-temp-") as temp_dir:
        temp_bento_path = os.path.join(temp_dir, DEFAULT_BENTO_FILENAME)

        # Export the Bento into the temporary directory
        bentoml.export_bento(bento.tag, temp_bento_path)

        # Copy the exported Bento to the artifact store
        io_utils.copy_dir(temp_dir, self.artifact.uri)
model_deployers
special
Initialization of the BentoML Model Deployer.
bentoml_model_deployer
Implementation of the BentoML Model Deployer.
BentoMLModelDeployer (BaseModelDeployer)
BentoML model deployer stack component implementation.
Source code in zenml/integrations/bentoml/model_deployers/bentoml_model_deployer.py
class BentoMLModelDeployer(BaseModelDeployer):
"""BentoML model deployer stack component implementation."""
NAME: ClassVar[str] = "BentoML"
FLAVOR: ClassVar[Type[BaseModelDeployerFlavor]] = BentoMLModelDeployerFlavor
_service_path: Optional[str] = None
@property
def config(self) -> BentoMLModelDeployerConfig:
    """Typed accessor for this deployer's configuration.

    Returns:
        `self._config`, cast to `BentoMLModelDeployerConfig`.
    """
    typed_config = cast(BentoMLModelDeployerConfig, self._config)
    return typed_config
@staticmethod
def get_service_path(id_: UUID) -> str:
    """Get the path where local BentoML service information is stored.

    This is where the deployment service configuration, PID and log files
    are stored. The path is `<local_stores_path>/<id_>` and is passed to
    `create_dir_recursive_if_not_exists` before being returned.

    Args:
        id_: The ID of the BentoML model deployer.

    Returns:
        The service path.
    """
    local_stores_root = GlobalConfiguration().local_stores_path
    deployer_dir = os.path.join(local_stores_root, str(id_))
    create_dir_recursive_if_not_exists(deployer_dir)
    return deployer_dir
@property
def local_path(self) -> str:
    """Returns the path to the root directory.

    This is where all configurations for BentoML deployment daemon
    processes are stored.

    The path is resolved once and cached in `_service_path`. A non-empty
    `service_path` from the config wins; otherwise the default path for
    this component's ID is used.

    Returns:
        The path to the local service root directory.
    """
    if self._service_path is None:
        # First access: resolve the directory and remember it
        resolved_path = self.config.service_path or self.get_service_path(
            self.id
        )
        self._service_path = resolved_path
        create_dir_recursive_if_not_exists(resolved_path)
    return self._service_path
@staticmethod
def get_model_server_info(  # type: ignore[override]
    service_instance: "BentoMLDeploymentService",
) -> Dict[str, Optional[str]]:
    """Return implementation specific information on the model server.

    Args:
        service_instance: BentoML deployment service object

    Returns:
        A dictionary containing the model server information. The
        prediction API URLs are joined into one comma-separated string,
        skipping `None` entries; it is empty when the service has none.
    """
    api_urls = service_instance.prediction_apis_urls
    if api_urls is None:
        joined_api_urls = ""
    else:
        joined_api_urls = ", ".join(
            url for url in api_urls if url is not None
        )

    service_config = service_instance.config
    service_status = service_instance.status
    # NOTE(review): "PREDICITON_APIS_URLS" is misspelled, but it is a runtime
    # key that callers may read, so it is kept unchanged.
    return {
        "PREDICTION_URL": service_instance.prediction_url,
        "BENTO_TAG": service_config.bento,
        "MODEL_NAME": service_config.model_name,
        "MODEL_URI": service_config.model_uri,
        "BENTO_URI": service_config.bento_uri,
        "SERVICE_PATH": service_status.runtime_path,
        "DAEMON_PID": str(service_status.pid),
        "PREDICITON_APIS_URLS": joined_api_urls,
    }
def deploy_model(
    self,
    config: ServiceConfig,
    replace: bool = False,
    timeout: int = DEFAULT_SERVICE_START_STOP_TIMEOUT,
) -> BaseService:
    """Create a new BentoML deployment service or update an existing one.

    This should serve the supplied model and deployment configuration.

    This method has two modes of operation, depending on the `replace`
    argument value:

      * if `replace` is False, calling this method will create a new BentoML
        deployment server to reflect the model and other configuration
        parameters specified in the supplied BentoML service `config`.

      * if `replace` is True, this method will first attempt to find an
        existing BentoML deployment service that is *equivalent* to the
        supplied configuration parameters. Two or more BentoML deployment
        services are considered equivalent if they have the same
        `pipeline_name`, `pipeline_step_name` and `model_name` configuration
        parameters. To put it differently, two BentoML deployment services
        are equivalent if they serve versions of the same model deployed by
        the same pipeline step. If an equivalent BentoML deployment is found,
        it will be updated in place to reflect the new configuration
        parameters.

    Callers should set `replace` to True if they want a continuous model
    deployment workflow that doesn't spin up a new BentoML deployment
    server for each new model version. If multiple equivalent BentoML
    deployment servers are found, the first one returned by
    `find_model_server` is updated and the others are deleted.

    Args:
        config: the configuration of the model to be deployed with BentoML.
        replace: set this flag to True to find and update an equivalent
            BentoML deployment server with the new model instead of
            creating and starting a new deployment server.
        timeout: the timeout in seconds to wait for the BentoML server
            to be provisioned and successfully started or updated. If set
            to 0, the method will return immediately after the BentoML
            server is provisioned, without waiting for it to fully start.

    Returns:
        The ZenML BentoML deployment service object that can be used to
        interact with the BentoML model http server.
    """
    config = cast(BentoMLDeploymentConfig, config)
    service = None

    # if replace is True, remove all existing services
    if replace is True:
        existing_services = self.find_model_server(
            pipeline_name=config.pipeline_name,
            pipeline_step_name=config.pipeline_step_name,
            model_name=config.model_name,
        )

        for existing_service in existing_services:
            if service is None:
                # keep the first matching service as the one to update
                # NOTE(review): the order comes from the directory walk in
                # find_model_server, so this is not necessarily the most
                # recently created service - confirm.
                service = cast(BentoMLDeploymentService, existing_service)
            try:
                # stop each matching service and delete its runtime path
                # NOTE(review): as laid out here this also runs for the
                # service kept above, which is then updated and restarted
                # below - confirm against the upstream indentation.
                self._clean_up_existing_service(
                    existing_service=cast(
                        BentoMLDeploymentService, existing_service
                    ),
                    timeout=timeout,
                    force=True,
                )
            except RuntimeError:
                # ignore errors encountered while stopping old services
                pass
    if service:
        logger.info(
            f"Updating an existing BentoML deployment service: {service}"
        )
        # set the root runtime path to this deployer's local path
        config.root_runtime_path = self.local_path
        # restart the kept service with the new configuration
        service.stop(timeout=timeout, force=True)
        service.update(config)
        service.start(timeout=timeout)
    else:
        # create a new BentoMLDeploymentService instance
        service = self._create_new_service(timeout, config)
        logger.info(f"Created a new BentoML deployment service: {service}")

    return cast(BaseService, service)
def _clean_up_existing_service(
    self,
    timeout: int,
    force: bool,
    existing_service: BentoMLDeploymentService,
) -> None:
    """Stop a service and delete its runtime directory.

    Args:
        timeout: Timeout passed to the service's `stop` call.
        force: Passed to the service's `stop` call.
        existing_service: The service to stop and clean up.
    """
    # stop the older service
    existing_service.stop(timeout=timeout, force=force)

    # delete the old runtime directory; read the path only after stopping
    runtime_path = existing_service.status.runtime_path
    if not runtime_path:
        return
    shutil.rmtree(runtime_path)
# the step will receive a config from the user that mentions the number
# of workers etc.; the step implementation will create a new config using
# all values from the user and add values like pipeline name, model_uri
def _create_new_service(
    self, timeout: int, config: BentoMLDeploymentConfig
) -> BentoMLDeploymentService:
    """Creates and starts a new BentoMLDeploymentService.

    Args:
        timeout: the timeout in seconds to wait for the BentoML http server
            to be provisioned and successfully started or updated.
        config: the configuration of the model to be deployed with BentoML.
            Its `root_runtime_path` is overwritten with this deployer's
            local path.

    Returns:
        The BentoMLDeploymentService object that can be used to interact
        with the BentoML model server.
    """
    # point the service at this deployer's local root directory
    config.root_runtime_path = self.local_path

    # create and start a service for the new model
    new_service = BentoMLDeploymentService(config)
    new_service.start(timeout=timeout)
    return new_service
def find_model_server(
    self,
    running: bool = False,
    service_uuid: Optional[UUID] = None,
    pipeline_name: Optional[str] = None,
    pipeline_run_id: Optional[str] = None,
    pipeline_step_name: Optional[str] = None,
    model_name: Optional[str] = None,
    model_uri: Optional[str] = None,
    model_type: Optional[str] = None,
) -> List[BaseService]:
    """Finds one or more model servers that match the given criteria.

    Walks the deployer's local path, loads every service daemon
    configuration file found there and keeps the services that match.

    Args:
        running: If true, only running services will be returned.
        service_uuid: The UUID of the service that was originally used
            to deploy the model. When given, only directories named after
            this UUID are inspected.
        pipeline_name: Name of the pipeline that the deployed model was part
            of.
        pipeline_run_id: ID of the pipeline run which the deployed model
            was part of.
        pipeline_step_name: The name of the pipeline model deployment step
            that deployed the model.
        model_name: Name of the deployed model.
        model_uri: URI of the deployed model. Stored in the search config,
            but not compared by `_matches_search_criteria`.
        model_type: Type/format of the deployed model. Not used in this
            BentoML case.

    Returns:
        One or more Service objects representing model servers that match
        the input search criteria.

    Raises:
        TypeError: if a service loaded from a configuration file is not a
            `BentoMLDeploymentService`.
    """
    # Unset criteria become empty strings, which the matcher ignores
    criteria = BentoMLDeploymentConfig(
        model_name=model_name or "",
        bento="",
        port=BENTOML_DEFAULT_PORT,
        model_uri=model_uri or "",
        working_dir="",
        pipeline_name=pipeline_name or "",
        pipeline_run_id=pipeline_run_id or "",
        pipeline_step_name=pipeline_step_name or "",
    )
    matching_services: List[BaseService] = []
    wanted_dir_name = str(service_uuid) if service_uuid else None

    for root, _, files in os.walk(self.local_path):
        # With a UUID filter, only look inside the directory named after it
        if wanted_dir_name is not None and Path(root).name != wanted_dir_name:
            continue
        for file_name in files:
            if file_name != SERVICE_DAEMON_CONFIG_FILE_NAME:
                continue
            service_config_path = os.path.join(root, file_name)
            logger.debug(
                "Loading service daemon configuration from %s",
                service_config_path,
            )
            with open(service_config_path, "r") as f:
                serialized_service = f.read()
            candidate = ServiceRegistry().load_service_from_json(
                serialized_service
            )
            if not isinstance(candidate, BentoMLDeploymentService):
                raise TypeError(
                    f"Expected service type BentoMLDeploymentService but got "
                    f"{type(candidate)} instead"
                )
            # Refresh the status before the `running` filter reads it
            candidate.update_status()
            if not self._matches_search_criteria(candidate, criteria):
                continue
            if running and not candidate.is_running:
                continue
            matching_services.append(cast(BaseService, candidate))

    return matching_services
def _matches_search_criteria(
    self,
    existing_service: BentoMLDeploymentService,
    config: BentoMLDeploymentConfig,
) -> bool:
    """Returns true if a service matches the input criteria.

    A criterion whose value in `config` is empty or None is ignored. This
    allows listing services just by common pipeline names or step names,
    etc. Only `pipeline_name`, `model_name`, `pipeline_step_name` and
    `pipeline_run_id` are compared.

    Args:
        existing_service: The materialized Service instance derived from
            the config of the older (existing) service
        config: The BentoMLDeploymentConfig object holding the search
            criteria.

    Returns:
        True if the service matches the input criteria.
    """
    deployed_config = existing_service.config
    compared_fields = (
        "pipeline_name",
        "model_name",
        "pipeline_step_name",
        "pipeline_run_id",
    )
    # Every criterion must be either unset or equal to the deployed value
    return all(
        not getattr(config, field_name)
        or getattr(deployed_config, field_name)
        == getattr(config, field_name)
        for field_name in compared_fields
    )
def stop_model_server(
    self,
    uuid: UUID,
    timeout: int = DEFAULT_SERVICE_START_STOP_TIMEOUT,
    force: bool = False,
) -> None:
    """Method to stop a model server.

    Nothing happens when no service matches the given UUID.

    Args:
        uuid: UUID of the model server to stop.
        timeout: Timeout in seconds to wait for the service to stop.
        force: If True, force the service to stop.
    """
    matching_services = self.find_model_server(service_uuid=uuid)
    if not matching_services:
        return

    # only the first match is stopped
    matching_services[0].stop(timeout=timeout, force=force)
def start_model_server(
    self, uuid: UUID, timeout: int = DEFAULT_SERVICE_START_STOP_TIMEOUT
) -> None:
    """Method to start a model server.

    Nothing happens when no service matches the given UUID.

    Args:
        uuid: UUID of the model server to start.
        timeout: Timeout in seconds to wait for the service to start.
    """
    matching_services = self.find_model_server(service_uuid=uuid)
    if not matching_services:
        return

    # only the first match is started
    matching_services[0].start(timeout=timeout)
def delete_model_server(
    self,
    uuid: UUID,
    timeout: int = DEFAULT_SERVICE_START_STOP_TIMEOUT,
    force: bool = False,
) -> None:
    """Method to delete all configuration of a model server.

    Nothing happens when no service matches the given UUID.

    Args:
        uuid: UUID of the model server to delete.
        timeout: Timeout in seconds to wait for the service to stop.
        force: If True, force the service to stop.
    """
    matching_services = self.find_model_server(service_uuid=uuid)
    if not matching_services:
        return

    # only the first match is stopped and cleaned up
    target_service = cast(BentoMLDeploymentService, matching_services[0])
    self._clean_up_existing_service(
        existing_service=target_service, timeout=timeout, force=force
    )
config: BentoMLModelDeployerConfig
property
readonly
Returns the BentoMLModelDeployerConfig
config.
Returns:
Type | Description |
---|---|
BentoMLModelDeployerConfig |
The configuration. |
local_path: str
property
readonly
Returns the path to the root directory.
This is where all configurations for BentoML deployment daemon processes are stored.
If the service path is not set in the config by the user, the path is set to a local default path according to the component ID.
Returns:
Type | Description |
---|---|
str |
The path to the local service root directory. |
FLAVOR (BaseModelDeployerFlavor)
Flavor for the BentoML model deployer.
Source code in zenml/integrations/bentoml/model_deployers/bentoml_model_deployer.py
class BentoMLModelDeployerFlavor(BaseModelDeployerFlavor):
    """Flavor for the BentoML model deployer."""

    @property
    def name(self) -> str:
        """Name of the flavor.

        Returns:
            The value of `BENTOML_MODEL_DEPLOYER_FLAVOR`.
        """
        flavor_name = BENTOML_MODEL_DEPLOYER_FLAVOR
        return flavor_name

    @property
    def config_class(self) -> Type[BentoMLModelDeployerConfig]:
        """Config class used by this flavor.

        Returns:
            The `BentoMLModelDeployerConfig` class.
        """
        flavor_config_class = BentoMLModelDeployerConfig
        return flavor_config_class

    @property
    def implementation_class(self) -> Type["BentoMLModelDeployer"]:
        """Implementation class for this flavor.

        The deployer is imported inside the property rather than at module
        import time.

        Returns:
            The `BentoMLModelDeployer` class.
        """
        from zenml.integrations.bentoml import model_deployers

        return model_deployers.BentoMLModelDeployer
config_class: Type[zenml.integrations.bentoml.flavors.bentoml_model_deployer_flavor.BentoMLModelDeployerConfig]
property
readonly
Returns BentoMLModelDeployerConfig
config class.
Returns:
Type | Description |
---|---|
Type[zenml.integrations.bentoml.flavors.bentoml_model_deployer_flavor.BentoMLModelDeployerConfig] |
The config class. |
implementation_class: Type[BentoMLModelDeployer]
property
readonly
Implementation class for this flavor.
Returns:
Type | Description |
---|---|
Type[BentoMLModelDeployer] |
The implementation class. |
name: str
property
readonly
Name of the flavor.
Returns:
Type | Description |
---|---|
str |
Name of the flavor. |
delete_model_server(self, uuid, timeout=10, force=False)
Method to delete all configuration of a model server.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
uuid |
UUID |
UUID of the model server to delete. |
required |
timeout |
int |
Timeout in seconds to wait for the service to stop. |
10 |
force |
bool |
If True, force the service to stop. |
False |
Source code in zenml/integrations/bentoml/model_deployers/bentoml_model_deployer.py
def delete_model_server(
    self,
    uuid: UUID,
    timeout: int = DEFAULT_SERVICE_START_STOP_TIMEOUT,
    force: bool = False,
) -> None:
    """Method to delete all configuration of a model server.

    Nothing happens when no service matches the given UUID.

    Args:
        uuid: UUID of the model server to delete.
        timeout: Timeout in seconds to wait for the service to stop.
        force: If True, force the service to stop.
    """
    matching_services = self.find_model_server(service_uuid=uuid)
    if not matching_services:
        return

    # only the first match is stopped and cleaned up
    target_service = cast(BentoMLDeploymentService, matching_services[0])
    self._clean_up_existing_service(
        existing_service=target_service, timeout=timeout, force=force
    )
deploy_model(self, config, replace=False, timeout=10)
Create a new BentoML deployment service or update an existing one.
This should serve the supplied model and deployment configuration.
This method has two modes of operation, depending on the replace
argument value:
- if `replace` is False, calling this method will create a new BentoML deployment server to reflect the model and other configuration parameters specified in the supplied BentoML service `config`.
- if `replace` is True, this method will first attempt to find an existing BentoML deployment service that is equivalent to the supplied configuration parameters. Two or more BentoML deployment services are considered equivalent if they have the same `pipeline_name`, `pipeline_step_name` and `model_name` configuration parameters. To put it differently, two BentoML deployment services are equivalent if they serve versions of the same model deployed by the same pipeline step. If an equivalent BentoML deployment is found, it will be updated in place to reflect the new configuration parameters.
Callers should set replace
to True if they want a continuous model
deployment workflow that doesn't spin up a new BentoML deployment
server for each new model version. If multiple equivalent BentoML
deployment servers are found, one is selected at random to be updated
and the others are deleted.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
config |
ServiceConfig |
the configuration of the model to be deployed with BentoML. |
required |
replace |
bool |
set this flag to True to find and update an equivalent BentoML deployment server with the new model instead of creating and starting a new deployment server. |
False |
timeout |
int |
the timeout in seconds to wait for the BentoML server to be provisioned and successfully started or updated. If set to 0, the method will return immediately after the BentoML server is provisioned, without waiting for it to fully start. |
10 |
Returns:
Type | Description |
---|---|
BaseService |
The ZenML BentoML deployment service object that can be used to interact with the BentoML model http server. |
Source code in zenml/integrations/bentoml/model_deployers/bentoml_model_deployer.py
def deploy_model(
    self,
    config: ServiceConfig,
    replace: bool = False,
    timeout: int = DEFAULT_SERVICE_START_STOP_TIMEOUT,
) -> BaseService:
    """Create a new BentoML deployment service or update an existing one.

    This should serve the supplied model and deployment configuration.

    This method has two modes of operation, depending on the `replace`
    argument value:

      * if `replace` is False, calling this method will create a new BentoML
        deployment server to reflect the model and other configuration
        parameters specified in the supplied BentoML service `config`.

      * if `replace` is True, this method will first attempt to find an
        existing BentoML deployment service that is *equivalent* to the
        supplied configuration parameters. Two or more BentoML deployment
        services are considered equivalent if they have the same
        `pipeline_name`, `pipeline_step_name` and `model_name` configuration
        parameters. To put it differently, two BentoML deployment services
        are equivalent if they serve versions of the same model deployed by
        the same pipeline step. If an equivalent BentoML deployment is found,
        it will be updated in place to reflect the new configuration
        parameters.

    Callers should set `replace` to True if they want a continuous model
    deployment workflow that doesn't spin up a new BentoML deployment
    server for each new model version. If multiple equivalent BentoML
    deployment servers are found, the first one returned by
    `find_model_server` is updated and the others are deleted.

    Args:
        config: the configuration of the model to be deployed with BentoML.
        replace: set this flag to True to find and update an equivalent
            BentoML deployment server with the new model instead of
            creating and starting a new deployment server.
        timeout: the timeout in seconds to wait for the BentoML server
            to be provisioned and successfully started or updated. If set
            to 0, the method will return immediately after the BentoML
            server is provisioned, without waiting for it to fully start.

    Returns:
        The ZenML BentoML deployment service object that can be used to
        interact with the BentoML model http server.
    """
    config = cast(BentoMLDeploymentConfig, config)
    service = None

    # if replace is True, remove all existing services
    if replace is True:
        existing_services = self.find_model_server(
            pipeline_name=config.pipeline_name,
            pipeline_step_name=config.pipeline_step_name,
            model_name=config.model_name,
        )

        for existing_service in existing_services:
            if service is None:
                # keep the first matching service as the one to update
                # NOTE(review): the order comes from the directory walk in
                # find_model_server, so this is not necessarily the most
                # recently created service - confirm.
                service = cast(BentoMLDeploymentService, existing_service)
            try:
                # stop each matching service and delete its runtime path
                # NOTE(review): as laid out here this also runs for the
                # service kept above, which is then updated and restarted
                # below - confirm against the upstream indentation.
                self._clean_up_existing_service(
                    existing_service=cast(
                        BentoMLDeploymentService, existing_service
                    ),
                    timeout=timeout,
                    force=True,
                )
            except RuntimeError:
                # ignore errors encountered while stopping old services
                pass
    if service:
        logger.info(
            f"Updating an existing BentoML deployment service: {service}"
        )
        # set the root runtime path to this deployer's local path
        config.root_runtime_path = self.local_path
        # restart the kept service with the new configuration
        service.stop(timeout=timeout, force=True)
        service.update(config)
        service.start(timeout=timeout)
    else:
        # create a new BentoMLDeploymentService instance
        service = self._create_new_service(timeout, config)
        logger.info(f"Created a new BentoML deployment service: {service}")

    return cast(BaseService, service)
find_model_server(self, running=False, service_uuid=None, pipeline_name=None, pipeline_run_id=None, pipeline_step_name=None, model_name=None, model_uri=None, model_type=None)
Finds one or more model servers that match the given criteria.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
running |
bool |
If true, only running services will be returned. |
False |
service_uuid |
Optional[uuid.UUID] |
The UUID of the service that was originally used to deploy the model. |
None |
pipeline_name |
Optional[str] |
Name of the pipeline that the deployed model was part of. |
None |
pipeline_run_id |
Optional[str] |
ID of the pipeline run which the deployed model was part of. |
None |
pipeline_step_name |
Optional[str] |
The name of the pipeline model deployment step that deployed the model. |
None |
model_name |
Optional[str] |
Name of the deployed model. |
None |
model_uri |
Optional[str] |
URI of the deployed model. |
None |
model_type |
Optional[str] |
Type/format of the deployed model. Not used in this BentoML case. |
None |
Returns:
Type | Description |
---|---|
List[zenml.services.service.BaseService] |
One or more Service objects representing model servers that match the input search criteria. |
Exceptions:
Type | Description |
---|---|
TypeError |
if a service loaded from a configuration file is not a BentoMLDeploymentService. |
Source code in zenml/integrations/bentoml/model_deployers/bentoml_model_deployer.py
def find_model_server(
    self,
    running: bool = False,
    service_uuid: Optional[UUID] = None,
    pipeline_name: Optional[str] = None,
    pipeline_run_id: Optional[str] = None,
    pipeline_step_name: Optional[str] = None,
    model_name: Optional[str] = None,
    model_uri: Optional[str] = None,
    model_type: Optional[str] = None,
) -> List[BaseService]:
    """Finds one or more model servers that match the given criteria.

    Walks the deployer's local path, loads every service daemon
    configuration file found there and keeps the services that match.

    Args:
        running: If true, only running services will be returned.
        service_uuid: The UUID of the service that was originally used
            to deploy the model. When given, only directories named after
            this UUID are inspected.
        pipeline_name: Name of the pipeline that the deployed model was part
            of.
        pipeline_run_id: ID of the pipeline run which the deployed model
            was part of.
        pipeline_step_name: The name of the pipeline model deployment step
            that deployed the model.
        model_name: Name of the deployed model.
        model_uri: URI of the deployed model. Stored in the search config,
            but not compared by `_matches_search_criteria`.
        model_type: Type/format of the deployed model. Not used in this
            BentoML case.

    Returns:
        One or more Service objects representing model servers that match
        the input search criteria.

    Raises:
        TypeError: if a service loaded from a configuration file is not a
            `BentoMLDeploymentService`.
    """
    # Unset criteria become empty strings, which the matcher ignores
    criteria = BentoMLDeploymentConfig(
        model_name=model_name or "",
        bento="",
        port=BENTOML_DEFAULT_PORT,
        model_uri=model_uri or "",
        working_dir="",
        pipeline_name=pipeline_name or "",
        pipeline_run_id=pipeline_run_id or "",
        pipeline_step_name=pipeline_step_name or "",
    )
    matching_services: List[BaseService] = []
    wanted_dir_name = str(service_uuid) if service_uuid else None

    for root, _, files in os.walk(self.local_path):
        # With a UUID filter, only look inside the directory named after it
        if wanted_dir_name is not None and Path(root).name != wanted_dir_name:
            continue
        for file_name in files:
            if file_name != SERVICE_DAEMON_CONFIG_FILE_NAME:
                continue
            service_config_path = os.path.join(root, file_name)
            logger.debug(
                "Loading service daemon configuration from %s",
                service_config_path,
            )
            with open(service_config_path, "r") as f:
                serialized_service = f.read()
            candidate = ServiceRegistry().load_service_from_json(
                serialized_service
            )
            if not isinstance(candidate, BentoMLDeploymentService):
                raise TypeError(
                    f"Expected service type BentoMLDeploymentService but got "
                    f"{type(candidate)} instead"
                )
            # Refresh the status before the `running` filter reads it
            candidate.update_status()
            if not self._matches_search_criteria(candidate, criteria):
                continue
            if running and not candidate.is_running:
                continue
            matching_services.append(cast(BaseService, candidate))

    return matching_services
get_model_server_info(service_instance)
staticmethod
Return implementation specific information on the model server.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
service_instance |
BentoMLDeploymentService |
BentoML deployment service object |
required |
Returns:
Type | Description |
---|---|
Dict[str, Optional[str]] |
A dictionary containing the model server information. |
Source code in zenml/integrations/bentoml/model_deployers/bentoml_model_deployer.py
@staticmethod
def get_model_server_info( # type: ignore[override]
service_instance: "BentoMLDeploymentService",
) -> Dict[str, Optional[str]]:
"""Return implementation specific information on the model server.
Args:
service_instance: BentoML deployment service object
Returns:
A dictionary containing the model server information.
"""
predictions_apis_urls = ""
if service_instance.prediction_apis_urls is not None:
predictions_apis_urls = ", ".join(
[
api
for api in service_instance.prediction_apis_urls
if api is not None
]
)
return {
"PREDICTION_URL": service_instance.prediction_url,
"BENTO_TAG": service_instance.config.bento,
"MODEL_NAME": service_instance.config.model_name,
"MODEL_URI": service_instance.config.model_uri,
"BENTO_URI": service_instance.config.bento_uri,
"SERVICE_PATH": service_instance.status.runtime_path,
"DAEMON_PID": str(service_instance.status.pid),
"PREDICITON_APIS_URLS": predictions_apis_urls,
}
get_service_path(id_)
staticmethod
Get the path where local BentoML service information is stored.
This is where the deployment service configuration, PID and log files are stored.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
id_ |
UUID |
The ID of the BentoML model deployer. |
required |
Returns:
Type | Description |
---|---|
str |
The service path. |
Source code in zenml/integrations/bentoml/model_deployers/bentoml_model_deployer.py
@staticmethod
def get_service_path(id_: UUID) -> str:
    """Return the directory holding local BentoML service information.

    This is where the deployment service configuration, PID and log
    files are stored. The directory is created if it does not exist yet.

    Args:
        id_: The ID of the BentoML model deployer.

    Returns:
        The service path.
    """
    local_stores_root = GlobalConfiguration().local_stores_path
    deployer_dir = os.path.join(local_stores_root, str(id_))
    create_dir_recursive_if_not_exists(deployer_dir)
    return deployer_dir
start_model_server(self, uuid, timeout=10)
Method to start a model server.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
uuid |
UUID |
UUID of the model server to start. |
required |
timeout |
int |
Timeout in seconds to wait for the service to start. |
10 |
Source code in zenml/integrations/bentoml/model_deployers/bentoml_model_deployer.py
def start_model_server(
    self, uuid: UUID, timeout: int = DEFAULT_SERVICE_START_STOP_TIMEOUT
) -> None:
    """Start the model server identified by the given UUID.

    Nothing happens when no model server with that UUID is found.

    Args:
        uuid: UUID of the model server to start.
        timeout: Timeout in seconds to wait for the service to start.
    """
    matching_services = self.find_model_server(service_uuid=uuid)
    if not matching_services:
        return
    # Only the first match is started.
    matching_services[0].start(timeout=timeout)
stop_model_server(self, uuid, timeout=10, force=False)
Method to stop a model server.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
uuid |
UUID |
UUID of the model server to stop. |
required |
timeout |
int |
Timeout in seconds to wait for the service to stop. |
10 |
force |
bool |
If True, force the service to stop. |
False |
Source code in zenml/integrations/bentoml/model_deployers/bentoml_model_deployer.py
def stop_model_server(
    self,
    uuid: UUID,
    timeout: int = DEFAULT_SERVICE_START_STOP_TIMEOUT,
    force: bool = False,
) -> None:
    """Stop the model server identified by the given UUID.

    Nothing happens when no model server with that UUID is found.

    Args:
        uuid: UUID of the model server to stop.
        timeout: Timeout in seconds to wait for the service to stop.
        force: If True, force the service to stop.
    """
    matching_services = self.find_model_server(service_uuid=uuid)
    if not matching_services:
        return
    # Only the first match is stopped.
    matching_services[0].stop(timeout=timeout, force=force)
services
special
Initialization for BentoML services.
bentoml_deployment
Implementation for the BentoML inference service.
BentoMLDeploymentConfig (LocalDaemonServiceConfig)
pydantic-model
BentoML model deployment configuration.
Attributes:
Name | Type | Description |
---|---|---|
model_name |
str |
name of the model to deploy |
model_uri |
str |
URI of the model to deploy |
port |
Optional[int] |
port to expose the service on |
bento |
str |
Bento package to deploy |
workers |
Optional[int] |
number of workers to use |
backlog |
Optional[int] |
number of requests to queue |
production |
bool |
whether to run in production mode |
working_dir |
str |
working directory for the service |
host |
Optional[str] |
host to expose the service on |
ssl_parameters |
Optional[zenml.integrations.bentoml.services.bentoml_deployment.SSLBentoMLParametersConfig] |
SSL parameters for the Bentoml deployment |
Source code in zenml/integrations/bentoml/services/bentoml_deployment.py
class BentoMLDeploymentConfig(LocalDaemonServiceConfig):
    """BentoML model deployment configuration.

    Attributes:
        model_name: name of the model to deploy
        model_uri: URI of the model to deploy
        port: port to expose the service on
        bento: Bento package to deploy
        bento_uri: URI of the Bento (optional)
        apis: names of the Bento service APIs; the deployment service
            appends each one to its prediction URL to build the per-API
            prediction URLs
        workers: number of workers to use
        backlog: number of requests to queue
        production: whether to run in production mode
        working_dir: working directory for the service
        host: host to expose the service on
        ssl_parameters: SSL parameters for the Bentoml deployment
    """

    model_name: str
    model_uri: str
    bento: str
    bento_uri: Optional[str] = None
    apis: List[str] = []
    workers: Optional[int] = None
    port: Optional[int] = None
    backlog: Optional[int] = None
    production: bool = False
    working_dir: str
    host: Optional[str] = None
    # Defaults to an SSL configuration with every option unset.
    ssl_parameters: Optional[SSLBentoMLParametersConfig] = Field(
        default_factory=SSLBentoMLParametersConfig
    )
BentoMLDeploymentEndpoint (LocalDaemonServiceEndpoint)
pydantic-model
A service endpoint exposed by the BentoML deployment daemon.
Attributes:
Name | Type | Description |
---|---|---|
config |
BentoMLDeploymentEndpointConfig |
service endpoint configuration |
Source code in zenml/integrations/bentoml/services/bentoml_deployment.py
class BentoMLDeploymentEndpoint(LocalDaemonServiceEndpoint):
    """A service endpoint exposed by the BentoML deployment daemon.

    Attributes:
        config: service endpoint configuration
        monitor: HTTP health monitor attached to the endpoint
    """

    config: BentoMLDeploymentEndpointConfig
    monitor: HTTPEndpointHealthMonitor

    @property
    def prediction_url(self) -> Optional[str]:
        """Gets the prediction URL for the endpoint.

        Returns:
            the prediction URL for the endpoint, or None when the endpoint
            status has no URI.
        """
        # URLs always use forward slashes. `os.path.join` would insert the
        # platform separator (a backslash on Windows), so the POSIX flavour
        # is used explicitly; on POSIX systems the result is unchanged.
        import posixpath

        uri = self.status.uri
        if not uri:
            return None
        return posixpath.join(uri, self.config.prediction_url_path)
prediction_url: Optional[str]
property
readonly
Gets the prediction URL for the endpoint.
Returns:
Type | Description |
---|---|
Optional[str] |
the prediction URL for the endpoint |
BentoMLDeploymentEndpointConfig (LocalDaemonServiceEndpointConfig)
pydantic-model
BentoML deployment service configuration.
Attributes:
Name | Type | Description |
---|---|---|
prediction_url_path |
str |
URI subpath for prediction requests |
Source code in zenml/integrations/bentoml/services/bentoml_deployment.py
class BentoMLDeploymentEndpointConfig(LocalDaemonServiceEndpointConfig):
    """BentoML deployment service endpoint configuration.

    Attributes:
        prediction_url_path: URI subpath for prediction requests; the
            endpoint joins it onto its base URI to form the prediction URL
    """

    prediction_url_path: str
BentoMLDeploymentService (LocalDaemonService)
pydantic-model
BentoML deployment service used to start a local prediction server for BentoML models.
Attributes:
Name | Type | Description |
---|---|---|
SERVICE_TYPE |
ClassVar[zenml.services.service_type.ServiceType] |
a service type descriptor with information describing the BentoML deployment service class |
config |
BentoMLDeploymentConfig |
service configuration |
endpoint |
BentoMLDeploymentEndpoint |
optional service endpoint |
Source code in zenml/integrations/bentoml/services/bentoml_deployment.py
class BentoMLDeploymentService(LocalDaemonService):
    """BentoML deployment service used to start a local prediction server for BentoML models.

    Attributes:
        SERVICE_TYPE: a service type descriptor with information describing
            the BentoML deployment service class
        config: service configuration
        endpoint: optional service endpoint
    """

    SERVICE_TYPE = ServiceType(
        name="bentoml-deployment",
        type="model-serving",
        flavor="bentoml",
        description="BentoML prediction service",
    )

    config: BentoMLDeploymentConfig
    endpoint: BentoMLDeploymentEndpoint

    def __init__(
        self,
        config: Union[BentoMLDeploymentConfig, Dict[str, Any]],
        **attrs: Any,
    ) -> None:
        """Initialize the BentoML deployment service.

        When a typed config is passed and the caller did not supply an
        endpoint, an HTTP endpoint with a health monitor is derived from
        the config's port and host.

        Args:
            config: service configuration
            attrs: additional attributes to set on the service
        """
        # ensure that the endpoint is created before the service is initialized
        # TODO [ENG-700]: implement a service factory or builder for BentoML
        #   deployment services
        if "endpoint" not in attrs and isinstance(
            config, BentoMLDeploymentConfig
        ):
            endpoint_config = BentoMLDeploymentEndpointConfig(
                protocol=ServiceEndpointProtocol.HTTP,
                port=config.port,
                ip_address=config.host or DEFAULT_LOCAL_SERVICE_IP_ADDRESS,
                prediction_url_path=BENTOML_PREDICTION_URL_PATH,
            )
            health_monitor = HTTPEndpointHealthMonitor(
                config=HTTPEndpointHealthMonitorConfig(
                    healthcheck_uri_path=BENTOML_HEALTHCHECK_URL_PATH,
                )
            )
            attrs["endpoint"] = BentoMLDeploymentEndpoint(
                config=endpoint_config,
                monitor=health_monitor,
            )
        super().__init__(config=config, **attrs)

    def run(self) -> None:
        """Start the service.

        Serves the configured Bento over HTTP in production or development
        mode depending on `config.production`. A KeyboardInterrupt raised
        while serving is logged and swallowed.
        """
        logger.info(
            "Starting BentoML prediction service as blocking "
            "process... press CTRL+C once to stop it."
        )
        self.endpoint.prepare_for_start()
        ssl_params = self.config.ssl_parameters or SSLBentoMLParametersConfig()
        # SSL options are identical for both serving modes.
        ssl_kwargs = {
            "ssl_certfile": ssl_params.ssl_certfile,
            "ssl_keyfile": ssl_params.ssl_keyfile,
            "ssl_keyfile_password": ssl_params.ssl_keyfile_password,
            "ssl_version": ssl_params.ssl_version,
            "ssl_cert_reqs": ssl_params.ssl_cert_reqs,
            "ssl_ca_certs": ssl_params.ssl_ca_certs,
            "ssl_ciphers": ssl_params.ssl_ciphers,
        }
        # verify if to deploy in production mode or development mode
        if self.config.production:
            logger.info("Running in production mode.")
            from bentoml.serve import serve_http_production

            try:
                serve_http_production(
                    self.config.bento,
                    port=self.endpoint.status.port,
                    api_workers=self.config.workers,
                    backlog=self.config.backlog,
                    host=self.endpoint.status.hostname,
                    working_dir=self.config.working_dir,
                    **ssl_kwargs,
                )
            except KeyboardInterrupt:
                logger.info(
                    "BentoML prediction service stopped. Resuming normal execution."
                )
        else:
            logger.info("Running in development mode.")
            from bentoml.serve import serve_http_development

            try:
                serve_http_development(
                    self.config.bento,
                    port=self.endpoint.status.port,
                    working_dir=self.config.working_dir,
                    host=self.endpoint.status.hostname,
                    **ssl_kwargs,
                )
            except KeyboardInterrupt:
                logger.info(
                    "BentoML prediction service stopped. Resuming normal execution."
                )

    @property
    def prediction_url(self) -> Optional[str]:
        """Get the URI where the http server is running.

        Returns:
            The URI where the http service can be accessed to get more information
            about the service and to make predictions, or None if the service
            isn't running.
        """
        if not self.is_running:
            return None
        return self.endpoint.prediction_url

    @property
    def prediction_apis_urls(self) -> Optional[List[str]]:
        """Get the URIs where the prediction api services are answering requests.

        Returns:
            The URIs where the prediction service apis can be contacted to process
            HTTP/REST inference requests, or None if the service isn't running,
            the endpoint has no prediction URL, or no apis are configured.
        """
        if not self.is_running:
            return None
        base_url = self.endpoint.prediction_url
        # Without a base URL there is nothing meaningful to build; avoid
        # emitting strings such as "None/<api>".
        if base_url is None or not self.config.apis:
            return None
        # Drop a trailing slash so the joined URL never contains "//".
        base_url = base_url.rstrip("/")
        return [f"{base_url}/{api}" for api in self.config.apis]

    def predict(self, api_endpoint: str, data: "Any") -> "Any":
        """Make a prediction using the service.

        Args:
            data: data to make a prediction on
            api_endpoint: the api endpoint to make the prediction on

        Returns:
            The prediction result.

        Raises:
            Exception: if the service is not running
            ValueError: if the prediction endpoint is unknown.
        """
        if not self.is_running:
            raise Exception(
                "BentoML prediction service is not running. "
                "Please start the service before making predictions."
            )
        if self.endpoint.prediction_url is None:
            raise ValueError("No endpoint known for prediction.")
        client = Client.from_url(self.endpoint.prediction_url)
        return client.call(api_endpoint, data)
prediction_apis_urls: Optional[List[str]]
property
readonly
Get the URIs where the prediction API services are answering requests.
Returns:
Type | Description |
---|---|
Optional[List[str]] |
The URI where the prediction service apis can be contacted to process HTTP/REST inference requests, or None, if the service isn't running. |
prediction_url: Optional[str]
property
readonly
Get the URI where the http server is running.
Returns:
Type | Description |
---|---|
Optional[str] |
The URI where the http service can be accessed to get more information about the service and to make predictions. |
__init__(self, config, **attrs)
special
Initialize the BentoML deployment service.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
config |
Union[zenml.integrations.bentoml.services.bentoml_deployment.BentoMLDeploymentConfig, Dict[str, Any]] |
service configuration |
required |
attrs |
Any |
additional attributes to set on the service |
{} |
Source code in zenml/integrations/bentoml/services/bentoml_deployment.py
def __init__(
    self,
    config: Union[BentoMLDeploymentConfig, Dict[str, Any]],
    **attrs: Any,
) -> None:
    """Initialize the BentoML deployment service.

    When a typed config is passed and the caller did not supply an
    endpoint, an HTTP endpoint with a health monitor is derived from the
    config's port and host.

    Args:
        config: service configuration
        attrs: additional attributes to set on the service
    """
    # ensure that the endpoint is created before the service is initialized
    # TODO [ENG-700]: implement a service factory or builder for BentoML
    #   deployment services
    if "endpoint" not in attrs and isinstance(
        config, BentoMLDeploymentConfig
    ):
        endpoint_config = BentoMLDeploymentEndpointConfig(
            protocol=ServiceEndpointProtocol.HTTP,
            port=config.port,
            ip_address=config.host or DEFAULT_LOCAL_SERVICE_IP_ADDRESS,
            prediction_url_path=BENTOML_PREDICTION_URL_PATH,
        )
        health_monitor = HTTPEndpointHealthMonitor(
            config=HTTPEndpointHealthMonitorConfig(
                healthcheck_uri_path=BENTOML_HEALTHCHECK_URL_PATH,
            )
        )
        attrs["endpoint"] = BentoMLDeploymentEndpoint(
            config=endpoint_config,
            monitor=health_monitor,
        )
    super().__init__(config=config, **attrs)
predict(self, api_endpoint, data)
Make a prediction using the service.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
data |
Any |
data to make a prediction on |
required |
api_endpoint |
str |
the api endpoint to make the prediction on |
required |
Returns:
Type | Description |
---|---|
Any |
The prediction result. |
Exceptions:
Type | Description |
---|---|
Exception |
if the service is not running |
ValueError |
if the prediction endpoint is unknown. |
Source code in zenml/integrations/bentoml/services/bentoml_deployment.py
def predict(self, api_endpoint: str, data: "Any") -> "Any":
    """Send data to one of the service's API endpoints and return the result.

    Args:
        data: data to make a prediction on
        api_endpoint: the api endpoint to make the prediction on

    Returns:
        The prediction result.

    Raises:
        Exception: if the service is not running
        ValueError: if the prediction endpoint is unknown.
    """
    if not self.is_running:
        raise Exception(
            "BentoML prediction service is not running. "
            "Please start the service before making predictions."
        )
    # Guard clause: without a prediction URL no client can be created.
    if self.endpoint.prediction_url is None:
        raise ValueError("No endpoint known for prediction.")
    client = Client.from_url(self.endpoint.prediction_url)
    return client.call(api_endpoint, data)
run(self)
Start the service.
Source code in zenml/integrations/bentoml/services/bentoml_deployment.py
def run(self) -> None:
    """Start the service.

    Serves the configured Bento over HTTP in production or development
    mode depending on `config.production`. A KeyboardInterrupt raised
    while serving is logged and swallowed.
    """
    logger.info(
        "Starting BentoML prediction service as blocking "
        "process... press CTRL+C once to stop it."
    )
    self.endpoint.prepare_for_start()
    ssl_params = self.config.ssl_parameters or SSLBentoMLParametersConfig()
    # SSL options are identical for both serving modes.
    ssl_kwargs = {
        "ssl_certfile": ssl_params.ssl_certfile,
        "ssl_keyfile": ssl_params.ssl_keyfile,
        "ssl_keyfile_password": ssl_params.ssl_keyfile_password,
        "ssl_version": ssl_params.ssl_version,
        "ssl_cert_reqs": ssl_params.ssl_cert_reqs,
        "ssl_ca_certs": ssl_params.ssl_ca_certs,
        "ssl_ciphers": ssl_params.ssl_ciphers,
    }
    # verify if to deploy in production mode or development mode
    if self.config.production:
        logger.info("Running in production mode.")
        from bentoml.serve import serve_http_production

        try:
            serve_http_production(
                self.config.bento,
                port=self.endpoint.status.port,
                api_workers=self.config.workers,
                backlog=self.config.backlog,
                host=self.endpoint.status.hostname,
                working_dir=self.config.working_dir,
                **ssl_kwargs,
            )
        except KeyboardInterrupt:
            logger.info(
                "BentoML prediction service stopped. Resuming normal execution."
            )
    else:
        logger.info("Running in development mode.")
        from bentoml.serve import serve_http_development

        try:
            serve_http_development(
                self.config.bento,
                port=self.endpoint.status.port,
                working_dir=self.config.working_dir,
                host=self.endpoint.status.hostname,
                **ssl_kwargs,
            )
        except KeyboardInterrupt:
            logger.info(
                "BentoML prediction service stopped. Resuming normal execution."
            )
SSLBentoMLParametersConfig (BaseModel)
pydantic-model
BentoML SSL parameters configuration.
Attributes:
Name | Type | Description |
---|---|---|
ssl_certfile |
Optional[str] |
SSL certificate file |
ssl_keyfile |
Optional[str] |
SSL key file |
ssl_keyfile_password |
Optional[str] |
SSL key file password |
ssl_version |
Optional[str] |
SSL version |
ssl_cert_reqs |
Optional[str] |
SSL certificate requirements |
ssl_ca_certs |
Optional[str] |
SSL CA certificates |
ssl_ciphers |
Optional[str] |
SSL ciphers |
Source code in zenml/integrations/bentoml/services/bentoml_deployment.py
class SSLBentoMLParametersConfig(BaseModel):
    """BentoML SSL parameters configuration.

    Every option is optional and defaults to None. The deployment service
    forwards the values unchanged to the BentoML HTTP serving functions.

    Attributes:
        ssl_certfile: SSL certificate file
        ssl_keyfile: SSL key file
        ssl_keyfile_password: SSL key file password
        ssl_version: SSL version
        ssl_cert_reqs: SSL certificate requirements
        ssl_ca_certs: SSL CA certificates
        ssl_ciphers: SSL ciphers
    """

    ssl_certfile: Optional[str] = None
    ssl_keyfile: Optional[str] = None
    ssl_keyfile_password: Optional[str] = None
    ssl_version: Optional[str] = None
    ssl_cert_reqs: Optional[str] = None
    ssl_ca_certs: Optional[str] = None
    ssl_ciphers: Optional[str] = None
steps
special
Initialization of the BentoML standard interface steps.
bento_builder
Implementation of the BentoML bento builder step.
BentoMLBuilderParameters (BaseParameters)
pydantic-model
BentoML Bento builder step parameters.
Attributes:
Name | Type | Description |
---|---|---|
service |
str |
the name of the BentoML service to be deployed. |
model_name |
str |
the name of the model to be packaged. |
model_type |
str |
the type of the model. |
version |
Optional[str] |
the version of the model if given. |
labels |
Optional[Dict[str, str]] |
the labels of the model if given. |
description |
Optional[str] |
the description of the model if given. |
include |
Optional[List[str]] |
the files to be included in the BentoML bundle. |
exclude |
Optional[List[str]] |
the files to be excluded from the BentoML bundle. |
python |
Optional[Dict[str, str]] |
dictionary for configuring Bento's python dependencies, |
docker |
Optional[Dict[str, str]] |
dictionary for configuring Bento's docker image. |
Source code in zenml/integrations/bentoml/steps/bento_builder.py
class BentoMLBuilderParameters(BaseParameters):
    """BentoML Bento builder step parameters.

    Attributes:
        service: the name of the BentoML service to be deployed.
        model_name: the name of the model to be packaged.
        model_type: the type of the model.
        version: the version of the model if given.
        labels: the labels of the model if given.
        description: the description of the model if given.
        include: the files to be included in the BentoML bundle.
        exclude: the files to be excluded from the BentoML bundle.
        python: dictionary for configuring Bento's python dependencies.
        docker: dictionary for configuring Bento's docker image.
        working_dir: the build context directory for the Bento if given;
            the builder step falls back to the source root otherwise.
    """

    service: str
    model_name: str
    model_type: str
    version: Optional[str] = None
    labels: Optional[Dict[str, str]] = None
    description: Optional[str] = None
    include: Optional[List[str]] = None
    exclude: Optional[List[str]] = None
    python: Optional[Dict[str, str]] = None
    docker: Optional[Dict[str, str]] = None
    working_dir: Optional[str] = None
bento_builder_step (BaseStep)
Build a BentoML Model and Bento bundle.
This step takes a model artifact of an ML model trained or loaded in a previous step, saves it with BentoML, and then builds a BentoML bundle.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
model |
the model to be packaged. |
required | |
params |
the parameters for the BentoML builder step. |
required | |
context |
the step context. |
required |
Returns:
Type | Description |
---|---|
the BentoML Bento object. |
PARAMETERS_CLASS (BaseParameters)
pydantic-model
BentoML Bento builder step parameters.
Attributes:
Name | Type | Description |
---|---|---|
service |
str |
the name of the BentoML service to be deployed. |
model_name |
str |
the name of the model to be packaged. |
model_type |
str |
the type of the model. |
version |
Optional[str] |
the version of the model if given. |
labels |
Optional[Dict[str, str]] |
the labels of the model if given. |
description |
Optional[str] |
the description of the model if given. |
include |
Optional[List[str]] |
the files to be included in the BentoML bundle. |
exclude |
Optional[List[str]] |
the files to be excluded from the BentoML bundle. |
python |
Optional[Dict[str, str]] |
dictionary for configuring Bento's python dependencies, |
docker |
Optional[Dict[str, str]] |
dictionary for configuring Bento's docker image. |
Source code in zenml/integrations/bentoml/steps/bento_builder.py
class BentoMLBuilderParameters(BaseParameters):
    """BentoML Bento builder step parameters.

    Attributes:
        service: the name of the BentoML service to be deployed.
        model_name: the name of the model to be packaged.
        model_type: the type of the model.
        version: the version of the model if given.
        labels: the labels of the model if given.
        description: the description of the model if given.
        include: the files to be included in the BentoML bundle.
        exclude: the files to be excluded from the BentoML bundle.
        python: dictionary for configuring Bento's python dependencies.
        docker: dictionary for configuring Bento's docker image.
        working_dir: the build context directory for the Bento if given;
            the builder step falls back to the source root otherwise.
    """

    service: str
    model_name: str
    model_type: str
    version: Optional[str] = None
    labels: Optional[Dict[str, str]] = None
    description: Optional[str] = None
    include: Optional[List[str]] = None
    exclude: Optional[List[str]] = None
    python: Optional[Dict[str, str]] = None
    docker: Optional[Dict[str, str]] = None
    working_dir: Optional[str] = None
entrypoint(model, params, context)
staticmethod
Build a BentoML Model and Bento bundle.
This step takes a model artifact of an ML model trained or loaded in a previous step, saves it with BentoML, and then builds a BentoML bundle.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
model |
ModelArtifact |
the model to be packaged. |
required |
params |
BentoMLBuilderParameters |
the parameters for the BentoML builder step. |
required |
context |
StepContext |
the step context. |
required |
Returns:
Type | Description |
---|---|
Bento |
the BentoML Bento object. |
Source code in zenml/integrations/bentoml/steps/bento_builder.py
@step
def bento_builder_step(
    model: ModelArtifact,
    params: BentoMLBuilderParameters,
    context: StepContext,
) -> bento.Bento:
    """Build a BentoML Model and Bento bundle.

    This step takes a model artifact of an ML model trained or loaded in a
    previous step, saves it with BentoML, and then builds a BentoML bundle.

    Args:
        model: the model to be packaged.
        params: the parameters for the BentoML builder step.
        context: the step context.

    Returns:
        the BentoML Bento object.
    """
    # Save the model and bento uri as part of the bento labels. Work on a
    # copy so the caller's `params.labels` dictionary is never mutated.
    labels = dict(params.labels or {})
    labels["model_uri"] = model.uri
    labels["bento_uri"] = os.path.join(
        context.get_output_artifact_uri(), DEFAULT_BENTO_FILENAME
    )

    # Load the model from the model artifact
    loaded_model = model_from_model_artifact(model)

    # Save the model to a BentoML model based on the model type
    try:
        module = importlib.import_module(f".{params.model_type}", "bentoml")
        # Pass the computed labels so the saved model always carries the
        # same URI labels as the Bento, whether or not the user supplied
        # labels of their own.
        module.save_model(params.model_name, loaded_model, labels=labels)
    except importlib.metadata.PackageNotFoundError:
        # Fall back to the generic picklable model format.
        # NOTE(review): only PackageNotFoundError triggers this fallback;
        # a ModuleNotFoundError from an unknown model_type propagates.
        bentoml.picklable_model.save_model(
            params.model_name,
            loaded_model,
        )

    # Build the BentoML bundle
    built_bento = bentos.build(
        service=params.service,
        version=params.version,
        labels=labels,
        description=params.description,
        include=params.include,
        exclude=params.exclude,
        python=params.python,
        docker=params.docker,
        build_ctx=params.working_dir or source_utils.get_source_root_path(),
    )

    # Return the BentoML Bento bundle
    return built_bento
bentoml_deployer
Implementation of the BentoML model deployer pipeline step.
BentoMLDeployerParameters (BaseParameters)
pydantic-model
Model deployer step parameters for BentoML.
Attributes:
Name | Type | Description |
---|---|---|
model_name |
str |
the name of the model to deploy. |
port |
int |
the port to use for the prediction service. |
workers |
Optional[int] |
number of workers to use for the prediction service |
backlog |
Optional[int] |
the number of requests to queue up before rejecting requests. |
production |
bool |
whether to deploy the service in production mode. |
working_dir |
Optional[str] |
the working directory to use for the prediction service. |
host |
Optional[str] |
the host to use for the prediction service. |
timeout |
int |
the number of seconds to wait for the service to start/stop. |
Source code in zenml/integrations/bentoml/steps/bentoml_deployer.py
class BentoMLDeployerParameters(BaseParameters):
    """Model deployer step parameters for BentoML.

    Attributes:
        model_name: the name of the model to deploy.
        port: the port to use for the prediction service.
        workers: number of workers to use for the prediction service
        backlog: the number of requests to queue up before rejecting requests.
        production: whether to deploy the service in production mode.
        working_dir: the working directory to use for the prediction service.
        host: the host to use for the prediction service.
        ssl_certfile: SSL certificate file
        ssl_keyfile: SSL key file
        ssl_keyfile_password: SSL key file password
        ssl_version: SSL version
        ssl_cert_reqs: SSL certificate requirements
        ssl_ca_certs: SSL CA certificates
        ssl_ciphers: SSL ciphers
        timeout: the number of seconds to wait for the service to start/stop.
    """

    model_name: str
    port: int
    workers: Optional[int] = None
    backlog: Optional[int] = None
    production: bool = False
    working_dir: Optional[str] = None
    host: Optional[str] = None
    # The ssl_* options are bundled into an SSLBentoMLParametersConfig by
    # the deployer step.
    ssl_certfile: Optional[str] = None
    ssl_keyfile: Optional[str] = None
    ssl_keyfile_password: Optional[str] = None
    ssl_version: Optional[str] = None
    ssl_cert_reqs: Optional[str] = None
    ssl_ca_certs: Optional[str] = None
    ssl_ciphers: Optional[str] = None
    timeout: int = 30
bentoml_model_deployer_step (BaseStep)
Model deployer pipeline step for BentoML.
This step deploys a given Bento to a local BentoML http prediction server.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
deploy_decision |
whether to deploy the model or not |
required | |
params |
parameters for the deployer step |
required | |
bento |
the bento artifact to deploy |
required |
Returns:
Type | Description |
---|---|
BentoML deployment service |
PARAMETERS_CLASS (BaseParameters)
pydantic-model
Model deployer step parameters for BentoML.
Attributes:
Name | Type | Description |
---|---|---|
model_name |
str |
the name of the model to deploy. |
port |
int |
the port to use for the prediction service. |
workers |
Optional[int] |
number of workers to use for the prediction service |
backlog |
Optional[int] |
the number of requests to queue up before rejecting requests. |
production |
bool |
whether to deploy the service in production mode. |
working_dir |
Optional[str] |
the working directory to use for the prediction service. |
host |
Optional[str] |
the host to use for the prediction service. |
timeout |
int |
the number of seconds to wait for the service to start/stop. |
Source code in zenml/integrations/bentoml/steps/bentoml_deployer.py
class BentoMLDeployerParameters(BaseParameters):
    """Model deployer step parameters for BentoML.

    Attributes:
        model_name: the name of the model to deploy.
        port: the port to use for the prediction service.
        workers: number of workers to use for the prediction service
        backlog: the number of requests to queue up before rejecting requests.
        production: whether to deploy the service in production mode.
        working_dir: the working directory to use for the prediction service.
        host: the host to use for the prediction service.
        ssl_certfile: SSL certificate file
        ssl_keyfile: SSL key file
        ssl_keyfile_password: SSL key file password
        ssl_version: SSL version
        ssl_cert_reqs: SSL certificate requirements
        ssl_ca_certs: SSL CA certificates
        ssl_ciphers: SSL ciphers
        timeout: the number of seconds to wait for the service to start/stop.
    """

    model_name: str
    port: int
    workers: Optional[int] = None
    backlog: Optional[int] = None
    production: bool = False
    working_dir: Optional[str] = None
    host: Optional[str] = None
    # The ssl_* options are bundled into an SSLBentoMLParametersConfig by
    # the deployer step.
    ssl_certfile: Optional[str] = None
    ssl_keyfile: Optional[str] = None
    ssl_keyfile_password: Optional[str] = None
    ssl_version: Optional[str] = None
    ssl_cert_reqs: Optional[str] = None
    ssl_ca_certs: Optional[str] = None
    ssl_ciphers: Optional[str] = None
    timeout: int = 30
entrypoint(deploy_decision, bento, params)
staticmethod
Model deployer pipeline step for BentoML.
This step deploys a given Bento to a local BentoML http prediction server.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
deploy_decision |
bool |
whether to deploy the model or not |
required |
params |
BentoMLDeployerParameters |
parameters for the deployer step |
required |
bento |
Bento |
the bento artifact to deploy |
required |
Returns:
Type | Description |
---|---|
BentoMLDeploymentService |
BentoML deployment service |
Source code in zenml/integrations/bentoml/steps/bentoml_deployer.py
@step(enable_cache=True)
def bentoml_model_deployer_step(
    deploy_decision: bool,
    bento: bento.Bento,
    params: BentoMLDeployerParameters,
) -> BentoMLDeploymentService:
    """Model deployer pipeline step for BentoML.

    This step deploys a given Bento to a local BentoML http prediction server.
    When `deploy_decision` is False and a matching server already exists, that
    server is reused (and started if needed) instead of deploying a new one.

    Args:
        deploy_decision: whether to deploy the model or not
        params: parameters for the deployer step
        bento: the bento artifact to deploy

    Returns:
        BentoML deployment service
    """
    # get the current active model deployer
    model_deployer = cast(
        BentoMLModelDeployer, BentoMLModelDeployer.get_active_model_deployer()
    )

    # get pipeline name, step name and run id
    step_env = cast(StepEnvironment, Environment()[STEP_ENVIRONMENT_NAME])
    pipeline_name = step_env.pipeline_name
    run_id = step_env.pipeline_run_id
    step_name = step_env.step_name

    # fetch existing services with same pipeline name, step name and model name
    existing_services = model_deployer.find_model_server(
        pipeline_name=pipeline_name,
        pipeline_step_name=step_name,
        model_name=params.model_name,
    )

    working_dir = params.working_dir or source_utils.get_source_root_path()

    # Return the apis endpoint of the defined service to use in the predict.
    # This is a workaround to get the endpoints of the service defined as functions
    # from the user code in the BentoML service.
    def service_apis(bento_tag: str) -> List[str]:
        # Add working dir in the bentoml load
        loaded_service = bentoml.load(
            bento_identifier=bento_tag,
            working_dir=params.working_dir
            or source_utils.get_source_root_path(),
        )
        return list(loaded_service.apis.keys())

    # create a config for the new model service
    predictor_cfg = BentoMLDeploymentConfig(
        model_name=params.model_name,
        bento=str(bento.tag),
        model_uri=bento.info.labels.get("model_uri"),
        bento_uri=bento.info.labels.get("bento_uri"),
        apis=service_apis(str(bento.tag)),
        workers=params.workers,
        # Forward the remaining serving options; previously these step
        # parameters were accepted but never reached the service config.
        backlog=params.backlog,
        production=params.production,
        host=params.host,
        working_dir=working_dir,
        port=params.port,
        pipeline_name=pipeline_name,
        pipeline_run_id=run_id,
        pipeline_step_name=step_name,
        ssl_parameters=SSLBentoMLParametersConfig(
            ssl_certfile=params.ssl_certfile,
            ssl_keyfile=params.ssl_keyfile,
            ssl_keyfile_password=params.ssl_keyfile_password,
            ssl_version=params.ssl_version,
            ssl_cert_reqs=params.ssl_cert_reqs,
            ssl_ca_certs=params.ssl_ca_certs,
            ssl_ciphers=params.ssl_ciphers,
        ),
    )

    # Creating a new service with inactive state and status by default
    service = BentoMLDeploymentService(predictor_cfg)
    if existing_services:
        service = cast(BentoMLDeploymentService, existing_services[0])

    if not deploy_decision and existing_services:
        logger.info(
            f"Skipping model deployment because the model quality does not "
            f"meet the criteria. Reusing last model server deployed by step "
            f"'{step_name}' and pipeline '{pipeline_name}' for model "
            f"'{params.model_name}'..."
        )
        if not service.is_running:
            service.start(timeout=params.timeout)
        return service

    # create a new model deployment and replace an old one if it exists
    new_service = cast(
        BentoMLDeploymentService,
        model_deployer.deploy_model(
            replace=True,
            config=predictor_cfg,
            timeout=params.timeout,
        ),
    )

    logger.info(
        f"BentoML deployment service started and reachable at:\n"
        f"    {new_service.prediction_url}\n"
    )

    return new_service