Mlflow
zenml.integrations.mlflow
special
Initialization for the ZenML MLflow integration.
The MLflow integration currently enables you to use MLflow tracking as a convenient way to visualize your experiment runs within the MLflow UI.
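Once an MLflow experiment tracker component is part of your active stack, steps can log parameters and metrics through the regular MLflow client API. Below is a minimal sketch; the component name `mlflow_tracker` and the logged values are illustrative assumptions, not part of this API reference.

```python
# Minimal sketch: a ZenML step that logs to the MLflow tracker in the active
# stack. The component name "mlflow_tracker" is an illustrative assumption.
import mlflow

from zenml.steps import step


@step(experiment_tracker="mlflow_tracker")
def train() -> float:
    mlflow.log_param("learning_rate", 0.01)  # goes to the ZenML-managed run
    accuracy = 0.93  # placeholder metric value
    mlflow.log_metric("accuracy", accuracy)
    return accuracy
```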
MlflowIntegration (Integration)
Definition of MLflow integration for ZenML.
Source code in zenml/integrations/mlflow/__init__.py
class MlflowIntegration(Integration):
"""Definition of MLflow integration for ZenML."""
NAME = MLFLOW
REQUIREMENTS = [
"mlflow>=1.2.0,<1.26.0",
"mlserver>=0.5.3",
"mlserver-mlflow>=0.5.3",
]
@classmethod
def activate(cls) -> None:
"""Activate the MLflow integration."""
from zenml.integrations.mlflow import services # noqa
@classmethod
def flavors(cls) -> List[Type[Flavor]]:
"""Declare the stack component flavors for the MLflow integration.
Returns:
List of stack component flavors for this integration.
"""
from zenml.integrations.mlflow.flavors import (
MLFlowExperimentTrackerFlavor,
MLFlowModelDeployerFlavor,
)
return [MLFlowModelDeployerFlavor, MLFlowExperimentTrackerFlavor]
activate()
classmethod
Activate the MLflow integration.
Source code in zenml/integrations/mlflow/__init__.py
@classmethod
def activate(cls) -> None:
"""Activate the MLflow integration."""
from zenml.integrations.mlflow import services # noqa
flavors()
classmethod
Declare the stack component flavors for the MLflow integration.
Returns:
| Type | Description |
|---|---|
| `List[Type[zenml.stack.flavor.Flavor]]` | List of stack component flavors for this integration. |
Source code in zenml/integrations/mlflow/__init__.py
@classmethod
def flavors(cls) -> List[Type[Flavor]]:
"""Declare the stack component flavors for the MLflow integration.
Returns:
List of stack component flavors for this integration.
"""
from zenml.integrations.mlflow.flavors import (
MLFlowExperimentTrackerFlavor,
MLFlowModelDeployerFlavor,
)
return [MLFlowModelDeployerFlavor, MLFlowExperimentTrackerFlavor]
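As a quick sketch, the integration class can be used to inspect the pinned requirements and the flavors it registers (exact output varies by ZenML version):

```python
# Sketch: inspect what the MLflow integration pins and registers.
from zenml.integrations.mlflow import MlflowIntegration

print(MlflowIntegration.REQUIREMENTS)  # pinned pip requirements
for flavor_class in MlflowIntegration.flavors():
    print(flavor_class().name)  # flavor names for the tracker and deployer
```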
experiment_trackers
special
Initialization of the MLflow experiment tracker.
mlflow_experiment_tracker
Implementation of the MLflow experiment tracker for ZenML.
MLFlowExperimentTracker (BaseExperimentTracker)
Track experiments using MLflow.
Source code in zenml/integrations/mlflow/experiment_trackers/mlflow_experiment_tracker.py
class MLFlowExperimentTracker(BaseExperimentTracker):
"""Track experiments using MLflow."""
def __init__(self, *args: Any, **kwargs: Any) -> None:
"""Initialize the experiment tracker and validate the tracking uri.
Args:
*args: Variable length argument list.
**kwargs: Arbitrary keyword arguments.
"""
super().__init__(*args, **kwargs)
self._ensure_valid_tracking_uri()
def _ensure_valid_tracking_uri(self) -> None:
"""Ensures that the tracking uri is a valid mlflow tracking uri.
Raises:
ValueError: If the tracking uri is not valid.
"""
tracking_uri = self.config.tracking_uri
if tracking_uri:
valid_schemes = DATABASE_ENGINES + ["http", "https", "file"]
if not any(
tracking_uri.startswith(scheme) for scheme in valid_schemes
) and not is_databricks_tracking_uri(tracking_uri):
raise ValueError(
f"MLflow tracking uri does not start with one of the valid "
f"schemes {valid_schemes} or its value is not set to "
f"'databricks'. See "
f"https://www.mlflow.org/docs/latest/tracking.html#where-runs-are-recorded "
f"for more information."
)
@property
def config(self) -> MLFlowExperimentTrackerConfig:
"""Returns the `MLFlowExperimentTrackerConfig` config.
Returns:
The configuration.
"""
return cast(MLFlowExperimentTrackerConfig, self._config)
@property
def local_path(self) -> Optional[str]:
"""Path to the local directory where the MLflow artifacts are stored.
Returns:
None if configured with a remote tracking URI, otherwise the
path to the local MLflow artifact store directory.
"""
tracking_uri = self.get_tracking_uri()
if is_remote_mlflow_tracking_uri(tracking_uri):
return None
else:
assert tracking_uri.startswith("file:")
return tracking_uri[5:]
@property
def validator(self) -> Optional["StackValidator"]:
"""Checks the stack has a `LocalArtifactStore` if no tracking uri was specified.
Returns:
An optional `StackValidator`.
"""
if self.config.tracking_uri:
# user specified a tracking uri, do nothing
return None
else:
# try to fall back to a tracking uri inside the zenml artifact
# store. this only works in case of a local artifact store, so we
# make sure to prevent stack with other artifact stores for now
return StackValidator(
custom_validation_function=lambda stack: (
isinstance(stack.artifact_store, LocalArtifactStore),
"MLflow experiment tracker without a specified tracking "
"uri only works with a local artifact store.",
)
)
@property
def settings_class(self) -> Optional[Type["BaseSettings"]]:
"""Settings class for the Mlflow experiment tracker.
Returns:
The settings class.
"""
return MLFlowExperimentTrackerSettings
@staticmethod
def _local_mlflow_backend() -> str:
"""Gets the local MLflow backend inside the ZenML artifact repository directory.
Returns:
The MLflow tracking URI for the local MLflow backend.
"""
client = Client()
artifact_store = client.active_stack.artifact_store
local_mlflow_backend_uri = os.path.join(artifact_store.path, "mlruns")
if not os.path.exists(local_mlflow_backend_uri):
os.makedirs(local_mlflow_backend_uri)
return "file:" + local_mlflow_backend_uri
def get_tracking_uri(self) -> str:
"""Returns the configured tracking URI or a local fallback.
Returns:
The tracking URI.
"""
return self.config.tracking_uri or self._local_mlflow_backend()
def prepare_step_run(self, info: "StepRunInfo") -> None:
"""Sets the MLflow tracking uri and credentials.
Args:
info: Info about the step that will be executed.
"""
self.configure_mlflow()
settings = cast(
MLFlowExperimentTrackerSettings,
self.get_settings(info),
)
experiment_name = settings.experiment_name or info.pipeline.name
experiment = self._set_active_experiment(experiment_name)
run_id = self.get_run_id(
experiment_name=experiment_name, run_name=info.run_name
)
tags = settings.tags.copy()
tags.update(self._get_internal_tags())
mlflow.start_run(
run_id=run_id,
run_name=info.run_name,
experiment_id=experiment.experiment_id,
tags=tags,
)
if settings.nested:
mlflow.start_run(run_name=info.config.name, nested=True, tags=tags)
def cleanup_step_run(self, info: "StepRunInfo") -> None:
"""Stops active MLflow runs and resets the MLflow tracking uri.
Args:
info: Info about the step that was executed.
"""
mlflow_utils.stop_zenml_mlflow_runs()
mlflow.set_tracking_uri("")
def configure_mlflow(self) -> None:
"""Configures the MLflow tracking URI and any additional credentials."""
tracking_uri = self.get_tracking_uri()
mlflow.set_tracking_uri(tracking_uri)
if is_databricks_tracking_uri(tracking_uri):
if self.config.databricks_host:
os.environ[DATABRICKS_HOST] = self.config.databricks_host
if self.config.tracking_username:
os.environ[DATABRICKS_USERNAME] = self.config.tracking_username
if self.config.tracking_password:
os.environ[DATABRICKS_PASSWORD] = self.config.tracking_password
if self.config.tracking_token:
os.environ[DATABRICKS_TOKEN] = self.config.tracking_token
else:
if self.config.tracking_username:
os.environ[
MLFLOW_TRACKING_USERNAME
] = self.config.tracking_username
if self.config.tracking_password:
os.environ[
MLFLOW_TRACKING_PASSWORD
] = self.config.tracking_password
if self.config.tracking_token:
os.environ[MLFLOW_TRACKING_TOKEN] = self.config.tracking_token
os.environ[MLFLOW_TRACKING_INSECURE_TLS] = (
"true" if self.config.tracking_insecure_tls else "false"
)
def get_run_id(self, experiment_name: str, run_name: str) -> Optional[str]:
"""Gets the if of a run with the given name and experiment.
Args:
experiment_name: Name of the experiment in which to search for the
run.
run_name: Name of the run to search.
Returns:
The id of the run if it exists.
"""
self.configure_mlflow()
experiment_name = self._adjust_experiment_name(experiment_name)
runs = mlflow.search_runs(
experiment_names=[experiment_name],
filter_string=f'tags.mlflow.runName = "{run_name}"',
output_format="list",
)
if not runs:
return None
run: Run = runs[0]
if mlflow_utils.is_zenml_run(run):
return cast(str, run.info.run_id)
else:
return None
def _set_active_experiment(self, experiment_name: str) -> Experiment:
"""Sets the active MLflow experiment.
If no experiment with this name exists, it is created and then
activated.
Args:
experiment_name: Name of the experiment to activate.
Raises:
RuntimeError: If the experiment creation or activation failed.
Returns:
The experiment.
"""
experiment_name = self._adjust_experiment_name(experiment_name)
mlflow.set_experiment(experiment_name=experiment_name)
experiment = mlflow.get_experiment_by_name(experiment_name)
if not experiment:
raise RuntimeError("Failed to set active mlflow experiment.")
return experiment
def _adjust_experiment_name(self, experiment_name: str) -> str:
"""Prepends a slash to the experiment name if using Databricks.
Databricks requires the experiment name to be an absolute path within
the Databricks workspace.
Args:
experiment_name: The experiment name.
Returns:
The potentially adjusted experiment name.
"""
tracking_uri = self.get_tracking_uri()
if (
tracking_uri
and is_databricks_tracking_uri(tracking_uri)
and not experiment_name.startswith("/")
):
return f"/{experiment_name}"
else:
return experiment_name
@staticmethod
def _get_internal_tags() -> Dict[str, Any]:
"""Gets ZenML internal tags for MLflow runs.
Returns:
Internal tags.
"""
return {mlflow_utils.ZENML_TAG_KEY: zenml.__version__}
config: MLFlowExperimentTrackerConfig
property
readonly
Returns the `MLFlowExperimentTrackerConfig` config.

Returns:

| Type | Description |
|---|---|
| `MLFlowExperimentTrackerConfig` | The configuration. |
local_path: Optional[str]
property
readonly
Path to the local directory where the MLflow artifacts are stored.
Returns:
| Type | Description |
|---|---|
| `Optional[str]` | None if configured with a remote tracking URI, otherwise the path to the local MLflow artifact store directory. |
settings_class: Optional[Type[BaseSettings]]
property
readonly
Settings class for the Mlflow experiment tracker.
Returns:
| Type | Description |
|---|---|
| `Optional[Type[BaseSettings]]` | The settings class. |
validator: Optional[StackValidator]
property
readonly
Checks the stack has a `LocalArtifactStore` if no tracking uri was specified.

Returns:

| Type | Description |
|---|---|
| `Optional[StackValidator]` | An optional `StackValidator`. |
__init__(self, *args, **kwargs)
special
Initialize the experiment tracker and validate the tracking uri.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `*args` | `Any` | Variable length argument list. | `()` |
| `**kwargs` | `Any` | Arbitrary keyword arguments. | `{}` |
Source code in zenml/integrations/mlflow/experiment_trackers/mlflow_experiment_tracker.py
def __init__(self, *args: Any, **kwargs: Any) -> None:
"""Initialize the experiment tracker and validate the tracking uri.
Args:
*args: Variable length argument list.
**kwargs: Arbitrary keyword arguments.
"""
super().__init__(*args, **kwargs)
self._ensure_valid_tracking_uri()
cleanup_step_run(self, info)
Stops active MLflow runs and resets the MLflow tracking uri.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `info` | `StepRunInfo` | Info about the step that was executed. | required |
Source code in zenml/integrations/mlflow/experiment_trackers/mlflow_experiment_tracker.py
def cleanup_step_run(self, info: "StepRunInfo") -> None:
"""Stops active MLflow runs and resets the MLflow tracking uri.
Args:
info: Info about the step that was executed.
"""
mlflow_utils.stop_zenml_mlflow_runs()
mlflow.set_tracking_uri("")
configure_mlflow(self)
Configures the MLflow tracking URI and any additional credentials.
Source code in zenml/integrations/mlflow/experiment_trackers/mlflow_experiment_tracker.py
def configure_mlflow(self) -> None:
"""Configures the MLflow tracking URI and any additional credentials."""
tracking_uri = self.get_tracking_uri()
mlflow.set_tracking_uri(tracking_uri)
if is_databricks_tracking_uri(tracking_uri):
if self.config.databricks_host:
os.environ[DATABRICKS_HOST] = self.config.databricks_host
if self.config.tracking_username:
os.environ[DATABRICKS_USERNAME] = self.config.tracking_username
if self.config.tracking_password:
os.environ[DATABRICKS_PASSWORD] = self.config.tracking_password
if self.config.tracking_token:
os.environ[DATABRICKS_TOKEN] = self.config.tracking_token
else:
if self.config.tracking_username:
os.environ[
MLFLOW_TRACKING_USERNAME
] = self.config.tracking_username
if self.config.tracking_password:
os.environ[
MLFLOW_TRACKING_PASSWORD
] = self.config.tracking_password
if self.config.tracking_token:
os.environ[MLFLOW_TRACKING_TOKEN] = self.config.tracking_token
os.environ[MLFLOW_TRACKING_INSECURE_TLS] = (
"true" if self.config.tracking_insecure_tls else "false"
)
get_run_id(self, experiment_name, run_name)
Gets the id of a run with the given name and experiment.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `experiment_name` | `str` | Name of the experiment in which to search for the run. | required |
| `run_name` | `str` | Name of the run to search. | required |

Returns:

| Type | Description |
|---|---|
| `Optional[str]` | The id of the run if it exists. |
Source code in zenml/integrations/mlflow/experiment_trackers/mlflow_experiment_tracker.py
def get_run_id(self, experiment_name: str, run_name: str) -> Optional[str]:
"""Gets the if of a run with the given name and experiment.
Args:
experiment_name: Name of the experiment in which to search for the
run.
run_name: Name of the run to search.
Returns:
The id of the run if it exists.
"""
self.configure_mlflow()
experiment_name = self._adjust_experiment_name(experiment_name)
runs = mlflow.search_runs(
experiment_names=[experiment_name],
filter_string=f'tags.mlflow.runName = "{run_name}"',
output_format="list",
)
if not runs:
return None
run: Run = runs[0]
if mlflow_utils.is_zenml_run(run):
return cast(str, run.info.run_id)
else:
return None
get_tracking_uri(self)
Returns the configured tracking URI or a local fallback.
Returns:
| Type | Description |
|---|---|
| `str` | The tracking URI. |
Source code in zenml/integrations/mlflow/experiment_trackers/mlflow_experiment_tracker.py
def get_tracking_uri(self) -> str:
"""Returns the configured tracking URI or a local fallback.
Returns:
The tracking URI.
"""
return self.config.tracking_uri or self._local_mlflow_backend()
prepare_step_run(self, info)
Sets the MLflow tracking uri and credentials.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `info` | `StepRunInfo` | Info about the step that will be executed. | required |
Source code in zenml/integrations/mlflow/experiment_trackers/mlflow_experiment_tracker.py
def prepare_step_run(self, info: "StepRunInfo") -> None:
"""Sets the MLflow tracking uri and credentials.
Args:
info: Info about the step that will be executed.
"""
self.configure_mlflow()
settings = cast(
MLFlowExperimentTrackerSettings,
self.get_settings(info),
)
experiment_name = settings.experiment_name or info.pipeline.name
experiment = self._set_active_experiment(experiment_name)
run_id = self.get_run_id(
experiment_name=experiment_name, run_name=info.run_name
)
tags = settings.tags.copy()
tags.update(self._get_internal_tags())
mlflow.start_run(
run_id=run_id,
run_name=info.run_name,
experiment_id=experiment.experiment_id,
tags=tags,
)
if settings.nested:
mlflow.start_run(run_name=info.config.name, nested=True, tags=tags)
flavors
special
MLFlow integration flavors.
mlflow_experiment_tracker_flavor
MLFlow experiment tracker flavor.
MLFlowExperimentTrackerConfig (BaseExperimentTrackerConfig, MLFlowExperimentTrackerSettings)
pydantic-model
Config for the MLflow experiment tracker.
Attributes:
| Name | Type | Description |
|---|---|---|
| `tracking_uri` | `Optional[str]` | The uri of the mlflow tracking server. If no uri is set, your stack must contain a `LocalArtifactStore` and ZenML will point MLflow to a subdirectory of your artifact store instead. |
| `tracking_username` | `Optional[str]` | Username for authenticating with the MLflow tracking server. When a remote tracking uri is specified, either `tracking_token` or `tracking_username` and `tracking_password` must be specified. |
| `tracking_password` | `Optional[str]` | Password for authenticating with the MLflow tracking server. When a remote tracking uri is specified, either `tracking_token` or `tracking_username` and `tracking_password` must be specified. |
| `tracking_token` | `Optional[str]` | Token for authenticating with the MLflow tracking server. When a remote tracking uri is specified, either `tracking_token` or `tracking_username` and `tracking_password` must be specified. |
| `tracking_insecure_tls` | `bool` | Skips verification of TLS connection to the MLflow tracking server if set to `True`. |
| `databricks_host` | `Optional[str]` | The host of the Databricks workspace with the MLflow managed server to connect to. This is only required if the `tracking_uri` value is set to `"databricks"`. |
Source code in zenml/integrations/mlflow/flavors/mlflow_experiment_tracker_flavor.py
class MLFlowExperimentTrackerConfig( # type: ignore[misc] # https://github.com/pydantic/pydantic/issues/4173
BaseExperimentTrackerConfig, MLFlowExperimentTrackerSettings
):
"""Config for the MLflow experiment tracker.
Attributes:
tracking_uri: The uri of the mlflow tracking server. If no uri is set,
your stack must contain a `LocalArtifactStore` and ZenML will
point MLflow to a subdirectory of your artifact store instead.
tracking_username: Username for authenticating with the MLflow
tracking server. When a remote tracking uri is specified,
either `tracking_token` or `tracking_username` and
`tracking_password` must be specified.
tracking_password: Password for authenticating with the MLflow
tracking server. When a remote tracking uri is specified,
either `tracking_token` or `tracking_username` and
`tracking_password` must be specified.
tracking_token: Token for authenticating with the MLflow
tracking server. When a remote tracking uri is specified,
either `tracking_token` or `tracking_username` and
`tracking_password` must be specified.
tracking_insecure_tls: Skips verification of TLS connection to the
MLflow tracking server if set to `True`.
databricks_host: The host of the Databricks workspace with the MLflow
managed server to connect to. This is only required if
`tracking_uri` value is set to `"databricks"`.
"""
tracking_uri: Optional[str] = None
tracking_username: Optional[str] = SecretField()
tracking_password: Optional[str] = SecretField()
tracking_token: Optional[str] = SecretField()
tracking_insecure_tls: bool = False
databricks_host: Optional[str] = None
@root_validator(skip_on_failure=True)
def _ensure_authentication_if_necessary(
cls, values: Dict[str, Any]
) -> Dict[str, Any]:
"""Ensures that credentials or a token for authentication exist.
We make this check when running MLflow tracking with a remote backend.
Args:
values: The values to validate.
Returns:
The validated values.
Raises:
ValueError: If neither credentials nor a token are provided.
"""
tracking_uri = values.get("tracking_uri")
if tracking_uri:
if is_databricks_tracking_uri(tracking_uri):
# If the tracking uri is "databricks", then we need the databricks
# host to be set.
databricks_host = values.get("databricks_host")
if not databricks_host:
raise ValueError(
"MLflow experiment tracking with a Databricks MLflow "
"managed tracking server requires the `databricks_host` "
"to be set in your stack component. To update your "
"component, run `zenml experiment-tracker update "
"<NAME> --databricks_host=DATABRICKS_HOST` "
"and specify the hostname of your Databricks workspace."
)
if is_remote_mlflow_tracking_uri(tracking_uri):
# we need either username + password or a token to authenticate to
# the remote backend
basic_auth = values.get("tracking_username") and values.get(
"tracking_password"
)
token_auth = values.get("tracking_token")
if not (basic_auth or token_auth):
raise ValueError(
f"MLflow experiment tracking with a remote backend "
f"{tracking_uri} is only possible when specifying either "
f"username and password or an authentication token in your "
f"stack component. To update your component, run the "
f"following command: `zenml experiment-tracker update "
f"<NAME> --tracking_username=MY_USERNAME "
f"--tracking_password=MY_PASSWORD "
f"--tracking_token=MY_TOKEN` and specify either your "
f"username and password or token."
)
return values
@property
def is_local(self) -> bool:
"""Checks if this stack component is running locally.
This designation is used to determine if the stack component can be
shared with other users or if it is only usable on the local host.
Returns:
True if this config is for a local component, False otherwise.
"""
if not self.tracking_uri or not is_remote_mlflow_tracking_uri(
self.tracking_uri
):
return True
return False
is_local: bool
property
readonly
Checks if this stack component is running locally.
This designation is used to determine if the stack component can be shared with other users or if it is only usable on the local host.
Returns:
| Type | Description |
|---|---|
| `bool` | True if this config is for a local component, False otherwise. |
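A brief sketch of the validation above: a remote tracking URI requires either a token or a username/password pair. The URI and token below are illustrative, and pydantic's `ValidationError` is caught as the `ValueError` it subclasses.

```python
# Sketch of the authentication check on remote tracking URIs.
from zenml.integrations.mlflow.flavors.mlflow_experiment_tracker_flavor import (
    MLFlowExperimentTrackerConfig,
)

ok = MLFlowExperimentTrackerConfig(
    tracking_uri="https://mlflow.example.com",  # illustrative remote URI
    tracking_token="my-token",  # token auth satisfies the validator
)

try:
    MLFlowExperimentTrackerConfig(tracking_uri="https://mlflow.example.com")
except ValueError as err:  # pydantic's ValidationError subclasses ValueError
    print(err)  # asks for a username/password pair or a token
```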
MLFlowExperimentTrackerFlavor (BaseExperimentTrackerFlavor)
Class for the `MLFlowExperimentTrackerFlavor`.
Source code in zenml/integrations/mlflow/flavors/mlflow_experiment_tracker_flavor.py
class MLFlowExperimentTrackerFlavor(BaseExperimentTrackerFlavor):
"""Class for the `MLFlowExperimentTrackerFlavor`."""
@property
def name(self) -> str:
"""Name of the flavor.
Returns:
The name of the flavor.
"""
return MLFLOW_MODEL_EXPERIMENT_TRACKER_FLAVOR
@property
def config_class(self) -> Type[MLFlowExperimentTrackerConfig]:
"""Returns `MLFlowExperimentTrackerConfig` config class.
Returns:
The config class.
"""
return MLFlowExperimentTrackerConfig
@property
def implementation_class(self) -> Type["MLFlowExperimentTracker"]:
"""Implementation class for this flavor.
Returns:
The implementation class.
"""
from zenml.integrations.mlflow.experiment_trackers import (
MLFlowExperimentTracker,
)
return MLFlowExperimentTracker
config_class: Type[zenml.integrations.mlflow.flavors.mlflow_experiment_tracker_flavor.MLFlowExperimentTrackerConfig]
property
readonly
Returns `MLFlowExperimentTrackerConfig` config class.

Returns:

| Type | Description |
|---|---|
| `Type[zenml.integrations.mlflow.flavors.mlflow_experiment_tracker_flavor.MLFlowExperimentTrackerConfig]` | The config class. |
implementation_class: Type[MLFlowExperimentTracker]
property
readonly
Implementation class for this flavor.
Returns:
| Type | Description |
|---|---|
| `Type[MLFlowExperimentTracker]` | The implementation class. |
name: str
property
readonly
Name of the flavor.
Returns:
| Type | Description |
|---|---|
| `str` | The name of the flavor. |
MLFlowExperimentTrackerSettings (BaseSettings)
pydantic-model
Settings for the MLflow experiment tracker.
Attributes:
| Name | Type | Description |
|---|---|---|
| `experiment_name` | `Optional[str]` | The MLflow experiment name. |
| `nested` | `bool` | If `True`, will create a nested sub-run for the step. |
| `tags` | `Dict[str, Any]` | Tags for the MLflow run. |
Source code in zenml/integrations/mlflow/flavors/mlflow_experiment_tracker_flavor.py
class MLFlowExperimentTrackerSettings(BaseSettings):
"""Settings for the MLflow experiment tracker.
Attributes:
experiment_name: The MLflow experiment name.
nested: If `True`, will create a nested sub-run for the step.
tags: Tags for the Mlflow run.
"""
experiment_name: Optional[str] = None
nested: bool = False
tags: Dict[str, Any] = {}
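These settings are typically supplied at the step level. The sketch below assumes ZenML's `"<component_type>.<flavor>"` settings key convention and an illustrative component name:

```python
# Sketch: configure the MLflow tracker per step. The settings key
# "experiment_tracker.mlflow" and the component name are assumptions.
from zenml.steps import step


@step(
    experiment_tracker="mlflow_tracker",
    settings={
        "experiment_tracker.mlflow": {
            "experiment_name": "my_experiment",
            "nested": True,  # creates a nested sub-run for the step
            "tags": {"team": "research"},
        }
    },
)
def train() -> None:
    ...
```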
is_databricks_tracking_uri(tracking_uri)
Checks whether the given tracking uri is a Databricks tracking uri.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `tracking_uri` | `str` | The tracking uri to check. | required |

Returns:

| Type | Description |
|---|---|
| `bool` | `True` if the tracking uri is a Databricks tracking uri, `False` otherwise. |
Source code in zenml/integrations/mlflow/flavors/mlflow_experiment_tracker_flavor.py
def is_databricks_tracking_uri(tracking_uri: str) -> bool:
"""Checks whether the given tracking uri is a Databricks tracking uri.
Args:
tracking_uri: The tracking uri to check.
Returns:
`True` if the tracking uri is a Databricks tracking uri, `False`
otherwise.
"""
return tracking_uri == "databricks"
is_remote_mlflow_tracking_uri(tracking_uri)
Checks whether the given tracking uri is remote or not.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `tracking_uri` | `str` | The tracking uri to check. | required |

Returns:

| Type | Description |
|---|---|
| `bool` | `True` if the tracking uri is remote, `False` otherwise. |
Source code in zenml/integrations/mlflow/flavors/mlflow_experiment_tracker_flavor.py
def is_remote_mlflow_tracking_uri(tracking_uri: str) -> bool:
"""Checks whether the given tracking uri is remote or not.
Args:
tracking_uri: The tracking uri to check.
Returns:
`True` if the tracking uri is remote, `False` otherwise.
"""
return any(
tracking_uri.startswith(prefix) for prefix in ["http://", "https://"]
) or is_databricks_tracking_uri(tracking_uri)
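The two helpers compose as follows (illustrative values):

```python
# Illustrative behaviour of the URI helpers defined above.
from zenml.integrations.mlflow.flavors.mlflow_experiment_tracker_flavor import (
    is_databricks_tracking_uri,
    is_remote_mlflow_tracking_uri,
)

assert is_databricks_tracking_uri("databricks")
assert not is_databricks_tracking_uri("https://mlflow.example.com")
assert is_remote_mlflow_tracking_uri("https://mlflow.example.com")
assert is_remote_mlflow_tracking_uri("databricks")  # Databricks counts as remote
assert not is_remote_mlflow_tracking_uri("file:/tmp/mlruns")
```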
mlflow_model_deployer_flavor
MLFlow model deployer flavor.
MLFlowModelDeployerConfig (BaseModelDeployerConfig)
pydantic-model
Configuration for the MLflow model deployer.
Attributes:
| Name | Type | Description |
|---|---|---|
| `service_path` | `str` | the path where the local MLflow deployment service configuration, PID and log files are stored. |
Source code in zenml/integrations/mlflow/flavors/mlflow_model_deployer_flavor.py
class MLFlowModelDeployerConfig(BaseModelDeployerConfig):
"""Configuration for the MLflow model deployer.
Attributes:
service_path: the path where the local MLflow deployment service
configuration, PID and log files are stored.
"""
service_path: str = ""
@property
def is_local(self) -> bool:
"""Checks if this stack component is running locally.
This designation is used to determine if the stack component can be
shared with other users or if it is only usable on the local host.
Returns:
True if this config is for a local component, False otherwise.
"""
return True
is_local: bool
property
readonly
Checks if this stack component is running locally.
This designation is used to determine if the stack component can be shared with other users or if it is only usable on the local host.
Returns:
| Type | Description |
|---|---|
| `bool` | True if this config is for a local component, False otherwise. |
MLFlowModelDeployerFlavor (BaseModelDeployerFlavor)
Model deployer flavor for MLFlow models.
Source code in zenml/integrations/mlflow/flavors/mlflow_model_deployer_flavor.py
class MLFlowModelDeployerFlavor(BaseModelDeployerFlavor):
"""Model deployer flavor for MLFlow models."""
@property
def name(self) -> str:
"""Name of the flavor.
Returns:
The name of the flavor.
"""
return MLFLOW_MODEL_DEPLOYER_FLAVOR
@property
def config_class(self) -> Type[MLFlowModelDeployerConfig]:
"""Returns `MLFlowModelDeployerConfig` config class.
Returns:
The config class.
"""
return MLFlowModelDeployerConfig
@property
def implementation_class(self) -> Type["MLFlowModelDeployer"]:
"""Implementation class for this flavor.
Returns:
The implementation class.
"""
from zenml.integrations.mlflow.model_deployers import (
MLFlowModelDeployer,
)
return MLFlowModelDeployer
config_class: Type[zenml.integrations.mlflow.flavors.mlflow_model_deployer_flavor.MLFlowModelDeployerConfig]
property
readonly
Returns `MLFlowModelDeployerConfig` config class.

Returns:

| Type | Description |
|---|---|
| `Type[zenml.integrations.mlflow.flavors.mlflow_model_deployer_flavor.MLFlowModelDeployerConfig]` | The config class. |
implementation_class: Type[MLFlowModelDeployer]
property
readonly
Implementation class for this flavor.
Returns:
| Type | Description |
|---|---|
| `Type[MLFlowModelDeployer]` | The implementation class. |
name: str
property
readonly
Name of the flavor.
Returns:
| Type | Description |
|---|---|
| `str` | The name of the flavor. |
mlflow_utils
Implementation of utils specific to the MLflow integration.
get_missing_mlflow_experiment_tracker_error()
Returns description of how to add an MLflow experiment tracker to your stack.
Returns:
| Type | Description |
|---|---|
| `ValueError` | If no MLflow experiment tracker is registered in the active stack. |
Source code in zenml/integrations/mlflow/mlflow_utils.py
def get_missing_mlflow_experiment_tracker_error() -> ValueError:
"""Returns description of how to add an MLflow experiment tracker to your stack.
Returns:
ValueError: If no MLflow experiment tracker is registered in the active stack.
"""
return ValueError(
"The active stack needs to have a MLflow experiment tracker "
"component registered to be able to track experiments using "
"MLflow. You can create a new stack with a MLflow experiment "
"tracker component or update your existing stack to add this "
"component, e.g.:\n\n"
" 'zenml experiment-tracker register mlflow_tracker "
"--type=mlflow'\n"
" 'zenml stack register stack-name -e mlflow_tracker ...'\n"
)
get_tracking_uri()
Gets the MLflow tracking URI from the active experiment tracking stack component.
Returns:
| Type | Description |
|---|---|
| `str` | MLflow tracking URI. |
Source code in zenml/integrations/mlflow/mlflow_utils.py
def get_tracking_uri() -> str:
"""Gets the MLflow tracking URI from the active experiment tracking stack component.
# noqa: DAR401
Returns:
MLflow tracking URI.
"""
from zenml.integrations.mlflow.experiment_trackers.mlflow_experiment_tracker import (
MLFlowExperimentTracker,
)
tracker = Client().active_stack.experiment_tracker
if tracker is None or not isinstance(tracker, MLFlowExperimentTracker):
raise get_missing_mlflow_experiment_tracker_error()
return tracker.get_tracking_uri()
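A common use is pointing external MLflow tooling at the same backend ZenML writes to, as in this sketch:

```python
# Sketch: reuse the active stack's MLflow backend outside of a pipeline run.
import mlflow

from zenml.integrations.mlflow.mlflow_utils import get_tracking_uri

uri = get_tracking_uri()  # raises if the active stack has no MLflow tracker
mlflow.set_tracking_uri(uri)
print(f"Inspect runs with: mlflow ui --backend-store-uri '{uri}'")
```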
is_zenml_run(run)
Checks if a MLflow run is a ZenML run or not.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `run` | `Run` | The run to check. | required |

Returns:

| Type | Description |
|---|---|
| `bool` | If the run is a ZenML run. |
Source code in zenml/integrations/mlflow/mlflow_utils.py
def is_zenml_run(run: Run) -> bool:
"""Checks if a MLflow run is a ZenML run or not.
Args:
run: The run to check.
Returns:
If the run is a ZenML run.
"""
return ZENML_TAG_KEY in run.data.tags
stop_zenml_mlflow_runs()
Stops active ZenML Mlflow runs.
This function stops all MLflow active runs until no active run exists or a non-ZenML run is active.
Source code in zenml/integrations/mlflow/mlflow_utils.py
def stop_zenml_mlflow_runs() -> None:
"""Stops active ZenML Mlflow runs.
This function stops all MLflow active runs until no active run exists or
a non-ZenML run is active.
"""
active_run = mlflow.active_run()
while active_run:
if is_zenml_run(active_run):
logger.debug("Stopping mlflow run %s.", active_run.info.run_id)
mlflow.end_run()
active_run = mlflow.active_run()
else:
break
model_deployers
special
Initialization of the MLflow model deployers.
mlflow_model_deployer
Implementation of the MLflow model deployer.
MLFlowModelDeployer (BaseModelDeployer)
MLflow implementation of the BaseModelDeployer.
Source code in zenml/integrations/mlflow/model_deployers/mlflow_model_deployer.py
class MLFlowModelDeployer(BaseModelDeployer):
"""MLflow implementation of the BaseModelDeployer."""
NAME: ClassVar[str] = "MLflow"
FLAVOR: ClassVar[Type[BaseModelDeployerFlavor]] = MLFlowModelDeployerFlavor
_service_path: Optional[str] = None
@property
def config(self) -> MLFlowModelDeployerConfig:
"""Returns the `MLFlowModelDeployerConfig` config.
Returns:
The configuration.
"""
return cast(MLFlowModelDeployerConfig, self._config)
@staticmethod
def get_service_path(id_: UUID) -> str:
"""Get the path where local MLflow service information is stored.
This includes the deployment service configuration, PID and log files.
Args:
id_: The ID of the MLflow model deployer.
Returns:
The service path.
"""
service_path = os.path.join(
GlobalConfiguration().local_stores_path,
str(id_),
)
create_dir_recursive_if_not_exists(service_path)
return service_path
@property
def local_path(self) -> str:
"""Returns the path to the root directory.
This is where all configurations for MLflow deployment daemon processes
are stored.
If the service path is not set in the config by the user, the path is
set to a local default path according to the component ID.
Returns:
The path to the local service root directory.
"""
if self._service_path is not None:
return self._service_path
if self.config.service_path:
self._service_path = self.config.service_path
else:
self._service_path = self.get_service_path(self.id)
create_dir_recursive_if_not_exists(self._service_path)
return self._service_path
@staticmethod
def get_model_server_info( # type: ignore[override]
service_instance: "MLFlowDeploymentService",
) -> Dict[str, Optional[str]]:
"""Return implementation specific information relevant to the user.
Args:
service_instance: Instance of an MLFlowDeploymentService
Returns:
A dictionary containing the information.
"""
return {
"PREDICTION_URL": service_instance.endpoint.prediction_url,
"MODEL_URI": service_instance.config.model_uri,
"MODEL_NAME": service_instance.config.model_name,
"SERVICE_PATH": service_instance.status.runtime_path,
"DAEMON_PID": str(service_instance.status.pid),
}
def deploy_model(
self,
config: ServiceConfig,
replace: bool = False,
timeout: int = DEFAULT_SERVICE_START_STOP_TIMEOUT,
) -> BaseService:
"""Create a new MLflow deployment service or update an existing one.
This should serve the supplied model and deployment configuration.
This method has two modes of operation, depending on the `replace`
argument value:
* if `replace` is False, calling this method will create a new MLflow
deployment server to reflect the model and other configuration
parameters specified in the supplied MLflow service `config`.
* if `replace` is True, this method will first attempt to find an
existing MLflow deployment service that is *equivalent* to the
supplied configuration parameters. Two or more MLflow deployment
services are considered equivalent if they have the same
`pipeline_name`, `pipeline_step_name` and `model_name` configuration
parameters. To put it differently, two MLflow deployment services
are equivalent if they serve versions of the same model deployed by
the same pipeline step. If an equivalent MLflow deployment is found,
it will be updated in place to reflect the new configuration
parameters.
Callers should set `replace` to True if they want a continuous model
deployment workflow that doesn't spin up a new MLflow deployment
server for each new model version. If multiple equivalent MLflow
deployment servers are found, one is selected at random to be updated
and the others are deleted.
Args:
config: the configuration of the model to be deployed with MLflow.
replace: set this flag to True to find and update an equivalent
MLflow deployment server with the new model instead of
creating and starting a new deployment server.
timeout: the timeout in seconds to wait for the MLflow server
to be provisioned and successfully started or updated. If set
to 0, the method will return immediately after the MLflow
server is provisioned, without waiting for it to fully start.
Returns:
The ZenML MLflow deployment service object that can be used to
interact with the MLflow model server.
"""
config = cast(MLFlowDeploymentConfig, config)
service = None
# if replace is True, remove all existing services
if replace is True:
existing_services = self.find_model_server(
pipeline_name=config.pipeline_name,
pipeline_step_name=config.pipeline_step_name,
model_name=config.model_name,
)
for existing_service in existing_services:
if service is None:
# keep the most recently created service
service = cast(MLFlowDeploymentService, existing_service)
try:
# delete the older services and don't wait for them to
# be deprovisioned
self._clean_up_existing_service(
existing_service=cast(
MLFlowDeploymentService, existing_service
),
timeout=timeout,
force=True,
)
except RuntimeError:
# ignore errors encountered while stopping old services
pass
if service:
logger.info(
f"Updating an existing MLflow deployment service: {service}"
)
# set the root runtime path with the stack component's UUID
config.root_runtime_path = self.local_path
service.stop(timeout=timeout, force=True)
service.update(config)
service.start(timeout=timeout)
else:
# create a new MLFlowDeploymentService instance
service = self._create_new_service(timeout, config)
logger.info(f"Created a new MLflow deployment service: {service}")
return cast(BaseService, service)
def _clean_up_existing_service(
self,
timeout: int,
force: bool,
existing_service: MLFlowDeploymentService,
) -> None:
# stop the older service
existing_service.stop(timeout=timeout, force=force)
# delete the old configuration file
if existing_service.status.runtime_path:
shutil.rmtree(existing_service.status.runtime_path)
# the step will receive a config from the user that mentions the number
# of workers etc. The step implementation will create a new config using
# all values from the user and add values like pipeline name, model_uri
def _create_new_service(
self, timeout: int, config: MLFlowDeploymentConfig
) -> MLFlowDeploymentService:
"""Creates a new MLFlowDeploymentService.
Args:
timeout: the timeout in seconds to wait for the MLflow server
to be provisioned and successfully started or updated.
config: the configuration of the model to be deployed with MLflow.
Returns:
The MLFlowDeploymentService object that can be used to interact
with the MLflow model server.
"""
# set the root runtime path with the stack component's UUID
config.root_runtime_path = self.local_path
# create a new service for the new model
service = MLFlowDeploymentService(config)
service.start(timeout=timeout)
return service
def find_model_server(
self,
running: bool = False,
service_uuid: Optional[UUID] = None,
pipeline_name: Optional[str] = None,
pipeline_run_id: Optional[str] = None,
pipeline_step_name: Optional[str] = None,
model_name: Optional[str] = None,
model_uri: Optional[str] = None,
model_type: Optional[str] = None,
) -> List[BaseService]:
"""Finds one or more model servers that match the given criteria.
Args:
running: If true, only running services will be returned.
service_uuid: The UUID of the service that was originally used
to deploy the model.
pipeline_name: Name of the pipeline that the deployed model was part
of.
pipeline_run_id: ID of the pipeline run which the deployed model
was part of.
pipeline_step_name: The name of the pipeline model deployment step
that deployed the model.
model_name: Name of the deployed model.
model_uri: URI of the deployed model.
model_type: Type/format of the deployed model. Not used in this
MLflow case.
Returns:
One or more Service objects representing model servers that match
the input search criteria.
Raises:
TypeError: if any of the input arguments are of an invalid type.
"""
services = []
config = MLFlowDeploymentConfig(
model_name=model_name or "",
model_uri=model_uri or "",
pipeline_name=pipeline_name or "",
pipeline_run_id=pipeline_run_id or "",
pipeline_step_name=pipeline_step_name or "",
)
# find all services that match the input criteria
for root, _, files in os.walk(self.local_path):
if service_uuid and Path(root).name != str(service_uuid):
continue
for file in files:
if file == SERVICE_DAEMON_CONFIG_FILE_NAME:
service_config_path = os.path.join(root, file)
logger.debug(
"Loading service daemon configuration from %s",
service_config_path,
)
existing_service_config = None
with open(service_config_path, "r") as f:
existing_service_config = f.read()
existing_service = ServiceRegistry().load_service_from_json(
existing_service_config
)
if not isinstance(
existing_service, MLFlowDeploymentService
):
raise TypeError(
f"Expected service type MLFlowDeploymentService but got "
f"{type(existing_service)} instead"
)
existing_service.update_status()
if self._matches_search_criteria(existing_service, config):
if not running or existing_service.is_running:
services.append(cast(BaseService, existing_service))
return services
def _matches_search_criteria(
self,
existing_service: MLFlowDeploymentService,
config: MLFlowDeploymentConfig,
) -> bool:
"""Returns true if a service matches the input criteria.
If any of the values in the input criteria are None, they are ignored.
This allows listing services just by common pipeline names or step
names, etc.
Args:
existing_service: The materialized Service instance derived from
the config of the older (existing) service
config: The MLFlowDeploymentConfig object passed to the
deploy_model function holding parameters of the new service
to be created.
Returns:
True if the service matches the input criteria.
"""
existing_service_config = existing_service.config
# check if the existing service matches the input criteria
if (
(
not config.pipeline_name
or existing_service_config.pipeline_name == config.pipeline_name
)
and (
not config.model_name
or existing_service_config.model_name == config.model_name
)
and (
not config.pipeline_step_name
or existing_service_config.pipeline_step_name
== config.pipeline_step_name
)
and (
not config.pipeline_run_id
or existing_service_config.pipeline_run_id
== config.pipeline_run_id
)
):
return True
return False
def stop_model_server(
self,
uuid: UUID,
timeout: int = DEFAULT_SERVICE_START_STOP_TIMEOUT,
force: bool = False,
) -> None:
"""Method to stop a model server.
Args:
uuid: UUID of the model server to stop.
timeout: Timeout in seconds to wait for the service to stop.
force: If True, force the service to stop.
"""
# get list of all services
existing_services = self.find_model_server(service_uuid=uuid)
# if the service exists, stop it
if existing_services:
existing_services[0].stop(timeout=timeout, force=force)
def start_model_server(
self, uuid: UUID, timeout: int = DEFAULT_SERVICE_START_STOP_TIMEOUT
) -> None:
"""Method to start a model server.
Args:
uuid: UUID of the model server to start.
timeout: Timeout in seconds to wait for the service to start.
"""
# get list of all services
existing_services = self.find_model_server(service_uuid=uuid)
# if the service exists, start it
if existing_services:
existing_services[0].start(timeout=timeout)
def delete_model_server(
self,
uuid: UUID,
timeout: int = DEFAULT_SERVICE_START_STOP_TIMEOUT,
force: bool = False,
) -> None:
"""Method to delete all configuration of a model server.
Args:
uuid: UUID of the model server to delete.
timeout: Timeout in seconds to wait for the service to stop.
force: If True, force the service to stop.
"""
# get list of all services
existing_services = self.find_model_server(service_uuid=uuid)
# if the service exists, clean it up
if existing_services:
service = cast(MLFlowDeploymentService, existing_services[0])
self._clean_up_existing_service(
existing_service=service, timeout=timeout, force=force
)
config: MLFlowModelDeployerConfig
property
readonly
Returns the `MLFlowModelDeployerConfig` config.

Returns:

| Type | Description |
|---|---|
| `MLFlowModelDeployerConfig` | The configuration. |
local_path: str
property
readonly
Returns the path to the root directory.
This is where all configurations for MLflow deployment daemon processes are stored.
If the service path is not set in the config by the user, the path is set to a local default path according to the component ID.
Returns:
| Type | Description |
|---|---|
| `str` | The path to the local service root directory. |
FLAVOR (BaseModelDeployerFlavor)
Model deployer flavor for MLFlow models.
Source code in zenml/integrations/mlflow/model_deployers/mlflow_model_deployer.py
class MLFlowModelDeployerFlavor(BaseModelDeployerFlavor):
"""Model deployer flavor for MLFlow models."""
@property
def name(self) -> str:
"""Name of the flavor.
Returns:
The name of the flavor.
"""
return MLFLOW_MODEL_DEPLOYER_FLAVOR
@property
def config_class(self) -> Type[MLFlowModelDeployerConfig]:
"""Returns `MLFlowModelDeployerConfig` config class.
Returns:
The config class.
"""
return MLFlowModelDeployerConfig
@property
def implementation_class(self) -> Type["MLFlowModelDeployer"]:
"""Implementation class for this flavor.
Returns:
The implementation class.
"""
from zenml.integrations.mlflow.model_deployers import (
MLFlowModelDeployer,
)
return MLFlowModelDeployer
config_class: Type[zenml.integrations.mlflow.flavors.mlflow_model_deployer_flavor.MLFlowModelDeployerConfig]
property
readonly
Returns `MLFlowModelDeployerConfig` config class.

Returns:

| Type | Description |
|---|---|
| `Type[zenml.integrations.mlflow.flavors.mlflow_model_deployer_flavor.MLFlowModelDeployerConfig]` | The config class. |
implementation_class: Type[MLFlowModelDeployer]
property
readonly
Implementation class for this flavor.
Returns:
| Type | Description |
|---|---|
| `Type[MLFlowModelDeployer]` | The implementation class. |
name: str
property
readonly
Name of the flavor.
Returns:
| Type | Description |
|---|---|
| `str` | The name of the flavor. |
delete_model_server(self, uuid, timeout=10, force=False)
Method to delete all configuration of a model server.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `uuid` | `UUID` | UUID of the model server to delete. | required |
| `timeout` | `int` | Timeout in seconds to wait for the service to stop. | 10 |
| `force` | `bool` | If True, force the service to stop. | False |
Source code in zenml/integrations/mlflow/model_deployers/mlflow_model_deployer.py
def delete_model_server(
self,
uuid: UUID,
timeout: int = DEFAULT_SERVICE_START_STOP_TIMEOUT,
force: bool = False,
) -> None:
"""Method to delete all configuration of a model server.
Args:
uuid: UUID of the model server to delete.
timeout: Timeout in seconds to wait for the service to stop.
force: If True, force the service to stop.
"""
# get list of all services
existing_services = self.find_model_server(service_uuid=uuid)
# if the service exists, clean it up
if existing_services:
service = cast(MLFlowDeploymentService, existing_services[0])
self._clean_up_existing_service(
existing_service=service, timeout=timeout, force=force
)
deploy_model(self, config, replace=False, timeout=10)
Create a new MLflow deployment service or update an existing one.
This should serve the supplied model and deployment configuration.
This method has two modes of operation, depending on the `replace` argument value:

* if `replace` is False, calling this method will create a new MLflow deployment server to reflect the model and other configuration parameters specified in the supplied MLflow service `config`.
* if `replace` is True, this method will first attempt to find an existing MLflow deployment service that is *equivalent* to the supplied configuration parameters. Two or more MLflow deployment services are considered equivalent if they have the same `pipeline_name`, `pipeline_step_name` and `model_name` configuration parameters. To put it differently, two MLflow deployment services are equivalent if they serve versions of the same model deployed by the same pipeline step. If an equivalent MLflow deployment is found, it will be updated in place to reflect the new configuration parameters.

Callers should set `replace` to True if they want a continuous model deployment workflow that doesn't spin up a new MLflow deployment server for each new model version. If multiple equivalent MLflow deployment servers are found, one is selected at random to be updated and the others are deleted.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `config` | `ServiceConfig` | the configuration of the model to be deployed with MLflow. | required |
| `replace` | `bool` | set this flag to True to find and update an equivalent MLflow deployment server with the new model instead of creating and starting a new deployment server. | False |
| `timeout` | `int` | the timeout in seconds to wait for the MLflow server to be provisioned and successfully started or updated. If set to 0, the method will return immediately after the MLflow server is provisioned, without waiting for it to fully start. | 10 |

Returns:

| Type | Description |
|---|---|
| `BaseService` | The ZenML MLflow deployment service object that can be used to interact with the MLflow model server. |
Source code in zenml/integrations/mlflow/model_deployers/mlflow_model_deployer.py
def deploy_model(
self,
config: ServiceConfig,
replace: bool = False,
timeout: int = DEFAULT_SERVICE_START_STOP_TIMEOUT,
) -> BaseService:
"""Create a new MLflow deployment service or update an existing one.
This should serve the supplied model and deployment configuration.
This method has two modes of operation, depending on the `replace`
argument value:
* if `replace` is False, calling this method will create a new MLflow
deployment server to reflect the model and other configuration
parameters specified in the supplied MLflow service `config`.
* if `replace` is True, this method will first attempt to find an
existing MLflow deployment service that is *equivalent* to the
supplied configuration parameters. Two or more MLflow deployment
services are considered equivalent if they have the same
`pipeline_name`, `pipeline_step_name` and `model_name` configuration
parameters. To put it differently, two MLflow deployment services
are equivalent if they serve versions of the same model deployed by
the same pipeline step. If an equivalent MLflow deployment is found,
it will be updated in place to reflect the new configuration
parameters.
Callers should set `replace` to True if they want a continuous model
deployment workflow that doesn't spin up a new MLflow deployment
server for each new model version. If multiple equivalent MLflow
deployment servers are found, one is selected at random to be updated
and the others are deleted.
Args:
config: the configuration of the model to be deployed with MLflow.
replace: set this flag to True to find and update an equivalent
MLflow deployment server with the new model instead of
creating and starting a new deployment server.
timeout: the timeout in seconds to wait for the MLflow server
to be provisioned and successfully started or updated. If set
to 0, the method will return immediately after the MLflow
server is provisioned, without waiting for it to fully start.
Returns:
The ZenML MLflow deployment service object that can be used to
interact with the MLflow model server.
"""
config = cast(MLFlowDeploymentConfig, config)
service = None
# if replace is True, remove all existing services
if replace is True:
existing_services = self.find_model_server(
pipeline_name=config.pipeline_name,
pipeline_step_name=config.pipeline_step_name,
model_name=config.model_name,
)
for existing_service in existing_services:
if service is None:
# keep the most recently created service
service = cast(MLFlowDeploymentService, existing_service)
try:
# delete the older services and don't wait for them to
# be deprovisioned
self._clean_up_existing_service(
existing_service=cast(
MLFlowDeploymentService, existing_service
),
timeout=timeout,
force=True,
)
except RuntimeError:
# ignore errors encountered while stopping old services
pass
if service:
logger.info(
f"Updating an existing MLflow deployment service: {service}"
)
# set the root runtime path with the stack component's UUID
config.root_runtime_path = self.local_path
service.stop(timeout=timeout, force=True)
service.update(config)
service.start(timeout=timeout)
else:
# create a new MLFlowDeploymentService instance
service = self._create_new_service(timeout, config)
logger.info(f"Created a new MLflow deployment service: {service}")
return cast(BaseService, service)
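A hedged end-to-end sketch of calling `deploy_model`: the active stack is assumed to contain an MLflow model deployer, and all names and the model URI are placeholders.

```python
# Sketch: deploy (or update) a local MLflow model server.
from zenml.client import Client
from zenml.integrations.mlflow.model_deployers import MLFlowModelDeployer
from zenml.integrations.mlflow.services import MLFlowDeploymentConfig

model_deployer = Client().active_stack.model_deployer
assert isinstance(model_deployer, MLFlowModelDeployer)

config = MLFlowDeploymentConfig(
    model_name="my_model",  # placeholder
    model_uri="runs:/<run_id>/model",  # placeholder MLflow model URI
    pipeline_name="training_pipeline",  # placeholder
    pipeline_step_name="model_deployer_step",
    workers=1,
)
service = model_deployer.deploy_model(config=config, replace=True, timeout=60)
print(MLFlowModelDeployer.get_model_server_info(service))
```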
find_model_server(self, running=False, service_uuid=None, pipeline_name=None, pipeline_run_id=None, pipeline_step_name=None, model_name=None, model_uri=None, model_type=None)
Finds one or more model servers that match the given criteria.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `running` | `bool` | If true, only running services will be returned. | False |
| `service_uuid` | `Optional[uuid.UUID]` | The UUID of the service that was originally used to deploy the model. | None |
| `pipeline_name` | `Optional[str]` | Name of the pipeline that the deployed model was part of. | None |
| `pipeline_run_id` | `Optional[str]` | ID of the pipeline run which the deployed model was part of. | None |
| `pipeline_step_name` | `Optional[str]` | The name of the pipeline model deployment step that deployed the model. | None |
| `model_name` | `Optional[str]` | Name of the deployed model. | None |
| `model_uri` | `Optional[str]` | URI of the deployed model. | None |
| `model_type` | `Optional[str]` | Type/format of the deployed model. Not used in this MLflow case. | None |

Returns:

| Type | Description |
|---|---|
| `List[zenml.services.service.BaseService]` | One or more Service objects representing model servers that match the input search criteria. |

Exceptions:

| Type | Description |
|---|---|
| `TypeError` | if any of the input arguments are of an invalid type. |
Source code in zenml/integrations/mlflow/model_deployers/mlflow_model_deployer.py
def find_model_server(
self,
running: bool = False,
service_uuid: Optional[UUID] = None,
pipeline_name: Optional[str] = None,
pipeline_run_id: Optional[str] = None,
pipeline_step_name: Optional[str] = None,
model_name: Optional[str] = None,
model_uri: Optional[str] = None,
model_type: Optional[str] = None,
) -> List[BaseService]:
"""Finds one or more model servers that match the given criteria.
Args:
running: If true, only running services will be returned.
service_uuid: The UUID of the service that was originally used
to deploy the model.
pipeline_name: Name of the pipeline that the deployed model was part
of.
pipeline_run_id: ID of the pipeline run which the deployed model
was part of.
pipeline_step_name: The name of the pipeline model deployment step
that deployed the model.
model_name: Name of the deployed model.
model_uri: URI of the deployed model.
model_type: Type/format of the deployed model. Not used in this
MLflow case.
Returns:
One or more Service objects representing model servers that match
the input search criteria.
Raises:
TypeError: if any of the input arguments are of an invalid type.
"""
services = []
config = MLFlowDeploymentConfig(
model_name=model_name or "",
model_uri=model_uri or "",
pipeline_name=pipeline_name or "",
pipeline_run_id=pipeline_run_id or "",
pipeline_step_name=pipeline_step_name or "",
)
# find all services that match the input criteria
for root, _, files in os.walk(self.local_path):
if service_uuid and Path(root).name != str(service_uuid):
continue
for file in files:
if file == SERVICE_DAEMON_CONFIG_FILE_NAME:
service_config_path = os.path.join(root, file)
logger.debug(
"Loading service daemon configuration from %s",
service_config_path,
)
existing_service_config = None
with open(service_config_path, "r") as f:
existing_service_config = f.read()
existing_service = ServiceRegistry().load_service_from_json(
existing_service_config
)
if not isinstance(
existing_service, MLFlowDeploymentService
):
raise TypeError(
f"Expected service type MLFlowDeploymentService but got "
f"{type(existing_service)} instead"
)
existing_service.update_status()
if self._matches_search_criteria(existing_service, config):
if not running or existing_service.is_running:
services.append(cast(BaseService, existing_service))
return services
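For example, a query for the running server behind a given pipeline and model could look like this sketch (names are illustrative):

```python
# Sketch: locate a running MLflow deployment by pipeline and model name.
from typing import cast

from zenml.client import Client
from zenml.integrations.mlflow.model_deployers import MLFlowModelDeployer
from zenml.integrations.mlflow.services import MLFlowDeploymentService

deployer = Client().active_stack.model_deployer
assert isinstance(deployer, MLFlowModelDeployer)

services = deployer.find_model_server(
    running=True,
    pipeline_name="training_pipeline",  # illustrative
    model_name="my_model",  # illustrative
)
if services:
    service = cast(MLFlowDeploymentService, services[0])
    print(service.endpoint.prediction_url)
```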
get_model_server_info(service_instance)
staticmethod
Return implementation specific information relevant to the user.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `service_instance` | `MLFlowDeploymentService` | Instance of an MLFlowDeploymentService. | required |

Returns:

| Type | Description |
|---|---|
| `Dict[str, Optional[str]]` | A dictionary containing the information. |
Source code in zenml/integrations/mlflow/model_deployers/mlflow_model_deployer.py
@staticmethod
def get_model_server_info( # type: ignore[override]
service_instance: "MLFlowDeploymentService",
) -> Dict[str, Optional[str]]:
"""Return implementation specific information relevant to the user.
Args:
service_instance: Instance of an MLFlowDeploymentService
Returns:
A dictionary containing the information.
"""
return {
"PREDICTION_URL": service_instance.endpoint.prediction_url,
"MODEL_URI": service_instance.config.model_uri,
"MODEL_NAME": service_instance.config.model_name,
"SERVICE_PATH": service_instance.status.runtime_path,
"DAEMON_PID": str(service_instance.status.pid),
}
get_service_path(id_)
staticmethod
Get the path where local MLflow service information is stored.

This includes the deployment service configuration, PID and log files.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `id_` | `UUID` | The ID of the MLflow model deployer. | required |

Returns:

| Type | Description |
|---|---|
| `str` | The service path. |
Source code in zenml/integrations/mlflow/model_deployers/mlflow_model_deployer.py
@staticmethod
def get_service_path(id_: UUID) -> str:
"""Get the path where local MLflow service information is stored.
This includes the deployment service configuration, PID and log files.
Args:
id_: The ID of the MLflow model deployer.
Returns:
The service path.
"""
service_path = os.path.join(
GlobalConfiguration().local_stores_path,
str(id_),
)
create_dir_recursive_if_not_exists(service_path)
return service_path
start_model_server(self, uuid, timeout=10)
Method to start a model server.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `uuid` | `UUID` | UUID of the model server to start. | required |
| `timeout` | `int` | Timeout in seconds to wait for the service to start. | 10 |
Source code in zenml/integrations/mlflow/model_deployers/mlflow_model_deployer.py
def start_model_server(
self, uuid: UUID, timeout: int = DEFAULT_SERVICE_START_STOP_TIMEOUT
) -> None:
"""Method to start a model server.
Args:
uuid: UUID of the model server to start.
timeout: Timeout in seconds to wait for the service to start.
"""
# get list of all services
existing_services = self.find_model_server(service_uuid=uuid)
# if the service exists, start it
if existing_services:
existing_services[0].start(timeout=timeout)
stop_model_server(self, uuid, timeout=10, force=False)
Method to stop a model server.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
uuid | UUID | UUID of the model server to stop. | required |
timeout | int | Timeout in seconds to wait for the service to stop. | 10 |
force | bool | If True, force the service to stop. | False |
Source code in zenml/integrations/mlflow/model_deployers/mlflow_model_deployer.py
def stop_model_server(
self,
uuid: UUID,
timeout: int = DEFAULT_SERVICE_START_STOP_TIMEOUT,
force: bool = False,
) -> None:
"""Method to stop a model server.
Args:
uuid: UUID of the model server to stop.
timeout: Timeout in seconds to wait for the service to stop.
force: If True, force the service to stop.
"""
# get list of all services
existing_services = self.find_model_server(service_uuid=uuid)
# if the service exists, stop it
if existing_services:
existing_services[0].stop(timeout=timeout, force=force)
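Taken together with the finder sketch above, a server can be cycled by its UUID. This assumes services holds at least one result and that services expose their ID via a uuid attribute, as ZenML services do.
# Stop and restart the first matching server, waiting up to 30 seconds
# for each transition (timeout values are illustrative).
server_uuid = services[0].uuid
model_deployer.stop_model_server(server_uuid, timeout=30)
model_deployer.start_model_server(server_uuid, timeout=30)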
services
special
Initialization of the MLflow Service.
mlflow_deployment
Implementation of the MLflow deployment functionality.
MLFlowDeploymentConfig (LocalDaemonServiceConfig)
pydantic-model
MLflow model deployment configuration.
Attributes:
Name | Type | Description |
---|---|---|
model_uri | str | URI of the MLflow model to serve |
model_name | str | the name of the model |
workers | int | number of workers to use for the prediction service |
mlserver | bool | set to True to use the MLflow MLServer backend (see https://github.com/SeldonIO/MLServer). If False, the MLflow built-in scoring server will be used. |
Source code in zenml/integrations/mlflow/services/mlflow_deployment.py
class MLFlowDeploymentConfig(LocalDaemonServiceConfig):
"""MLflow model deployment configuration.
Attributes:
model_uri: URI of the MLflow model to serve
model_name: the name of the model
workers: number of workers to use for the prediction service
mlserver: set to True to use the MLflow MLServer backend (see
https://github.com/SeldonIO/MLServer). If False, the
MLflow built-in scoring server will be used.
"""
model_uri: str
model_name: str
workers: int = 1
mlserver: bool = False
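A minimal configuration sketch; the model URI is a placeholder, since in a pipeline it is usually resolved from the MLflow artifact store of the current run.
from zenml.integrations.mlflow.services.mlflow_deployment import (
    MLFlowDeploymentConfig,
)

config = MLFlowDeploymentConfig(
    model_uri="runs:/<mlflow-run-id>/model",  # placeholder URI
    model_name="model",
    workers=1,
    mlserver=False,  # use the MLflow built-in scoring server
)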
MLFlowDeploymentEndpoint (LocalDaemonServiceEndpoint)
pydantic-model
A service endpoint exposed by the MLflow deployment daemon.
Attributes:
Name | Type | Description |
---|---|---|
config | MLFlowDeploymentEndpointConfig | service endpoint configuration |
monitor | HTTPEndpointHealthMonitor | optional service endpoint health monitor |
Source code in zenml/integrations/mlflow/services/mlflow_deployment.py
class MLFlowDeploymentEndpoint(LocalDaemonServiceEndpoint):
"""A service endpoint exposed by the MLflow deployment daemon.
Attributes:
config: service endpoint configuration
monitor: optional service endpoint health monitor
"""
config: MLFlowDeploymentEndpointConfig
monitor: HTTPEndpointHealthMonitor
@property
def prediction_url(self) -> Optional[str]:
"""Gets the prediction URL for the endpoint.
Returns:
the prediction URL for the endpoint
"""
uri = self.status.uri
if not uri:
return None
return os.path.join(uri, self.config.prediction_url_path)
prediction_url: Optional[str]
property
readonly
Gets the prediction URL for the endpoint.
Returns:
Type | Description |
---|---|
Optional[str] | the prediction URL for the endpoint |
MLFlowDeploymentEndpointConfig (LocalDaemonServiceEndpointConfig)
pydantic-model
MLflow daemon service endpoint configuration.
Attributes:
Name | Type | Description |
---|---|---|
prediction_url_path | str | URI subpath for prediction requests |
Source code in zenml/integrations/mlflow/services/mlflow_deployment.py
class MLFlowDeploymentEndpointConfig(LocalDaemonServiceEndpointConfig):
"""MLflow daemon service endpoint configuration.
Attributes:
prediction_url_path: URI subpath for prediction requests
"""
prediction_url_path: str
MLFlowDeploymentService (LocalDaemonService)
pydantic-model
MLflow deployment service used to start a local prediction server for MLflow models.
Attributes:
Name | Type | Description |
---|---|---|
SERVICE_TYPE | ClassVar[zenml.services.service_type.ServiceType] | a service type descriptor with information describing the MLflow deployment service class |
config | MLFlowDeploymentConfig | service configuration |
endpoint | MLFlowDeploymentEndpoint | optional service endpoint |
Source code in zenml/integrations/mlflow/services/mlflow_deployment.py
class MLFlowDeploymentService(LocalDaemonService):
"""MLflow deployment service used to start a local prediction server for MLflow models.
Attributes:
SERVICE_TYPE: a service type descriptor with information describing
the MLflow deployment service class
config: service configuration
endpoint: optional service endpoint
"""
SERVICE_TYPE = ServiceType(
name="mlflow-deployment",
type="model-serving",
flavor="mlflow",
description="MLflow prediction service",
)
config: MLFlowDeploymentConfig
endpoint: MLFlowDeploymentEndpoint
def __init__(
self,
config: Union[MLFlowDeploymentConfig, Dict[str, Any]],
**attrs: Any,
) -> None:
"""Initialize the MLflow deployment service.
Args:
config: service configuration
attrs: additional attributes to set on the service
"""
# ensure that the endpoint is created before the service is initialized
# TODO [ENG-700]: implement a service factory or builder for MLflow
# deployment services
if (
isinstance(config, MLFlowDeploymentConfig)
and "endpoint" not in attrs
):
if config.mlserver:
prediction_url_path = MLSERVER_PREDICTION_URL_PATH
healthcheck_uri_path = MLSERVER_HEALTHCHECK_URL_PATH
use_head_request = False
else:
prediction_url_path = MLFLOW_PREDICTION_URL_PATH
healthcheck_uri_path = MLFLOW_HEALTHCHECK_URL_PATH
use_head_request = True
endpoint = MLFlowDeploymentEndpoint(
config=MLFlowDeploymentEndpointConfig(
protocol=ServiceEndpointProtocol.HTTP,
prediction_url_path=prediction_url_path,
),
monitor=HTTPEndpointHealthMonitor(
config=HTTPEndpointHealthMonitorConfig(
healthcheck_uri_path=healthcheck_uri_path,
use_head_request=use_head_request,
)
),
)
attrs["endpoint"] = endpoint
super().__init__(config=config, **attrs)
def run(self) -> None:
"""Start the service."""
logger.info(
"Starting MLflow prediction service as blocking "
"process... press CTRL+C once to stop it."
)
self.endpoint.prepare_for_start()
try:
serve_kwargs: Dict[str, Any] = {}
# MLflow version 1.26 introduces an additional mandatory
# `timeout` argument to the `PyFuncBackend.serve` function
if int(MLFLOW_VERSION.split(".")[1]) >= 26:
serve_kwargs["timeout"] = None
backend = PyFuncBackend(
config={},
no_conda=True,
workers=self.config.workers,
install_mlflow=False,
)
backend.serve(
model_uri=self.config.model_uri,
port=self.endpoint.status.port,
host="localhost",
enable_mlserver=self.config.mlserver,
**serve_kwargs,
)
except KeyboardInterrupt:
logger.info(
"MLflow prediction service stopped. Resuming normal execution."
)
@property
def prediction_url(self) -> Optional[str]:
"""Get the URI where the prediction service is answering requests.
Returns:
The URI where the prediction service can be contacted to process
HTTP/REST inference requests, or None, if the service isn't running.
"""
if not self.is_running:
return None
return self.endpoint.prediction_url
def predict(self, request: "NDArray[Any]") -> "NDArray[Any]":
"""Make a prediction using the service.
Args:
request: a numpy array representing the request
Returns:
A numpy array representing the prediction returned by the service.
Raises:
Exception: if the service is not running
ValueError: if the prediction endpoint is unknown.
"""
if not self.is_running:
raise Exception(
"MLflow prediction service is not running. "
"Please start the service before making predictions."
)
if self.endpoint.prediction_url is not None:
response = requests.post(
self.endpoint.prediction_url,
json={"instances": request.tolist()},
)
else:
raise ValueError("No endpoint known for prediction.")
response.raise_for_status()
return np.array(response.json())
prediction_url: Optional[str]
property
readonly
Get the URI where the prediction service is answering requests.
Returns:
Type | Description |
---|---|
Optional[str] | The URI where the prediction service can be contacted to process HTTP/REST inference requests, or None, if the service isn't running. |
__init__(self, config, **attrs)
special
Initialize the MLflow deployment service.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
config | Union[zenml.integrations.mlflow.services.mlflow_deployment.MLFlowDeploymentConfig, Dict[str, Any]] | service configuration | required |
attrs | Any | additional attributes to set on the service | {} |
Source code in zenml/integrations/mlflow/services/mlflow_deployment.py
def __init__(
self,
config: Union[MLFlowDeploymentConfig, Dict[str, Any]],
**attrs: Any,
) -> None:
"""Initialize the MLflow deployment service.
Args:
config: service configuration
attrs: additional attributes to set on the service
"""
# ensure that the endpoint is created before the service is initialized
# TODO [ENG-700]: implement a service factory or builder for MLflow
# deployment services
if (
isinstance(config, MLFlowDeploymentConfig)
and "endpoint" not in attrs
):
if config.mlserver:
prediction_url_path = MLSERVER_PREDICTION_URL_PATH
healthcheck_uri_path = MLSERVER_HEALTHCHECK_URL_PATH
use_head_request = False
else:
prediction_url_path = MLFLOW_PREDICTION_URL_PATH
healthcheck_uri_path = MLFLOW_HEALTHCHECK_URL_PATH
use_head_request = True
endpoint = MLFlowDeploymentEndpoint(
config=MLFlowDeploymentEndpointConfig(
protocol=ServiceEndpointProtocol.HTTP,
prediction_url_path=prediction_url_path,
),
monitor=HTTPEndpointHealthMonitor(
config=HTTPEndpointHealthMonitorConfig(
healthcheck_uri_path=healthcheck_uri_path,
use_head_request=use_head_request,
)
),
)
attrs["endpoint"] = endpoint
super().__init__(config=config, **attrs)
predict(self, request)
Make a prediction using the service.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
request | NDArray[Any] | a numpy array representing the request | required |
Returns:
Type | Description |
---|---|
NDArray[Any] | A numpy array representing the prediction returned by the service. |
Exceptions:
Type | Description |
---|---|
Exception | if the service is not running |
ValueError | if the prediction endpoint is unknown. |
Source code in zenml/integrations/mlflow/services/mlflow_deployment.py
def predict(self, request: "NDArray[Any]") -> "NDArray[Any]":
"""Make a prediction using the service.
Args:
request: a numpy array representing the request
Returns:
A numpy array representing the prediction returned by the service.
Raises:
Exception: if the service is not running
ValueError: if the prediction endpoint is unknown.
"""
if not self.is_running:
raise Exception(
"MLflow prediction service is not running. "
"Please start the service before making predictions."
)
if self.endpoint.prediction_url is not None:
response = requests.post(
self.endpoint.prediction_url,
json={"instances": request.tolist()},
)
else:
raise ValueError("No endpoint known for prediction.")
response.raise_for_status()
return np.array(response.json())
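A usage sketch, assuming service is a running MLFlowDeploymentService, for example one returned by find_model_server.
import numpy as np

# Send a single feature row to the prediction endpoint. The values are
# illustrative and depend on the deployed model's input schema.
request = np.array([[5.1, 3.5, 1.4, 0.2]])
prediction = service.predict(request)
print(prediction)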
run(self)
Start the service.
Source code in zenml/integrations/mlflow/services/mlflow_deployment.py
def run(self) -> None:
"""Start the service."""
logger.info(
"Starting MLflow prediction service as blocking "
"process... press CTRL+C once to stop it."
)
self.endpoint.prepare_for_start()
try:
serve_kwargs: Dict[str, Any] = {}
# MLflow version 1.26 introduces an additional mandatory
# `timeout` argument to the `PyFuncBackend.serve` function
if int(MLFLOW_VERSION.split(".")[1]) >= 26:
serve_kwargs["timeout"] = None
backend = PyFuncBackend(
config={},
no_conda=True,
workers=self.config.workers,
install_mlflow=False,
)
backend.serve(
model_uri=self.config.model_uri,
port=self.endpoint.status.port,
host="localhost",
enable_mlserver=self.config.mlserver,
**serve_kwargs,
)
except KeyboardInterrupt:
logger.info(
"MLflow prediction service stopped. Resuming normal execution."
)
steps
special
Initialization of the MLflow standard interface steps.
mlflow_deployer
Implementation of the MLflow model deployer pipeline step.
MLFlowDeployerParameters (BaseParameters)
pydantic-model
Model deployer step parameters for MLflow.
Attributes:
Name | Type | Description |
---|---|---|
model_name | str | the name of the MLflow model logged in the MLflow artifact store for the current pipeline. |
experiment_name | Optional[str] | Name of the MLflow experiment in which the model was logged. |
run_name | Optional[str] | Name of the MLflow run in which the model was logged. |
workers | int | number of workers to use for the prediction service |
mlserver | bool | set to True to use the MLflow MLServer backend (see https://github.com/SeldonIO/MLServer). If False, the MLflow built-in scoring server will be used. |
timeout | int | the number of seconds to wait for the service to start/stop. |
Source code in zenml/integrations/mlflow/steps/mlflow_deployer.py
class MLFlowDeployerParameters(BaseParameters):
"""Model deployer step parameters for MLflow.
Attributes:
model_name: the name of the MLflow model logged in the MLflow artifact
store for the current pipeline.
experiment_name: Name of the MLflow experiment in which the model was
logged.
run_name: Name of the MLflow run in which the model was logged.
workers: number of workers to use for the prediction service
mlserver: set to True to use the MLflow MLServer backend (see
https://github.com/SeldonIO/MLServer). If False, the
MLflow built-in scoring server will be used.
timeout: the number of seconds to wait for the service to start/stop.
"""
model_name: str = "model"
experiment_name: Optional[str] = None
run_name: Optional[str] = None
workers: int = 1
mlserver: bool = False
timeout: int = DEFAULT_SERVICE_START_STOP_TIMEOUT
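For illustration, the parameters might be customized as follows (all values are examples).
from zenml.integrations.mlflow.steps.mlflow_deployer import (
    MLFlowDeployerParameters,
)

params = MLFlowDeployerParameters(
    model_name="model",
    workers=2,
    mlserver=False,
    timeout=60,
)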
mlflow_model_deployer_step (BaseStep)
Model deployer pipeline step for MLflow.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
deploy_decision | bool | whether to deploy the model or not | required |
model | ModelArtifact | the model artifact to deploy | required |
params | MLFlowDeployerParameters | parameters for the deployer step | required |
Returns:
Type | Description |
---|---|
MLFlowDeploymentService | MLflow deployment service |
PARAMETERS_CLASS (BaseParameters)
pydantic-model
Model deployer step parameters for MLflow.
Attributes:
Name | Type | Description |
---|---|---|
model_name | str | the name of the MLflow model logged in the MLflow artifact store for the current pipeline. |
experiment_name | Optional[str] | Name of the MLflow experiment in which the model was logged. |
run_name | Optional[str] | Name of the MLflow run in which the model was logged. |
workers | int | number of workers to use for the prediction service |
mlserver | bool | set to True to use the MLflow MLServer backend (see https://github.com/SeldonIO/MLServer). If False, the MLflow built-in scoring server will be used. |
timeout | int | the number of seconds to wait for the service to start/stop. |
Source code in zenml/integrations/mlflow/steps/mlflow_deployer.py
class MLFlowDeployerParameters(BaseParameters):
"""Model deployer step parameters for MLflow.
Attributes:
model_name: the name of the MLflow model logged in the MLflow artifact
store for the current pipeline.
experiment_name: Name of the MLflow experiment in which the model was
logged.
run_name: Name of the MLflow run in which the model was logged.
workers: number of workers to use for the prediction service
mlserver: set to True to use the MLflow MLServer backend (see
https://github.com/SeldonIO/MLServer). If False, the
MLflow built-in scoring server will be used.
timeout: the number of seconds to wait for the service to start/stop.
"""
model_name: str = "model"
experiment_name: Optional[str] = None
run_name: Optional[str] = None
workers: int = 1
mlserver: bool = False
timeout: int = DEFAULT_SERVICE_START_STOP_TIMEOUT
entrypoint(deploy_decision, model, params)
staticmethod
Model deployer pipeline step for MLflow.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
deploy_decision | bool | whether to deploy the model or not | required |
model | ModelArtifact | the model artifact to deploy | required |
params | MLFlowDeployerParameters | parameters for the deployer step | required |
Returns:
Type | Description |
---|---|
MLFlowDeploymentService | MLflow deployment service |
Source code in zenml/integrations/mlflow/steps/mlflow_deployer.py
@step(enable_cache=False)
def mlflow_model_deployer_step(
deploy_decision: bool,
model: ModelArtifact,
params: MLFlowDeployerParameters,
) -> MLFlowDeploymentService:
"""Model deployer pipeline step for MLflow.
# noqa: DAR401
Args:
deploy_decision: whether to deploy the model or not
model: the model artifact to deploy
params: parameters for the deployer step
Returns:
MLflow deployment service
"""
model_deployer = cast(
MLFlowModelDeployer, MLFlowModelDeployer.get_active_model_deployer()
)
# fetch the MLflow artifacts logged during the pipeline run
experiment_tracker = Client().active_stack.experiment_tracker
if not isinstance(experiment_tracker, MLFlowExperimentTracker):
raise get_missing_mlflow_experiment_tracker_error()
# get pipeline name, step name and run id
step_env = cast(StepEnvironment, Environment()[STEP_ENVIRONMENT_NAME])
pipeline_name = step_env.pipeline_name
run_id = step_env.pipeline_run_id
step_name = step_env.step_name
client = MlflowClient()
mlflow_run_id = experiment_tracker.get_run_id(
experiment_name=params.experiment_name or pipeline_name,
run_name=params.run_name or run_id,
)
model_uri = ""
if mlflow_run_id and client.list_artifacts(
mlflow_run_id, params.model_name
):
model_uri = artifact_utils.get_artifact_uri(
run_id=mlflow_run_id, artifact_path=params.model_name
)
# fetch existing services with same pipeline name, step name and model name
existing_services = model_deployer.find_model_server(
pipeline_name=pipeline_name,
pipeline_step_name=step_name,
model_name=params.model_name,
)
# create a config for the new model service
predictor_cfg = MLFlowDeploymentConfig(
model_name=params.model_name or "",
model_uri=model_uri,
workers=params.workers,
mlserver=params.mlserver,
pipeline_name=pipeline_name,
pipeline_run_id=run_id,
pipeline_step_name=step_name,
)
# Creating a new service with inactive state and status by default
service = MLFlowDeploymentService(predictor_cfg)
if existing_services:
service = cast(MLFlowDeploymentService, existing_services[0])
# check for conditions to deploy the model
if not model_uri:
# an MLflow model was not trained in the current run, so we simply reuse
# the currently running service created for the same model, if any
if not existing_services:
logger.warning(
f"An MLflow model with name `{params.model_name}` was not "
f"logged in the current pipeline run and no running MLflow "
f"model server was found. Please ensure that your pipeline "
f"includes a step with a MLflow experiment configured that "
"trains a model and logs it to MLflow. This could also happen "
"if the current pipeline run did not log an MLflow model "
f"because the training step was cached."
)
# return an inactive service just because we have to return
# something
return service
logger.info(
f"An MLflow model with name `{params.model_name}` was not "
f"trained in the current pipeline run. Reusing the existing "
f"MLflow model server."
)
if not service.is_running:
service.start(params.timeout)
# return the existing service
return service
# even when the deploy decision is negative, if an existing model server
# is not running for this pipeline/step, we still have to serve the
# current model, to ensure that a model server is available at all times
if not deploy_decision and existing_services:
logger.info(
f"Skipping model deployment because the model quality does not "
f"meet the criteria. Reusing last model server deployed by step "
f"'{step_name}' and pipeline '{pipeline_name}' for model "
f"'{params.model_name}'..."
)
# even when the deploy decision is negative, we still need to start
# the previous model server if it is no longer running, to ensure
# that a model server is available at all times
if not service.is_running:
service.start(params.timeout)
return service
# create a new model deployment and replace an old one if it exists
new_service = cast(
MLFlowDeploymentService,
model_deployer.deploy_model(
replace=True,
config=predictor_cfg,
timeout=params.timeout,
),
)
logger.info(
f"MLflow deployment service started and reachable at:\n"
f" {new_service.prediction_url}\n"
)
return new_service
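A sketch of wiring the step into a continuous deployment pipeline; the trainer and evaluator steps are hypothetical placeholders.
from zenml.integrations.mlflow.steps.mlflow_deployer import (
    MLFlowDeployerParameters,
    mlflow_model_deployer_step,
)
from zenml.pipelines import pipeline


@pipeline
def continuous_deployment_pipeline(trainer, evaluator, model_deployer):
    model = trainer()
    deploy_decision = evaluator(model)
    model_deployer(deploy_decision, model)


# Instantiate the deployer step with its parameters (values illustrative).
deployer = mlflow_model_deployer_step(
    params=MLFlowDeployerParameters(model_name="model", timeout=60)
)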
mlflow_deployer_step(enable_cache=True, name=None)
Creates a pipeline step to deploy a given ML model with a local MLflow prediction server.
The returned step can be used in a pipeline to implement continuous deployment for an MLflow model.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
enable_cache | bool | Specify whether caching is enabled for this step. If no value is passed, caching is enabled by default | True |
name | Optional[str] | Name of the step. | None |
Returns:
Type | Description |
---|---|
Type[zenml.steps.base_step.BaseStep] | an MLflow model deployer pipeline step |
Source code in zenml/integrations/mlflow/steps/mlflow_deployer.py
def mlflow_deployer_step(
enable_cache: bool = True,
name: Optional[str] = None,
) -> Type[BaseStep]:
"""Creates a pipeline step to deploy a given ML model with a local MLflow prediction server.
The returned step can be used in a pipeline to implement continuous
deployment for an MLflow model.
Args:
enable_cache: Specify whether caching is enabled for this step. If no
value is passed, caching is enabled by default
name: Name of the step.
Returns:
an MLflow model deployer pipeline step
"""
logger.warning(
"The `mlflow_deployer_step` function is deprecated. Please "
"use the built-in `mlflow_model_deployer_step` step instead."
)
return mlflow_model_deployer_step