Zen Server
zenml.zen_server
special
ZenML Server Implementation.
The ZenML Server is a centralized service meant for use in a collaborative setting in which stacks, stack components, flavors, pipelines and pipeline runs can be shared over the network with other users.
You can use the zenml server up
command to spin up ZenML server instances
that are either running locally as daemon processes or docker containers, or
to deploy a ZenML server remotely on a managed cloud platform. The other CLI
commands in the same zenml server
group can be used to manage the server
instances deployed from your local machine.
To connect the local ZenML client to one of the managed ZenML servers, call
zenml server connect
with the name of the server you want to connect to.
auth
Authentication module for ZenML server.
AuthContext (BaseModel)
pydantic-model
The authentication context.
Source code in zenml/zen_server/auth.py
class AuthContext(BaseModel):
    """The authentication context.

    Carries the user model of the account that a request was authenticated
    as.
    """

    # The user account the request is associated with.
    user: UserModel
AuthScheme (StrEnum)
The authentication scheme.
Source code in zenml/zen_server/auth.py
class AuthScheme(StrEnum):
    """The authentication scheme.

    Each member's value is identical to its name.
    """

    # Requests are not authenticated.
    NO_AUTH = "NO_AUTH"
    # Requests are authenticated with HTTP basic credentials.
    HTTP_BASIC = "HTTP_BASIC"
    # Requests are authenticated with OAuth2 password bearer JWT tokens.
    OAUTH2_PASSWORD_BEARER = "OAUTH2_PASSWORD_BEARER"
authenticate_credentials(user_name_or_id=None, password=None, access_token=None, activation_token=None)
Verify if user authentication credentials are valid.
This function can be used to validate all of the supplied user credentials to cover a range of possibilities:
- username+password
- access token (with embedded user id)
- username+activation token
Parameters:
Name | Type | Description | Default |
---|---|---|---|
user_name_or_id |
Union[str, uuid.UUID] |
The username or user ID. |
None |
password |
Optional[str] |
The password. |
None |
access_token |
Optional[str] |
The access token. |
None |
activation_token |
Optional[str] |
The activation token. |
None |
Returns:
Type | Description |
---|---|
Optional[zenml.zen_server.auth.AuthContext] |
The authenticated account details, if the account is valid, otherwise None. |
Source code in zenml/zen_server/auth.py
def authenticate_credentials(
    user_name_or_id: Optional[Union[str, UUID]] = None,
    password: Optional[str] = None,
    access_token: Optional[str] = None,
    activation_token: Optional[str] = None,
) -> Optional[AuthContext]:
    """Check a set of user credentials and build the matching auth context.

    Exactly one verification path is taken, chosen in this order of
    precedence:

    * username+password
    * access token (with embedded user id)
    * username+activation token

    When none of the three secrets is supplied, the outcome of the user
    lookup alone decides the result.

    Args:
        user_name_or_id: The username or user ID.
        password: The password.
        access_token: The access token.
        activation_token: The activation token.

    Returns:
        The authentication context of the account if the credentials are
        valid, otherwise None.
    """
    account: Optional[UserModel] = None
    context: Optional[AuthContext] = None

    if user_name_or_id:
        try:
            account = zen_store().get_user(user_name_or_id)
            context = AuthContext(user=account)
        except KeyError:
            # An unknown user must not short-circuit here: the password/token
            # verification below still runs to protect against response
            # discrepancy attacks
            # (https://cwe.mitre.org/data/definitions/204.html)
            pass

    if password is not None:
        password_ok = UserModel.verify_password(password, account)
        return context if password_ok else None

    if access_token is not None:
        # The access token identifies the user by itself, so the account
        # resolved from it replaces whatever was looked up above.
        account = UserModel.verify_access_token(access_token)
        if not account:
            return None
        return AuthContext(user=account)

    if activation_token is not None:
        token_ok = UserModel.verify_activation_token(activation_token, account)
        return context if token_ok else None

    return context
authentication_provider()
Returns the authentication provider.
Returns:
Type | Description |
---|---|
Callable[..., zenml.zen_server.auth.AuthContext] |
The authentication provider. |
Exceptions:
Type | Description |
---|---|
ValueError |
If the authentication scheme is not supported. |
Source code in zenml/zen_server/auth.py
def authentication_provider() -> Callable[..., AuthContext]:
    """Select the authentication callable for the configured scheme.

    Returns:
        The authentication provider.

    Raises:
        ValueError: If the authentication scheme is not supported.
    """
    providers = {
        AuthScheme.NO_AUTH: no_authentication,
        AuthScheme.HTTP_BASIC: http_authentication,
        AuthScheme.OAUTH2_PASSWORD_BEARER: oauth2_password_bearer_authentication,
    }
    auth_scheme = authentication_scheme()
    provider = providers.get(auth_scheme)
    if provider is None:
        raise ValueError(f"Unknown authentication scheme: {auth_scheme}")
    return provider
authentication_scheme()
Returns the authentication type.
Returns:
Type | Description |
---|---|
AuthScheme |
The authentication type. |
Source code in zenml/zen_server/auth.py
def authentication_scheme() -> AuthScheme:
    """Read the authentication scheme from the environment.

    The value of the `ENV_ZENML_AUTH_TYPE` environment variable is used
    when set; otherwise OAuth2 password bearer authentication is the default.

    Returns:
        The authentication type.
    """
    configured = os.environ.get(
        ENV_ZENML_AUTH_TYPE, AuthScheme.OAUTH2_PASSWORD_BEARER
    )
    return AuthScheme(configured)
authorize(security_scopes, token=Depends(OAuth2PasswordBearer))
Authenticates any request to the ZenML server with OAuth2 password bearer JWT tokens.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
security_scopes |
SecurityScopes |
Security scope for this token |
required |
token |
str |
The JWT bearer token to be authenticated. |
Depends(OAuth2PasswordBearer) |
Returns:
Type | Description |
---|---|
AuthContext |
The authentication context reflecting the authenticated user. |
Exceptions:
Type | Description |
---|---|
HTTPException |
If the JWT token could not be authorized. |
Source code in zenml/zen_server/auth.py
def oauth2_password_bearer_authentication(
    security_scopes: SecurityScopes,
    token: str = Depends(
        OAuth2PasswordBearer(
            tokenUrl=ROOT_URL_PATH + API + VERSION_1 + LOGIN,
            scopes={
                "read": "Read permissions on all entities",
                "write": "Write permissions on all entities",
                "me": "Editing permissions to own user",
            },
        )
    ),
) -> AuthContext:
    """Authenticates any request to the ZenML server with OAuth2 password bearer JWT tokens.

    Args:
        security_scopes: Security scope for this token
        token: The JWT bearer token to be authenticated.

    Returns:
        The authentication context reflecting the authenticated user.

    Raises:
        HTTPException: If the JWT token could not be authorized: 401 when
            the token cannot be decoded or does not resolve to a user, 403
            when the token lacks one of the required scopes.
    """
    # The challenge value sent back with "Not enough permissions" responses
    # advertises the scopes that the endpoint requires.
    if security_scopes.scopes:
        authenticate_value = f'Bearer scope="{security_scopes.scope_str}"'
    else:
        authenticate_value = "Bearer"

    auth_context = authenticate_credentials(access_token=token)

    try:
        access_token = JWTToken.decode(
            token_type=JWTTokenType.ACCESS_TOKEN, token=token
        )
    except AuthorizationException as e:
        # Chain explicitly so the decoding failure stays attached as the
        # cause of the HTTP error.
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid authentication credentials",
            headers={"WWW-Authenticate": "Bearer"},
        ) from e

    for scope in security_scopes.scopes:
        if scope not in access_token.permissions:
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail="Not enough permissions",
                headers={"WWW-Authenticate": authenticate_value},
            )

    if auth_context is None:
        # We have to return an additional WWW-Authenticate header here with the
        # value Bearer to be compliant with the OAuth2 spec.
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid authentication credentials",
            headers={"WWW-Authenticate": "Bearer"},
        )

    return auth_context
http_authentication(security_scopes, credentials=Depends(HTTPBasic))
Authenticates any request to the ZenML Server with basic HTTP authentication.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
security_scopes |
SecurityScopes |
Security scope will be ignored for http_auth |
required |
credentials |
HTTPBasicCredentials |
HTTP basic auth credentials passed to the request. |
Depends(HTTPBasic) |
Returns:
Type | Description |
---|---|
AuthContext |
The authentication context reflecting the authenticated user. |
Exceptions:
Type | Description |
---|---|
HTTPException |
If the user credentials could not be authenticated. |
Source code in zenml/zen_server/auth.py
def http_authentication(
    security_scopes: SecurityScopes,
    credentials: HTTPBasicCredentials = Depends(HTTPBasic()),
) -> AuthContext:
    """Authenticate a request to the ZenML Server using HTTP basic credentials.

    Args:
        security_scopes: Security scope will be ignored for http_auth
        credentials: HTTP basic auth credentials passed to the request.

    Returns:
        The authentication context reflecting the authenticated user.

    Raises:
        HTTPException: If the user credentials could not be authenticated.
    """
    context = authenticate_credentials(
        user_name_or_id=credentials.username,
        password=credentials.password,
    )
    if context is not None:
        return context

    raise HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Invalid authentication credentials",
    )
no_authentication(security_scopes)
Doesn't authenticate requests to the ZenML server.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
security_scopes |
SecurityScopes |
Security scope will be ignored when authentication is disabled |
required |
Returns:
Type | Description |
---|---|
AuthContext |
The authentication context reflecting the default user. |
Exceptions:
Type | Description |
---|---|
HTTPException |
If the default user is not available. |
Source code in zenml/zen_server/auth.py
def no_authentication(security_scopes: SecurityScopes) -> AuthContext:
    """Resolve every request to the default user without checking credentials.

    Args:
        security_scopes: Security scope, not used by this provider.

    Returns:
        The authentication context reflecting the default user.

    Raises:
        HTTPException: If the default user is not available.
    """
    context = authenticate_credentials(user_name_or_id=DEFAULT_USERNAME)
    if context is not None:
        return context

    raise HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Invalid authentication credentials",
    )
oauth2_password_bearer_authentication(security_scopes, token=Depends(OAuth2PasswordBearer))
Authenticates any request to the ZenML server with OAuth2 password bearer JWT tokens.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
security_scopes |
SecurityScopes |
Security scope for this token |
required |
token |
str |
The JWT bearer token to be authenticated. |
Depends(OAuth2PasswordBearer) |
Returns:
Type | Description |
---|---|
AuthContext |
The authentication context reflecting the authenticated user. |
Exceptions:
Type | Description |
---|---|
HTTPException |
If the JWT token could not be authorized. |
Source code in zenml/zen_server/auth.py
def oauth2_password_bearer_authentication(
    security_scopes: SecurityScopes,
    token: str = Depends(
        OAuth2PasswordBearer(
            tokenUrl=ROOT_URL_PATH + API + VERSION_1 + LOGIN,
            scopes={
                "read": "Read permissions on all entities",
                "write": "Write permissions on all entities",
                "me": "Editing permissions to own user",
            },
        )
    ),
) -> AuthContext:
    """Authenticates any request to the ZenML server with OAuth2 password bearer JWT tokens.

    Args:
        security_scopes: Security scope for this token
        token: The JWT bearer token to be authenticated.

    Returns:
        The authentication context reflecting the authenticated user.

    Raises:
        HTTPException: If the JWT token could not be authorized: 401 when
            the token cannot be decoded or does not resolve to a user, 403
            when the token lacks one of the required scopes.
    """
    # The challenge value sent back with "Not enough permissions" responses
    # advertises the scopes that the endpoint requires.
    if security_scopes.scopes:
        authenticate_value = f'Bearer scope="{security_scopes.scope_str}"'
    else:
        authenticate_value = "Bearer"

    auth_context = authenticate_credentials(access_token=token)

    try:
        access_token = JWTToken.decode(
            token_type=JWTTokenType.ACCESS_TOKEN, token=token
        )
    except AuthorizationException as e:
        # Chain explicitly so the decoding failure stays attached as the
        # cause of the HTTP error.
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid authentication credentials",
            headers={"WWW-Authenticate": "Bearer"},
        ) from e

    for scope in security_scopes.scopes:
        if scope not in access_token.permissions:
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail="Not enough permissions",
                headers={"WWW-Authenticate": authenticate_value},
            )

    if auth_context is None:
        # We have to return an additional WWW-Authenticate header here with the
        # value Bearer to be compliant with the OAuth2 spec.
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid authentication credentials",
            headers={"WWW-Authenticate": "Bearer"},
        )

    return auth_context
deploy
special
ZenML server deployments.
base_provider
Base ZenML server provider class.
BaseServerProvider (ABC)
Base ZenML server provider class.
All ZenML server providers must extend and implement this base class.
Source code in zenml/zen_server/deploy/base_provider.py
class BaseServerProvider(ABC):
    """Base ZenML server provider class.

    All ZenML server providers must extend and implement this base class.
    The public methods implement the generic deployment lifecycle on top of
    the abstract, provider specific service operations declared at the bottom
    of the class.
    """

    # The provider type that the concrete class is registered under.
    TYPE: ClassVar[ServerProviderType]
    # The provider specific configuration class that generic configurations
    # are converted into.
    CONFIG_TYPE: ClassVar[Type[ServerDeploymentConfig]] = ServerDeploymentConfig

    @classmethod
    def register_as_provider(cls) -> None:
        """Register the class as a server provider."""
        from zenml.zen_server.deploy.deployer import ServerDeployer

        ServerDeployer.register_provider(cls)

    @classmethod
    def _convert_config(
        cls, config: ServerDeploymentConfig
    ) -> ServerDeploymentConfig:
        """Convert a generic server deployment config into a provider specific config.

        Args:
            config: The generic server deployment config.

        Returns:
            The provider specific server deployment config.

        Raises:
            ServerDeploymentConfigurationError: If the configuration is not
                valid.
        """
        if isinstance(config, cls.CONFIG_TYPE):
            return config
        try:
            return cls.CONFIG_TYPE(**config.dict())
        except ValidationError as e:
            # Keep the validation error attached as the explicit cause.
            raise ServerDeploymentConfigurationError(
                f"Invalid configuration for provider {cls.TYPE.value}: {e}"
            ) from e

    def deploy_server(
        self,
        config: ServerDeploymentConfig,
        timeout: Optional[int] = None,
    ) -> ServerDeployment:
        """Deploy a new ZenML server.

        Args:
            config: The generic server deployment configuration.
            timeout: The timeout in seconds to wait until the deployment is
                successful. If not supplied, the default timeout value specified
                by the provider is used.

        Returns:
            The newly created server deployment.

        Raises:
            ServerDeploymentExistsError: If a deployment with the same name
                already exists.
        """
        try:
            self._get_service(config.name)
        except KeyError:
            # No service with this name yet: the deployment can proceed.
            pass
        else:
            raise ServerDeploymentExistsError(
                f"ZenML server deployment with name '{config.name}' already "
                f"exists"
            )

        # convert the generic deployment config to a provider specific
        # deployment config
        config = self._convert_config(config)
        service = self._create_service(config, timeout)
        return self._get_deployment(service)

    def update_server(
        self,
        config: ServerDeploymentConfig,
        timeout: Optional[int] = None,
    ) -> ServerDeployment:
        """Update an existing ZenML server deployment.

        Args:
            config: The new generic server deployment configuration.
            timeout: The timeout in seconds to wait until the update is
                successful. If not supplied, the default timeout value specified
                by the provider is used.

        Returns:
            The updated server deployment.

        Raises:
            ServerDeploymentNotFoundError: If a deployment with the given name
                doesn't exist.
        """
        try:
            service = self._get_service(config.name)
        except KeyError as e:
            raise ServerDeploymentNotFoundError(
                f"ZenML server deployment with name '{config.name}' was not "
                f"found"
            ) from e

        # convert the generic deployment config to a provider specific
        # deployment config
        config = self._convert_config(config)
        old_config = self._get_deployment_config(service)

        if old_config == config:
            # Nothing to change: only make sure the service is started.
            logger.info(
                f"The {config.name} ZenML server is already configured with "
                f"the same parameters."
            )
            service = self._start_service(service, timeout)
        else:
            logger.info(f"Updating the {config.name} ZenML server.")
            service = self._update_service(service, config, timeout)

        return self._get_deployment(service)

    def remove_server(
        self,
        config: ServerDeploymentConfig,
        timeout: Optional[int] = None,
    ) -> None:
        """Tears down and removes all resources and files associated with a ZenML server deployment.

        Args:
            config: The generic server deployment configuration.
            timeout: The timeout in seconds to wait until the server is
                removed. If not supplied, the default timeout value specified
                by the provider is used.

        Raises:
            ServerDeploymentNotFoundError: If a deployment with the given name
                doesn't exist.
        """
        try:
            service = self._get_service(config.name)
        except KeyError as e:
            raise ServerDeploymentNotFoundError(
                f"ZenML server deployment with name '{config.name}' was not "
                f"found"
            ) from e

        logger.info(f"Removing the {config.name} ZenML server.")
        self._delete_service(service, timeout)

    def get_server(
        self,
        config: ServerDeploymentConfig,
    ) -> ServerDeployment:
        """Retrieve information about a ZenML server deployment.

        Args:
            config: The generic server deployment configuration.

        Returns:
            The server deployment.

        Raises:
            ServerDeploymentNotFoundError: If a deployment with the given name
                doesn't exist.
        """
        try:
            service = self._get_service(config.name)
        except KeyError as e:
            raise ServerDeploymentNotFoundError(
                f"ZenML server deployment with name '{config.name}' was not "
                f"found"
            ) from e

        return self._get_deployment(service)

    def list_servers(self) -> List[ServerDeployment]:
        """List all server deployments managed by this provider.

        Returns:
            The list of server deployments.
        """
        return [
            self._get_deployment(service) for service in self._list_services()
        ]

    def get_server_logs(
        self,
        config: ServerDeploymentConfig,
        follow: bool = False,
        tail: Optional[int] = None,
    ) -> Generator[str, bool, None]:
        """Retrieve the logs of a ZenML server.

        Args:
            config: The generic server deployment configuration.
            follow: if True, the logs will be streamed as they are written
            tail: only retrieve the last NUM lines of log output.

        Returns:
            A generator that can be accessed to get the service logs.

        Raises:
            ServerDeploymentNotFoundError: If a deployment with the given name
                doesn't exist.
        """
        try:
            service = self._get_service(config.name)
        except KeyError as e:
            raise ServerDeploymentNotFoundError(
                f"ZenML server deployment with name '{config.name}' was not "
                f"found"
            ) from e

        return service.get_logs(follow=follow, tail=tail)

    def _get_deployment_status(
        self, service: BaseService
    ) -> ServerDeploymentStatus:
        """Get the status of a server deployment from its service.

        Args:
            service: The server deployment service.

        Returns:
            The status of the server deployment.
        """
        gc = GlobalConfiguration()
        url: Optional[str] = None
        if service.is_running:
            # all services must have an endpoint
            assert service.endpoint is not None

            url = service.endpoint.status.uri
        # The deployment counts as connected when the globally configured
        # store points at the URL of this running service.
        connected = (
            url is not None and gc.store is not None and gc.store.url == url
        )

        return ServerDeploymentStatus(
            url=url,
            status=service.status.state,
            status_message=service.status.last_error,
            connected=connected,
        )

    def _get_deployment(self, service: BaseService) -> ServerDeployment:
        """Get the server deployment associated with a service.

        Args:
            service: The service.

        Returns:
            The server deployment.
        """
        config = self._get_deployment_config(service)

        return ServerDeployment(
            config=config,
            status=self._get_deployment_status(service),
        )

    @classmethod
    @abstractmethod
    def _get_service_configuration(
        cls,
        server_config: ServerDeploymentConfig,
    ) -> Tuple[
        ServiceConfig,
        ServiceEndpointConfig,
        ServiceEndpointHealthMonitorConfig,
    ]:
        """Construct the service configuration from a server deployment configuration.

        Args:
            server_config: server deployment configuration.

        Returns:
            The service, service endpoint and endpoint monitor configuration.
        """

    @abstractmethod
    def _create_service(
        self,
        config: ServerDeploymentConfig,
        timeout: Optional[int] = None,
    ) -> BaseService:
        """Create, start and return a service instance for a ZenML server deployment.

        Args:
            config: The server deployment configuration.
            timeout: The timeout in seconds to wait until the service is
                running. If not supplied, a default timeout value specified
                by the provider implementation should be used.

        Returns:
            The service instance.
        """

    @abstractmethod
    def _update_service(
        self,
        service: BaseService,
        config: ServerDeploymentConfig,
        timeout: Optional[int] = None,
    ) -> BaseService:
        """Update an existing service instance for a ZenML server deployment.

        Args:
            service: The service instance.
            config: The new server deployment configuration.
            timeout: The timeout in seconds to wait until the updated service is
                running. If not supplied, a default timeout value specified
                by the provider implementation should be used.

        Returns:
            The updated service instance.
        """

    @abstractmethod
    def _start_service(
        self,
        service: BaseService,
        timeout: Optional[int] = None,
    ) -> BaseService:
        """Start a service instance for a ZenML server deployment.

        Args:
            service: The service instance.
            timeout: The timeout in seconds to wait until the service is
                running. If not supplied, a default timeout value specified
                by the provider implementation should be used.

        Returns:
            The updated service instance.
        """

    @abstractmethod
    def _stop_service(
        self,
        service: BaseService,
        timeout: Optional[int] = None,
    ) -> BaseService:
        """Stop a service instance for a ZenML server deployment.

        Args:
            service: The service instance.
            timeout: The timeout in seconds to wait until the service is
                stopped. If not supplied, a default timeout value specified
                by the provider implementation should be used.

        Returns:
            The updated service instance.
        """

    @abstractmethod
    def _delete_service(
        self,
        service: BaseService,
        timeout: Optional[int] = None,
    ) -> None:
        """Remove a service instance for a ZenML server deployment.

        Args:
            service: The service instance.
            timeout: The timeout in seconds to wait until the service is
                removed. If not supplied, a default timeout value specified
                by the provider implementation should be used.
        """

    @abstractmethod
    def _get_service(self, server_name: str) -> BaseService:
        """Get the service instance associated with a ZenML server deployment.

        Args:
            server_name: The server deployment name.

        Returns:
            The service instance.

        Raises:
            KeyError: If the server deployment is not found.
        """

    @abstractmethod
    def _list_services(self) -> List[BaseService]:
        """Get all service instances for all deployed ZenML servers.

        Returns:
            A list of service instances.
        """

    @abstractmethod
    def _get_deployment_config(
        self, service: BaseService
    ) -> ServerDeploymentConfig:
        """Recreate the server deployment config from a service instance.

        Args:
            service: The service instance.

        Returns:
            The server deployment config.
        """
CONFIG_TYPE (BaseModel)
pydantic-model
Generic server deployment configuration.
All server deployment configurations should inherit from this class and handle extra attributes as provider specific attributes.
Attributes:
Name | Type | Description |
---|---|---|
name |
str |
Name of the server deployment. |
provider |
ServerProviderType |
The server provider type. |
Source code in zenml/zen_server/deploy/base_provider.py
class ServerDeploymentConfig(BaseModel):
    """Generic server deployment configuration.

    All server deployment configurations should inherit from this class and
    handle extra attributes as provider specific attributes.

    Attributes:
        name: Name of the server deployment.
        provider: The server provider type.
    """

    name: str
    provider: ServerProviderType

    class Config:
        """Pydantic configuration class."""

        # Validate attributes when assigning them. We need to set this in order
        # to have a mix of mutable and immutable attributes
        validate_assignment = True
        # Allow extra attributes to be set in the base class. The concrete
        # classes are responsible for validating the attributes.
        extra = "allow"
Config
Pydantic configuration class.
Source code in zenml/zen_server/deploy/base_provider.py
class Config:
    """Pydantic configuration class.

    Enables validation on attribute assignment and accepts attributes that
    are not declared on the model.
    """

    # Validate attributes when assigning them. We need to set this in order
    # to have a mix of mutable and immutable attributes
    validate_assignment = True
    # Allow extra attributes to be set in the base class. The concrete
    # classes are responsible for validating the attributes.
    extra = "allow"
deploy_server(self, config, timeout=None)
Deploy a new ZenML server.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
config |
ServerDeploymentConfig |
The generic server deployment configuration. |
required |
timeout |
Optional[int] |
The timeout in seconds to wait until the deployment is successful. If not supplied, the default timeout value specified by the provider is used. |
None |
Returns:
Type | Description |
---|---|
ServerDeployment |
The newly created server deployment. |
Exceptions:
Type | Description |
---|---|
ServerDeploymentExistsError |
If a deployment with the same name already exists. |
Source code in zenml/zen_server/deploy/base_provider.py
def deploy_server(
    self,
    config: ServerDeploymentConfig,
    timeout: Optional[int] = None,
) -> ServerDeployment:
    """Create a brand new ZenML server deployment.

    Args:
        config: The generic server deployment configuration.
        timeout: The timeout in seconds to wait until the deployment is
            successful. If not supplied, the default timeout value specified
            by the provider is used.

    Returns:
        The newly created server deployment.

    Raises:
        ServerDeploymentExistsError: If a deployment with the same name
            already exists.
    """
    # A successful service lookup means the name is already taken.
    try:
        self._get_service(config.name)
        name_taken = True
    except KeyError:
        name_taken = False

    if name_taken:
        raise ServerDeploymentExistsError(
            f"ZenML server deployment with name '{config.name}' already "
            f"exists"
        )

    # Turn the generic configuration into the provider specific one before
    # the service is created from it.
    provider_config = self._convert_config(config)
    new_service = self._create_service(provider_config, timeout)
    return self._get_deployment(new_service)
get_server(self, config)
Retrieve information about a ZenML server deployment.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
config |
ServerDeploymentConfig |
The generic server deployment configuration. |
required |
Returns:
Type | Description |
---|---|
ServerDeployment |
The server deployment. |
Exceptions:
Type | Description |
---|---|
ServerDeploymentNotFoundError |
If a deployment with the given name doesn't exist. |
Source code in zenml/zen_server/deploy/base_provider.py
def get_server(
    self,
    config: ServerDeploymentConfig,
) -> ServerDeployment:
    """Retrieve information about a ZenML server deployment.

    Args:
        config: The generic server deployment configuration. Only its name
            is used to look up the deployment.

    Returns:
        The server deployment.

    Raises:
        ServerDeploymentNotFoundError: If a deployment with the given name
            doesn't exist.
    """
    try:
        service = self._get_service(config.name)
    except KeyError as e:
        # Chain explicitly so the failed lookup stays attached as the cause.
        raise ServerDeploymentNotFoundError(
            f"ZenML server deployment with name '{config.name}' was not "
            f"found"
        ) from e

    return self._get_deployment(service)
get_server_logs(self, config, follow=False, tail=None)
Retrieve the logs of a ZenML server.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
config |
ServerDeploymentConfig |
The generic server deployment configuration. |
required |
follow |
bool |
if True, the logs will be streamed as they are written |
False |
tail |
Optional[int] |
only retrieve the last NUM lines of log output. |
None |
Returns:
Type | Description |
---|---|
Generator[str, bool, NoneType] |
A generator that can be accessed to get the service logs. |
Exceptions:
Type | Description |
---|---|
ServerDeploymentNotFoundError |
If a deployment with the given name doesn't exist. |
Source code in zenml/zen_server/deploy/base_provider.py
def get_server_logs(
    self,
    config: ServerDeploymentConfig,
    follow: bool = False,
    tail: Optional[int] = None,
) -> Generator[str, bool, None]:
    """Retrieve the logs of a ZenML server.

    Args:
        config: The generic server deployment configuration. Only its name
            is used to look up the deployment.
        follow: if True, the logs will be streamed as they are written
        tail: only retrieve the last NUM lines of log output.

    Returns:
        A generator that can be accessed to get the service logs.

    Raises:
        ServerDeploymentNotFoundError: If a deployment with the given name
            doesn't exist.
    """
    try:
        service = self._get_service(config.name)
    except KeyError as e:
        # Chain explicitly so the failed lookup stays attached as the cause.
        raise ServerDeploymentNotFoundError(
            f"ZenML server deployment with name '{config.name}' was not "
            f"found"
        ) from e

    return service.get_logs(follow=follow, tail=tail)
list_servers(self)
List all server deployments managed by this provider.
Returns:
Type | Description |
---|---|
List[zenml.zen_server.deploy.deployment.ServerDeployment] |
The list of server deployments. |
Source code in zenml/zen_server/deploy/base_provider.py
def list_servers(self) -> List[ServerDeployment]:
    """Collect the deployments of every service known to this provider.

    Returns:
        The list of server deployments.
    """
    deployments = []
    for service in self._list_services():
        deployments.append(self._get_deployment(service))
    return deployments
register_as_provider()
classmethod
Register the class as a server provider.
Source code in zenml/zen_server/deploy/base_provider.py
@classmethod
def register_as_provider(cls) -> None:
    """Register the class as a server provider.

    The class itself is handed to `ServerDeployer.register_provider`.
    """
    # NOTE(review): imported inside the method, presumably to avoid a
    # circular import with the deployer module -- confirm.
    from zenml.zen_server.deploy.deployer import ServerDeployer

    ServerDeployer.register_provider(cls)
remove_server(self, config, timeout=None)
Tears down and removes all resources and files associated with a ZenML server deployment.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
config |
ServerDeploymentConfig |
The generic server deployment configuration. |
required |
timeout |
Optional[int] |
The timeout in seconds to wait until the server is removed. If not supplied, the default timeout value specified by the provider is used. |
None |
Exceptions:
Type | Description |
---|---|
ServerDeploymentNotFoundError |
If a deployment with the given name doesn't exist. |
Source code in zenml/zen_server/deploy/base_provider.py
def remove_server(
    self,
    config: ServerDeploymentConfig,
    timeout: Optional[int] = None,
) -> None:
    """Tears down and removes all resources and files associated with a ZenML server deployment.

    Args:
        config: The generic server deployment configuration. Only its name
            is used to look up the deployment.
        timeout: The timeout in seconds to wait until the server is
            removed. If not supplied, the default timeout value specified
            by the provider is used.

    Raises:
        ServerDeploymentNotFoundError: If a deployment with the given name
            doesn't exist.
    """
    try:
        service = self._get_service(config.name)
    except KeyError as e:
        # Chain explicitly so the failed lookup stays attached as the cause.
        raise ServerDeploymentNotFoundError(
            f"ZenML server deployment with name '{config.name}' was not "
            f"found"
        ) from e

    logger.info(f"Removing the {config.name} ZenML server.")
    self._delete_service(service, timeout)
update_server(self, config, timeout=None)
Update an existing ZenML server deployment.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
config |
ServerDeploymentConfig |
The new generic server deployment configuration. |
required |
timeout |
Optional[int] |
The timeout in seconds to wait until the update is successful. If not supplied, the default timeout value specified by the provider is used. |
None |
Returns:
Type | Description |
---|---|
ServerDeployment |
The updated server deployment. |
Exceptions:
Type | Description |
---|---|
ServerDeploymentNotFoundError |
If a deployment with the given name doesn't exist. |
Source code in zenml/zen_server/deploy/base_provider.py
def update_server(
    self,
    config: ServerDeploymentConfig,
    timeout: Optional[int] = None,
) -> ServerDeployment:
    """Update an existing ZenML server deployment.

    Args:
        config: The new generic server deployment configuration.
        timeout: The timeout in seconds to wait until the update is
            successful. If not supplied, the default timeout value specified
            by the provider is used.

    Returns:
        The updated server deployment.

    Raises:
        ServerDeploymentNotFoundError: If a deployment with the given name
            doesn't exist.
    """
    try:
        service = self._get_service(config.name)
    except KeyError as e:
        # Chain explicitly so the failed lookup stays attached as the cause.
        raise ServerDeploymentNotFoundError(
            f"ZenML server deployment with name '{config.name}' was not "
            f"found"
        ) from e

    # convert the generic deployment config to a provider specific
    # deployment config
    config = self._convert_config(config)
    old_config = self._get_deployment_config(service)

    if old_config == config:
        # Nothing to change: only make sure the service is started.
        logger.info(
            f"The {config.name} ZenML server is already configured with "
            f"the same parameters."
        )
        service = self._start_service(service, timeout)
    else:
        logger.info(f"Updating the {config.name} ZenML server.")
        service = self._update_service(service, config, timeout)

    return self._get_deployment(service)
deployer
ZenML server deployer singleton implementation.
ServerDeployer
Server deployer singleton.
This class is responsible for managing the various server provider implementations and for directing server deployment lifecycle requests to the responsible provider. It acts as a facade built on top of the various server providers.
Source code in zenml/zen_server/deploy/deployer.py
class ServerDeployer(metaclass=SingletonMetaClass):
    """Server deployer singleton.

    This class is responsible for managing the various server provider
    implementations and for directing server deployment lifecycle requests to
    the responsible provider. It acts as a facade built on top of the various
    server providers.
    """

    # Registry of provider instances keyed by provider type. It is filled by
    # `register_provider` and shared at class level.
    _providers: ClassVar[Dict[ServerProviderType, BaseServerProvider]] = {}

    @classmethod
    def register_provider(cls, provider: Type[BaseServerProvider]) -> None:
        """Instantiate a server provider class and register the instance.

        Args:
            provider: The server provider class to register.

        Raises:
            TypeError: If a provider with the same type is already registered.
        """
        provider_type = provider.TYPE
        if provider_type in cls._providers:
            raise TypeError(
                f"Server provider '{provider_type}' is already registered."
            )
        logger.debug(f"Registering server provider '{provider_type}'.")
        cls._providers[provider_type] = provider()

    @classmethod
    def get_provider(
        cls, provider_type: ServerProviderType
    ) -> BaseServerProvider:
        """Get the server provider associated with a provider type.

        Args:
            provider_type: The server provider type.

        Returns:
            The server provider associated with the provider type.

        Raises:
            ServerProviderNotFoundError: If no provider is registered for the
                given provider type.
        """
        registered = cls._providers
        if provider_type in registered:
            return registered[provider_type]
        raise ServerProviderNotFoundError(
            f"Server provider '{provider_type}' is not registered."
        )

    def deploy_server(
        self,
        config: ServerDeploymentConfig,
        timeout: Optional[int] = None,
    ) -> ServerDeployment:
        """Deploy a new ZenML server or update an existing deployment.

        Args:
            config: The server deployment configuration.
            timeout: The timeout in seconds to wait until the deployment is
                successful. If not supplied, the default timeout value specified
                by the provider is used.

        Returns:
            The server deployment.
        """
        # We do this here to ensure that the zenml store is always initialized
        # before the server is deployed. This is necessary because the server
        # may require access to the local store configuration or database.
        global_config = GlobalConfiguration()
        if global_config.store is None:
            _ = global_config.zen_store

        try:
            self.get_server(config.name)
        except ServerDeploymentNotFoundError:
            already_deployed = False
        else:
            already_deployed = True

        # A deployment with this name already exists: update it instead.
        if already_deployed:
            return self.update_server(config=config, timeout=timeout)

        provider_name = config.provider.value
        provider = self.get_provider(config.provider)
        logger.info(
            f"Deploying a {provider_name} ZenML server with name "
            f"'{config.name}'."
        )
        return provider.deploy_server(config, timeout=timeout)

    def update_server(
        self,
        config: ServerDeploymentConfig,
        timeout: Optional[int] = None,
    ) -> ServerDeployment:
        """Update an existing ZenML server deployment.

        Args:
            config: The new server deployment configuration.
            timeout: The timeout in seconds to wait until the deployment is
                successful. The value is passed unchanged to the provider;
                if not supplied, the default timeout value specified by the
                provider is used.

        Returns:
            The updated server deployment.

        Raises:
            ServerDeploymentExistsError: If an existing deployment with the same
                name but a different provider type is found.
            ServerDeploymentNotFoundError: If no deployment with the given
                name exists (raised by `get_server`).
        """
        # this will also raise ServerDeploymentNotFoundError if the server
        # does not exist
        existing_server = self.get_server(config.name)
        provider = self.get_provider(config.provider)
        existing_provider = existing_server.config.provider

        if existing_provider != config.provider:
            # The provider fragment must end with a space, not a period, so
            # that the concatenated message reads as a single sentence.
            raise ServerDeploymentExistsError(
                f"A server deployment with the same name '{config.name}' but "
                f"with a different provider '{existing_provider.value}' "
                f"is already provisioned. Please choose a different name or "
                f"tear down the existing deployment."
            )

        return provider.update_server(config, timeout=timeout)

    def remove_server(
        self,
        server_name: str,
        timeout: Optional[int] = None,
    ) -> None:
        """Tears down and removes all resources and files associated with a ZenML server deployment.

        If the client is connected to the server, it is disconnected first.

        Args:
            server_name: The server deployment name.
            timeout: The timeout in seconds to wait until the deployment is
                successfully torn down. If not supplied, a provider specific
                default timeout value is used.
        """
        # this will also raise ServerDeploymentNotFoundError if the server
        # does not exist
        server = self.get_server(server_name)
        provider_name = server.config.provider.value
        provider = self.get_provider(server.config.provider)

        if self.is_connected_to_server(server_name):
            self.disconnect_from_server(server_name)

        logger.info(
            f"Tearing down the '{server_name}' {provider_name} ZenML server."
        )
        provider.remove_server(server.config, timeout=timeout)

    def is_connected_to_server(self, server_name: str) -> bool:
        """Check if the ZenML client is currently connected to a ZenML server.

        The client counts as connected when the server reports a URL and the
        globally configured store uses that same URL.

        Args:
            server_name: The server deployment name.

        Returns:
            True if the ZenML client is connected to the ZenML server, False
            otherwise.
        """
        # this will also raise ServerDeploymentNotFoundError if the server
        # does not exist
        server = self.get_server(server_name)
        global_config = GlobalConfiguration()

        status = server.status
        if status is None or status.url is None:
            return False
        store = global_config.store
        if store is None:
            return False
        return store.url == status.url

    def connect_to_server(
        self,
        server_name: str,
        username: str,
        password: str,
        verify_ssl: Union[bool, str] = True,
    ) -> None:
        """Connect to a ZenML server instance.

        Args:
            server_name: The server deployment name.
            username: The username to use to connect to the server.
            password: The password to use to connect to the server.
            verify_ssl: Either a boolean, in which case it controls whether we
                verify the server's TLS certificate, or a string, in which case
                it must be a path to a CA bundle to use or the CA bundle value
                itself.

        Raises:
            ServerDeploymentError: If the ZenML server is not running or
                is unreachable.
        """
        # this will also raise ServerDeploymentNotFoundError if the server
        # does not exist
        server = self.get_server(server_name)
        provider_name = server.config.provider.value
        global_config = GlobalConfiguration()

        # Without a status URL there is nothing to connect to.
        if not (server.status and server.status.url):
            raise ServerDeploymentError(
                f"The {provider_name} {server_name} ZenML "
                f"server is not currently running or is unreachable."
            )

        store_config = RestZenStoreConfiguration(
            url=server.status.url,
            username=username,
            password=password,
            verify_ssl=verify_ssl,
        )

        # An identical store configuration means we are already connected.
        if global_config.store == store_config:
            logger.info(
                f"ZenML is already connected to the '{server_name}' "
                f"{provider_name} ZenML server."
            )
            return

        logger.info(
            f"Connecting ZenML to the '{server_name}' "
            f"{provider_name} ZenML server ({store_config.url})."
        )
        global_config.set_store(store_config)
        logger.info(
            f"Connected ZenML to the '{server_name}' "
            f"{provider_name} ZenML server ({store_config.url})."
        )

    def disconnect_from_server(
        self,
        server_name: Optional[str] = None,
    ) -> None:
        """Disconnect from a ZenML server instance.

        Args:
            server_name: The server deployment name. If supplied, the deployer
                will check if the ZenML client is indeed connected to the server
                and disconnect only if that is the case. Otherwise the deployer
                will disconnect from any ZenML server.
        """
        global_config = GlobalConfiguration()

        # Only a REST store represents a connection to a ZenML server.
        if not global_config.store or global_config.store.type != StoreType.REST:
            logger.info("ZenML is not currently connected to a ZenML server.")
            return

        if not server_name:
            logger.info(
                f"Disconnecting ZenML from the {global_config.store.url} ZenML server."
            )
        else:
            # this will also raise ServerDeploymentNotFoundError if the server
            # does not exist
            server = self.get_server(server_name)
            provider_name = server.config.provider.value
            if not self.is_connected_to_server(server_name):
                logger.info(
                    f"ZenML is not currently connected to the '{server_name}' "
                    f"{provider_name} ZenML server."
                )
                return
            logger.info(
                f"Disconnecting ZenML from the '{server_name}' "
                f"{provider_name} ZenML server ({global_config.store.url})."
            )

        global_config.set_default_store()
        logger.info("Disconnected ZenML from the ZenML server.")

    def get_server(
        self,
        server_name: str,
    ) -> ServerDeployment:
        """Get a server deployment.

        Every registered provider is asked in turn; the first deployment found
        is returned.

        Args:
            server_name: The server deployment name.

        Returns:
            The requested server deployment.

        Raises:
            ServerDeploymentNotFoundError: If no server deployment with the
                given name is found.
        """
        for provider in self._providers.values():
            try:
                return provider.get_server(
                    ServerDeploymentConfig(
                        name=server_name, provider=provider.TYPE
                    )
                )
            except ServerDeploymentNotFoundError:
                # This provider does not manage the server; try the next one.
                continue

        raise ServerDeploymentNotFoundError(
            f"Server deployment '{server_name}' not found."
        )

    def list_servers(
        self,
        server_name: Optional[str] = None,
        provider_type: Optional[ServerProviderType] = None,
    ) -> List[ServerDeployment]:
        """List all server deployments.

        Args:
            server_name: The server deployment name to filter by.
            provider_type: The server provider type to filter by.

        Returns:
            The list of server deployments.
        """
        providers: List[BaseServerProvider] = (
            [self.get_provider(provider_type)]
            if provider_type
            else list(self._providers.values())
        )

        servers: List[ServerDeployment] = []
        for provider in providers:
            if not server_name:
                servers.extend(provider.list_servers())
                continue
            try:
                servers.append(
                    provider.get_server(
                        ServerDeploymentConfig(
                            name=server_name,
                            provider=provider.TYPE,
                        )
                    )
                )
            except ServerDeploymentNotFoundError:
                # No deployment with that name under this provider.
                pass

        return servers

    def get_server_logs(
        self,
        server_name: str,
        follow: bool = False,
        tail: Optional[int] = None,
    ) -> Generator[str, bool, None]:
        """Retrieve the logs of a ZenML server.

        Args:
            server_name: The server deployment name.
            follow: if True, the logs will be streamed as they are written
            tail: only retrieve the last NUM lines of log output.

        Returns:
            A generator that can be accessed to get the service logs.
        """
        # this will also raise ServerDeploymentNotFoundError if the server
        # does not exist
        server = self.get_server(server_name)
        provider_name = server.config.provider.value
        provider = self.get_provider(server.config.provider)

        logger.info(
            f"Fetching logs from the '{server_name}' {provider_name} ZenML "
            f"server..."
        )
        return provider.get_server_logs(server.config, follow=follow, tail=tail)
connect_to_server(self, server_name, username, password, verify_ssl=True)
Connect to a ZenML server instance.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
server_name |
str |
The server deployment name. |
required |
username |
str |
The username to use to connect to the server. |
required |
password |
str |
The password to use to connect to the server. |
required |
verify_ssl |
Union[bool, str] |
Either a boolean, in which case it controls whether we verify the server's TLS certificate, or a string, in which case it must be a path to a CA bundle to use or the CA bundle value itself. |
True |
Exceptions:
Type | Description |
---|---|
ServerDeploymentError |
If the ZenML server is not running or is unreachable. |
Source code in zenml/zen_server/deploy/deployer.py
def connect_to_server(
    self,
    server_name: str,
    username: str,
    password: str,
    verify_ssl: Union[bool, str] = True,
) -> None:
    """Point the global ZenML configuration at a deployed ZenML server.

    Args:
        server_name: The server deployment name.
        username: The username to use to connect to the server.
        password: The password to use to connect to the server.
        verify_ssl: Either a boolean, in which case it controls whether we
            verify the server's TLS certificate, or a string, in which case
            it must be a path to a CA bundle to use or the CA bundle value
            itself.

    Raises:
        ServerDeploymentError: If the ZenML server is not running or
            is unreachable.
    """
    # get_server raises ServerDeploymentNotFoundError for unknown names
    server = self.get_server(server_name)
    provider_name = server.config.provider.value
    global_config = GlobalConfiguration()

    # Without a status URL there is nothing to connect to.
    if not (server.status and server.status.url):
        raise ServerDeploymentError(
            f"The {provider_name} {server_name} ZenML "
            f"server is not currently running or is unreachable."
        )

    store_config = RestZenStoreConfiguration(
        url=server.status.url,
        username=username,
        password=password,
        verify_ssl=verify_ssl,
    )

    # An identical store configuration means we are already connected.
    if global_config.store == store_config:
        logger.info(
            f"ZenML is already connected to the '{server_name}' "
            f"{provider_name} ZenML server."
        )
        return

    logger.info(
        f"Connecting ZenML to the '{server_name}' "
        f"{provider_name} ZenML server ({store_config.url})."
    )
    global_config.set_store(store_config)
    logger.info(
        f"Connected ZenML to the '{server_name}' "
        f"{provider_name} ZenML server ({store_config.url})."
    )
deploy_server(self, config, timeout=None)
Deploy a new ZenML server or update an existing deployment.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
config |
ServerDeploymentConfig |
The server deployment configuration. |
required |
timeout |
Optional[int] |
The timeout in seconds to wait until the deployment is successful. If not supplied, the default timeout value specified by the provider is used. |
None |
Returns:
Type | Description |
---|---|
ServerDeployment |
The server deployment. |
Source code in zenml/zen_server/deploy/deployer.py
def deploy_server(
    self,
    config: ServerDeploymentConfig,
    timeout: Optional[int] = None,
) -> ServerDeployment:
    """Deploy a new ZenML server, or update the deployment if it exists.

    Args:
        config: The server deployment configuration.
        timeout: The timeout in seconds to wait until the deployment is
            successful. If not supplied, the default timeout value specified
            by the provider is used.

    Returns:
        The server deployment.
    """
    # Make sure the zenml store is initialized before the server is
    # deployed, because the server may require access to the local store
    # configuration or database.
    global_config = GlobalConfiguration()
    if global_config.store is None:
        _ = global_config.zen_store

    try:
        self.get_server(config.name)
    except ServerDeploymentNotFoundError:
        already_deployed = False
    else:
        already_deployed = True

    # A deployment with this name already exists: update it instead.
    if already_deployed:
        return self.update_server(config=config, timeout=timeout)

    provider_name = config.provider.value
    provider = self.get_provider(config.provider)
    logger.info(
        f"Deploying a {provider_name} ZenML server with name "
        f"'{config.name}'."
    )
    return provider.deploy_server(config, timeout=timeout)
disconnect_from_server(self, server_name=None)
Disconnect from a ZenML server instance.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
server_name |
Optional[str] |
The server deployment name. If supplied, the deployer will check if the ZenML client is indeed connected to the server and disconnect only if that is the case. Otherwise the deployer will disconnect from any ZenML server. |
None |
Source code in zenml/zen_server/deploy/deployer.py
def disconnect_from_server(
    self,
    server_name: Optional[str] = None,
) -> None:
    """Reset the global configuration to the default store.

    Args:
        server_name: The server deployment name. If supplied, the deployer
            will check if the ZenML client is indeed connected to the server
            and disconnect only if that is the case. Otherwise the deployer
            will disconnect from any ZenML server.
    """
    global_config = GlobalConfiguration()

    # Only a REST store represents a connection to a ZenML server.
    if not global_config.store or global_config.store.type != StoreType.REST:
        logger.info("ZenML is not currently connected to a ZenML server.")
        return

    if not server_name:
        logger.info(
            f"Disconnecting ZenML from the {global_config.store.url} ZenML server."
        )
    else:
        # get_server raises ServerDeploymentNotFoundError for unknown names
        server = self.get_server(server_name)
        provider_name = server.config.provider.value
        if not self.is_connected_to_server(server_name):
            logger.info(
                f"ZenML is not currently connected to the '{server_name}' "
                f"{provider_name} ZenML server."
            )
            return
        logger.info(
            f"Disconnecting ZenML from the '{server_name}' "
            f"{provider_name} ZenML server ({global_config.store.url})."
        )

    global_config.set_default_store()
    logger.info("Disconnected ZenML from the ZenML server.")
get_provider(provider_type)
classmethod
Get the server provider associated with a provider type.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
provider_type |
ServerProviderType |
The server provider type. |
required |
Returns:
Type | Description |
---|---|
BaseServerProvider |
The server provider associated with the provider type. |
Exceptions:
Type | Description |
---|---|
ServerProviderNotFoundError |
If no provider is registered for the given provider type. |
Source code in zenml/zen_server/deploy/deployer.py
@classmethod
def get_provider(
    cls, provider_type: ServerProviderType
) -> BaseServerProvider:
    """Look up the registered provider instance for a provider type.

    Args:
        provider_type: The server provider type.

    Returns:
        The server provider associated with the provider type.

    Raises:
        ServerProviderNotFoundError: If no provider is registered for the
            given provider type.
    """
    registered = cls._providers
    if provider_type in registered:
        return registered[provider_type]
    raise ServerProviderNotFoundError(
        f"Server provider '{provider_type}' is not registered."
    )
get_server(self, server_name)
Get a server deployment.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
server_name |
str |
The server deployment name. |
required |
Returns:
Type | Description |
---|---|
ServerDeployment |
The requested server deployment. |
Exceptions:
Type | Description |
---|---|
ServerDeploymentNotFoundError |
If no server deployment with the given name is found. |
Source code in zenml/zen_server/deploy/deployer.py
def get_server(
    self,
    server_name: str,
) -> ServerDeployment:
    """Find a server deployment by name across all registered providers.

    Every registered provider is asked in turn; the first deployment found
    is returned.

    Args:
        server_name: The server deployment name.

    Returns:
        The requested server deployment.

    Raises:
        ServerDeploymentNotFoundError: If no server deployment with the
            given name is found.
    """
    for candidate in self._providers.values():
        try:
            return candidate.get_server(
                ServerDeploymentConfig(
                    name=server_name, provider=candidate.TYPE
                )
            )
        except ServerDeploymentNotFoundError:
            # This provider does not manage the server; try the next one.
            continue

    raise ServerDeploymentNotFoundError(
        f"Server deployment '{server_name}' not found."
    )
get_server_logs(self, server_name, follow=False, tail=None)
Retrieve the logs of a ZenML server.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
server_name |
str |
The server deployment name. |
required |
follow |
bool |
if True, the logs will be streamed as they are written |
False |
tail |
Optional[int] |
only retrieve the last NUM lines of log output. |
None |
Returns:
Type | Description |
---|---|
Generator[str, bool, NoneType] |
A generator that can be accessed to get the service logs. |
Source code in zenml/zen_server/deploy/deployer.py
def get_server_logs(
    self,
    server_name: str,
    follow: bool = False,
    tail: Optional[int] = None,
) -> Generator[str, bool, None]:
    """Fetch the logs of a ZenML server from its provider.

    Args:
        server_name: The server deployment name.
        follow: if True, the logs will be streamed as they are written
        tail: only retrieve the last NUM lines of log output.

    Returns:
        A generator that can be accessed to get the service logs.
    """
    # get_server raises ServerDeploymentNotFoundError for unknown names
    deployment = self.get_server(server_name)
    provider_name = deployment.config.provider.value
    provider = self.get_provider(deployment.config.provider)

    logger.info(
        f"Fetching logs from the '{server_name}' {provider_name} ZenML "
        f"server..."
    )
    return provider.get_server_logs(
        deployment.config, follow=follow, tail=tail
    )
is_connected_to_server(self, server_name)
Check if the ZenML client is currently connected to a ZenML server.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
server_name |
str |
The server deployment name. |
required |
Returns:
Type | Description |
---|---|
bool |
True if the ZenML client is connected to the ZenML server, False otherwise. |
Source code in zenml/zen_server/deploy/deployer.py
def is_connected_to_server(self, server_name: str) -> bool:
    """Tell whether the global store points at the given ZenML server.

    The client counts as connected when the server reports a URL and the
    globally configured store uses that same URL.

    Args:
        server_name: The server deployment name.

    Returns:
        True if the ZenML client is connected to the ZenML server, False
        otherwise.
    """
    # get_server raises ServerDeploymentNotFoundError for unknown names
    server = self.get_server(server_name)
    global_config = GlobalConfiguration()

    status = server.status
    if status is None or status.url is None:
        return False
    store = global_config.store
    if store is None:
        return False
    return store.url == status.url
list_servers(self, server_name=None, provider_type=None)
List all server deployments.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
server_name |
Optional[str] |
The server deployment name to filter by. |
None |
provider_type |
Optional[zenml.enums.ServerProviderType] |
The server provider type to filter by. |
None |
Returns:
Type | Description |
---|---|
List[zenml.zen_server.deploy.deployment.ServerDeployment] |
The list of server deployments. |
Source code in zenml/zen_server/deploy/deployer.py
def list_servers(
    self,
    server_name: Optional[str] = None,
    provider_type: Optional[ServerProviderType] = None,
) -> List[ServerDeployment]:
    """Collect server deployments, optionally filtered by name or provider.

    Args:
        server_name: The server deployment name to filter by.
        provider_type: The server provider type to filter by.

    Returns:
        The list of server deployments.
    """
    providers: List[BaseServerProvider] = (
        [self.get_provider(provider_type)]
        if provider_type
        else list(self._providers.values())
    )

    servers: List[ServerDeployment] = []
    for provider in providers:
        if not server_name:
            servers.extend(provider.list_servers())
            continue
        try:
            servers.append(
                provider.get_server(
                    ServerDeploymentConfig(
                        name=server_name,
                        provider=provider.TYPE,
                    )
                )
            )
        except ServerDeploymentNotFoundError:
            # No deployment with that name under this provider.
            pass

    return servers
register_provider(provider)
classmethod
Register a server provider.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
provider |
Type[zenml.zen_server.deploy.base_provider.BaseServerProvider] |
The server provider to register. |
required |
Exceptions:
Type | Description |
---|---|
TypeError |
If a provider with the same type is already registered. |
Source code in zenml/zen_server/deploy/deployer.py
@classmethod
def register_provider(cls, provider: Type[BaseServerProvider]) -> None:
    """Instantiate a server provider class and register the instance.

    Args:
        provider: The server provider class to register.

    Raises:
        TypeError: If a provider with the same type is already registered.
    """
    provider_type = provider.TYPE
    if provider_type in cls._providers:
        raise TypeError(
            f"Server provider '{provider_type}' is already registered."
        )
    logger.debug(f"Registering server provider '{provider_type}'.")
    cls._providers[provider_type] = provider()
remove_server(self, server_name, timeout=None)
Tears down and removes all resources and files associated with a ZenML server deployment.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
server_name |
str |
The server deployment name. |
required |
timeout |
Optional[int] |
The timeout in seconds to wait until the deployment is successfully torn down. If not supplied, a provider specific default timeout value is used. |
None |
Source code in zenml/zen_server/deploy/deployer.py
def remove_server(
    self,
    server_name: str,
    timeout: Optional[int] = None,
) -> None:
    """Tears down and removes all resources and files associated with a ZenML server deployment.

    If the client is connected to the server, it is disconnected first.

    Args:
        server_name: The server deployment name.
        timeout: The timeout in seconds to wait until the deployment is
            successfully torn down. If not supplied, a provider specific
            default timeout value is used.
    """
    # get_server raises ServerDeploymentNotFoundError for unknown names
    deployment = self.get_server(server_name)
    provider_name = deployment.config.provider.value
    provider = self.get_provider(deployment.config.provider)

    if self.is_connected_to_server(server_name):
        self.disconnect_from_server(server_name)

    logger.info(
        f"Tearing down the '{server_name}' {provider_name} ZenML server."
    )
    provider.remove_server(deployment.config, timeout=timeout)
update_server(self, config, timeout=None)
Update an existing ZenML server deployment.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
config |
ServerDeploymentConfig |
The new server deployment configuration. |
required |
timeout |
Optional[int] |
The timeout in seconds to wait until the deployment is successful. If not supplied, the default timeout value specified by the provider is used. |
None |
Returns:
Type | Description |
---|---|
ServerDeployment |
The updated server deployment. |
Exceptions:
Type | Description |
---|---|
ServerDeploymentExistsError |
If an existing deployment with the same name but a different provider type is found. |
Source code in zenml/zen_server/deploy/deployer.py
def update_server(
    self,
    config: ServerDeploymentConfig,
    timeout: Optional[int] = None,
) -> ServerDeployment:
    """Update an existing ZenML server deployment.

    Args:
        config: The new server deployment configuration.
        timeout: The timeout in seconds to wait until the deployment is
            successful. The value is passed unchanged to the provider;
            if not supplied, the default timeout value specified by the
            provider is used.

    Returns:
        The updated server deployment.

    Raises:
        ServerDeploymentExistsError: If an existing deployment with the same
            name but a different provider type is found.
        ServerDeploymentNotFoundError: If no deployment with the given
            name exists (raised by `get_server`).
    """
    # this will also raise ServerDeploymentNotFoundError if the server
    # does not exist
    existing_server = self.get_server(config.name)
    provider = self.get_provider(config.provider)
    existing_provider = existing_server.config.provider

    if existing_provider != config.provider:
        # The provider fragment must end with a space, not a period, so
        # that the concatenated message reads as a single sentence.
        raise ServerDeploymentExistsError(
            f"A server deployment with the same name '{config.name}' but "
            f"with a different provider '{existing_provider.value}' "
            f"is already provisioned. Please choose a different name or "
            f"tear down the existing deployment."
        )

    return provider.update_server(config, timeout=timeout)
deployment
Zen Server deployment definitions.
ServerDeployment (BaseModel)
pydantic-model
Server deployment.
Attributes:
Name | Type | Description |
---|---|---|
config |
ServerDeploymentConfig |
The server deployment configuration. |
status |
Optional[zenml.zen_server.deploy.deployment.ServerDeploymentStatus] |
The server deployment status. |
Source code in zenml/zen_server/deploy/deployment.py
class ServerDeployment(BaseModel):
    """Server deployment.

    Attributes:
        config: The server deployment configuration.
        status: The server deployment status.
    """

    # Configuration the deployment was created with.
    config: ServerDeploymentConfig
    # Deployment status; the annotation allows None.
    status: Optional[ServerDeploymentStatus]
ServerDeploymentConfig (BaseModel)
pydantic-model
Generic server deployment configuration.
All server deployment configurations should inherit from this class and handle extra attributes as provider specific attributes.
Attributes:
Name | Type | Description |
---|---|---|
name |
str |
Name of the server deployment. |
provider |
ServerProviderType |
The server provider type. |
Source code in zenml/zen_server/deploy/deployment.py
class ServerDeploymentConfig(BaseModel):
    """Generic server deployment configuration.

    All server deployment configurations should inherit from this class and
    handle extra attributes as provider specific attributes.

    Attributes:
        name: Name of the server deployment.
        provider: The server provider type.
    """

    name: str
    provider: ServerProviderType

    class Config:
        """Pydantic configuration class."""

        # Attributes are validated on assignment; this is needed to have a
        # mix of mutable and immutable attributes.
        validate_assignment = True
        # Extra attributes are accepted by the base class. Validating them
        # is left to the concrete classes.
        extra = "allow"
Config
Pydantic configuration class.
Source code in zenml/zen_server/deploy/deployment.py
class Config:
    """Pydantic configuration class."""

    # Attributes are validated on assignment; this is needed to have a
    # mix of mutable and immutable attributes.
    validate_assignment = True
    # Extra attributes are accepted by the base class. Validating them
    # is left to the concrete classes.
    extra = "allow"
ServerDeploymentStatus (BaseModel)
pydantic-model
Server deployment status.
Ideally this should convey the following information:
- whether the server's deployment is managed by this client (i.e. if the server was deployed with `zenml up`)
- for a managed deployment, the status of the deployment/tear-down, e.g. not deployed, deploying, running, deleting, deployment timeout/error, tear-down timeout/error etc.
- for an unmanaged deployment, the operational status (i.e. whether the server is reachable)
- the URL of the server
Attributes:
Name | Type | Description |
---|---|---|
status |
ServiceState |
The status of the server deployment. |
status_message |
Optional[str] |
A message describing the last status. |
connected |
bool |
Whether the client is currently connected to this server. |
url |
Optional[str] |
The URL of the server. |
Source code in zenml/zen_server/deploy/deployment.py
class ServerDeploymentStatus(BaseModel):
    """Server deployment status.

    Ideally this should convey the following information:

    * whether the server's deployment is managed by this client (i.e. if
    the server was deployed with `zenml up`)
    * for a managed deployment, the status of the deployment/tear-down, e.g.
    not deployed, deploying, running, deleting, deployment timeout/error,
    tear-down timeout/error etc.
    * for an unmanaged deployment, the operational status (i.e. whether the
    server is reachable)
    * the URL of the server

    Attributes:
        status: The status of the server deployment.
        status_message: A message describing the last status.
        connected: Whether the client is currently connected to this server.
        url: The URL of the server.
    """

    status: ServiceState
    status_message: Optional[str] = None
    connected: bool
    url: Optional[str] = None
    # NOTE(review): not described in the docstring; presumably the CA
    # certificate of the server -- TODO confirm.
    ca_crt: Optional[str] = None
docker
special
ZenML Server Docker Deployment.
docker_provider
Zen Server docker deployer implementation.
DockerServerProvider (BaseServerProvider)
Docker ZenML server provider.
Source code in zenml/zen_server/deploy/docker/docker_provider.py
class DockerServerProvider(BaseServerProvider):
"""Docker ZenML server provider."""
TYPE: ClassVar[ServerProviderType] = ServerProviderType.DOCKER
CONFIG_TYPE: ClassVar[
Type[ServerDeploymentConfig]
] = DockerServerDeploymentConfig
@classmethod
def _get_service_configuration(
    cls,
    server_config: ServerDeploymentConfig,
) -> Tuple[
    ServiceConfig,
    ServiceEndpointConfig,
    ServiceEndpointHealthMonitorConfig,
]:
    """Construct the service configuration from a server deployment configuration.

    Args:
        server_config: server deployment configuration.

    Returns:
        The service, service endpoint and endpoint monitor configuration.
    """
    assert isinstance(server_config, DockerServerDeploymentConfig)

    service_cfg = DockerZenServerConfig(
        root_runtime_path=DockerZenServer.config_path(),
        singleton=True,
        image=server_config.image,
        name=server_config.name,
        server=server_config,
    )
    # The endpoint uses the configured port as-is (no port allocation).
    endpoint_cfg = ContainerServiceEndpointConfig(
        protocol=ServiceEndpointProtocol.HTTP,
        port=server_config.port,
        allocate_port=False,
    )
    monitor_cfg = HTTPEndpointHealthMonitorConfig(
        healthcheck_uri_path=ZEN_SERVER_HEALTHCHECK_URL_PATH,
        use_head_request=True,
    )
    return service_cfg, endpoint_cfg, monitor_cfg
def _create_service(
    self,
    config: ServerDeploymentConfig,
    timeout: Optional[int] = None,
) -> BaseService:
    """Create, start and return the docker ZenML server deployment service.

    Args:
        config: The server deployment configuration.
        timeout: The timeout in seconds to wait until the service is
            running. Defaults to `DOCKER_ZENML_SERVER_DEFAULT_TIMEOUT` when
            not supplied.

    Returns:
        The service instance.

    Raises:
        RuntimeError: If an existing docker ZenML server service is found.
    """
    assert isinstance(config, DockerServerDeploymentConfig)

    if timeout is None:
        timeout = DOCKER_ZENML_SERVER_DEFAULT_TIMEOUT

    # One lookup is enough: the result is only used to refuse creating a
    # second docker server next to an existing one.
    existing_service = DockerZenServer.get_service()
    if existing_service:
        raise RuntimeError(
            f"A docker ZenML server with name '{existing_service.config.name}' "
            f"is already running. Please stop it first before starting a "
            f"new one."
        )

    (
        service_config,
        endpoint_cfg,
        monitor_cfg,
    ) = self._get_service_configuration(config)

    endpoint = ContainerServiceEndpoint(
        config=endpoint_cfg,
        monitor=HTTPEndpointHealthMonitor(
            config=monitor_cfg,
        ),
    )
    service = DockerZenServer(config=service_config, endpoint=endpoint)
    service.start(timeout=timeout)
    return service
def _update_service(
    self,
    service: BaseService,
    config: ServerDeploymentConfig,
    timeout: Optional[int] = None,
) -> BaseService:
    """Stop the docker server, swap in the new configuration and restart it.

    Args:
        service: The service instance.
        config: The new server deployment configuration.
        timeout: The timeout in seconds to wait until the updated service is
            running.

    Returns:
        The updated service instance.
    """
    if timeout is None:
        timeout = DOCKER_ZENML_SERVER_DEFAULT_TIMEOUT

    configs = self._get_service_configuration(config)
    service_cfg, endpoint_cfg, monitor_cfg = configs

    assert service.endpoint
    assert service.endpoint.monitor

    # The configuration is replaced while the service is stopped.
    service.stop(timeout=timeout)
    service.config = service_cfg
    service.endpoint.config = endpoint_cfg
    service.endpoint.monitor.config = monitor_cfg
    service.start(timeout=timeout)

    return service
def _start_service(
    self,
    service: BaseService,
    timeout: Optional[int] = None,
) -> BaseService:
    """Start the docker ZenML server deployment service.

    Args:
        service: The service instance.
        timeout: The timeout in seconds to wait until the service is
            running.

    Returns:
        The updated service instance.
    """
    wait_for = (
        DOCKER_ZENML_SERVER_DEFAULT_TIMEOUT if timeout is None else timeout
    )
    service.start(timeout=wait_for)
    return service
def _stop_service(
    self,
    service: BaseService,
    timeout: Optional[int] = None,
) -> BaseService:
    """Stop the docker ZenML server deployment service.

    Args:
        service: The service instance.
        timeout: The timeout in seconds to wait until the service is
            stopped.

    Returns:
        The updated service instance.
    """
    wait_for = (
        DOCKER_ZENML_SERVER_DEFAULT_TIMEOUT if timeout is None else timeout
    )
    service.stop(timeout=wait_for)
    return service
def _delete_service(
    self,
    service: BaseService,
    timeout: Optional[int] = None,
) -> None:
    """Stop the docker ZenML server and delete its runtime directory.

    Args:
        service: The service instance.
        timeout: The timeout in seconds to wait until the service is
            removed.
    """
    assert isinstance(service, DockerZenServer)

    wait_for = (
        DOCKER_ZENML_SERVER_DEFAULT_TIMEOUT if timeout is None else timeout
    )
    service.stop(wait_for)
    # The runtime directory is removed only after the stop call returns.
    runtime_dir = DockerZenServer.config_path()
    shutil.rmtree(runtime_dir)
def _get_service(self, server_name: str) -> BaseService:
    """Look up the docker ZenML server deployment service by name.

    Args:
        server_name: The server deployment name.

    Returns:
        The service instance.

    Raises:
        KeyError: If the server deployment is not found.
    """
    deployed = DockerZenServer.get_service()
    if deployed is None:
        raise KeyError("The docker ZenML server is not deployed.")
    if deployed.config.name == server_name:
        return deployed
    raise KeyError(
        "The docker ZenML server is deployed but with a different name."
    )
def _list_services(self) -> List[BaseService]:
    """Get all service instances for all deployed ZenML servers.

    Returns:
        A list of service instances: the docker server service when one is
        found, otherwise an empty list.
    """
    deployed = DockerZenServer.get_service()
    return [deployed] if deployed else []
def _get_deployment_config(
    self, service: BaseService
) -> ServerDeploymentConfig:
    """Recreate the server deployment configuration from a service instance.

    Args:
        service: The service instance.

    Returns:
        The server deployment configuration stored in the service's own
        configuration.
    """
    return cast(DockerZenServer, service).config.server
CONFIG_TYPE (ServerDeploymentConfig)
pydantic-model
Docker server deployment configuration.
Attributes:
Name | Type | Description |
---|---|---|
port |
int |
The TCP port number where the server is accepting connections. |
image |
str |
The Docker image to use for the server. |
Source code in zenml/zen_server/deploy/docker/docker_provider.py
class DockerServerDeploymentConfig(ServerDeploymentConfig):
"""Docker server deployment configuration.
Attributes:
port: The TCP port number where the server is accepting connections.
image: The Docker image to use for the server.
store: Optional store configuration for the server; `None` (the
default) means no custom store is supplied.
"""
port: int = 8238
image: str = DOCKER_ZENML_SERVER_DEFAULT_IMAGE
store: Optional[StoreConfiguration] = None
class Config:
"""Pydantic configuration."""
# Fields that are not declared on the model are rejected.
extra = "forbid"
Config
Pydantic configuration.
Source code in zenml/zen_server/deploy/docker/docker_provider.py
class Config:
"""Pydantic configuration."""
# Fields that are not declared on the model are rejected.
extra = "forbid"
docker_zen_server
Service implementation for the ZenML docker server deployment.
DockerServerDeploymentConfig (ServerDeploymentConfig)
pydantic-model
Docker server deployment configuration.
Attributes:
Name | Type | Description |
---|---|---|
port |
int |
The TCP port number where the server is accepting connections. |
image |
str |
The Docker image to use for the server. |
Source code in zenml/zen_server/deploy/docker/docker_zen_server.py
class DockerServerDeploymentConfig(ServerDeploymentConfig):
"""Docker server deployment configuration.
Attributes:
port: The TCP port number where the server is accepting connections.
image: The Docker image to use for the server.
store: Optional store configuration for the server; `None` (the
default) means no custom store is supplied.
"""
port: int = 8238
image: str = DOCKER_ZENML_SERVER_DEFAULT_IMAGE
store: Optional[StoreConfiguration] = None
class Config:
"""Pydantic configuration."""
# Fields that are not declared on the model are rejected.
extra = "forbid"
Config
Pydantic configuration.
Source code in zenml/zen_server/deploy/docker/docker_zen_server.py
class Config:
"""Pydantic configuration."""
# Fields that are not declared on the model are rejected.
extra = "forbid"
DockerZenServer (ContainerService)
pydantic-model
Service that can be used to start a docker ZenServer.
Attributes:
Name | Type | Description |
---|---|---|
config |
DockerZenServerConfig |
service configuration |
endpoint |
ContainerServiceEndpoint |
service endpoint |
Source code in zenml/zen_server/deploy/docker/docker_zen_server.py
class DockerZenServer(ContainerService):
    """Container service that runs a ZenML server with docker.

    Attributes:
        config: service configuration
        endpoint: service endpoint
    """

    SERVICE_TYPE = ServiceType(
        name="docker_zenml_server",
        type="zen_server",
        flavor="docker",
        description="Docker ZenML server deployment",
    )

    config: DockerZenServerConfig
    endpoint: ContainerServiceEndpoint

    @classmethod
    def config_path(cls) -> str:
        """Return the directory holding the docker ZenML server files.

        Returns:
            Path to the docker ZenML server runtime directory.
        """
        path_parts = (get_global_config_directory(), "zen_server", "docker")
        return os.path.join(*path_parts)

    @property
    def _global_config_path(self) -> str:
        """Return the global configuration directory used by this server.

        Returns:
            Path to the global configuration directory used by this server.
        """
        runtime_dir = self.config_path()
        return os.path.join(runtime_dir, SERVICE_CONTAINER_GLOBAL_CONFIG_DIR)

    def _copy_global_configuration(self) -> None:
        """Copy the global configuration to the docker ZenML server location.

        A copy of the current global configuration is written to the server
        configuration path. When the server configuration carries an explicit
        store configuration it is passed along; otherwise the copy is
        requested with an empty store.
        """
        global_cfg = GlobalConfiguration()
        custom_store = self.config.server.store

        # The store is set to where the default local store is mounted in
        # the docker container unless a custom store configuration is
        # explicitly supplied with the server configuration.
        global_cfg.copy_configuration(
            config_path=self._global_config_path,
            store_config=custom_store,
            empty_store=custom_store is None,
        )

    @classmethod
    def get_service(cls) -> Optional["DockerZenServer"]:
        """Load and return the docker ZenML server service, if present.

        Returns:
            The docker ZenML server service or None, if the docker server
            deployment is not found.
        """
        from zenml.services import ServiceRegistry

        service_file = os.path.join(cls.config_path(), "service.json")
        try:
            with open(service_file, "r") as handle:
                loaded = ServiceRegistry().load_service_from_json(
                    handle.read()
                )
        except FileNotFoundError:
            # No persisted service state means there is no deployment.
            return None
        return cast(DockerZenServer, loaded)

    def _get_container_cmd(self) -> Tuple[List[str], Dict[str, str]]:
        """Get the command to run the service container.

        Override the inherited method to use a ZenML global config path inside
        the container that points to the global config copy instead of the
        one mounted from the local host.

        Returns:
            Command needed to launch the docker container and the environment
            variables to set, in the formats accepted by subprocess.Popen.
        """
        # NOTE(review): the result of this call is discarded; presumably it
        # only ensures the global configuration is instantiated before the
        # command is built - confirm whether it is still needed.
        GlobalConfiguration()

        cmd, env = super()._get_container_cmd()

        container_config_path = os.path.join(
            SERVICE_CONTAINER_PATH,
            SERVICE_CONTAINER_GLOBAL_CONFIG_DIR,
        )
        env[ENV_ZENML_CONFIG_PATH] = container_config_path
        env[ENV_ZENML_SERVER_DEPLOYMENT_TYPE] = ServerDeploymentType.DOCKER

        # Point the local stores path to where the client's local stores
        # path is mounted in the container, so that the server's store
        # configuration is initialized with the same path as the client.
        container_stores_path = os.path.join(
            SERVICE_CONTAINER_GLOBAL_CONFIG_PATH,
            LOCAL_STORES_DIRECTORY_NAME,
        )
        env[ENV_ZENML_LOCAL_STORES_PATH] = container_stores_path
        env[ENV_ZENML_DISABLE_DATABASE_MIGRATION] = "True"
        return cmd, env

    def provision(self) -> None:
        """Copy the global configuration, then provision the service."""
        self._copy_global_configuration()
        super().provision()

    def run(self) -> None:
        """Run the ZenML Server in the current process.

        Raises:
            ValueError: if started with a global configuration that connects to
                another ZenML server.
        """
        import uvicorn  # type: ignore[import]

        active_store = GlobalConfiguration().store
        if active_store and active_store.type == StoreType.REST:
            raise ValueError(
                "The ZenML server cannot be started with REST store type."
            )
        logger.info(
            "Starting ZenML Server as blocking "
            "process... press CTRL+C once to stop it."
        )

        self.endpoint.prepare_for_start()

        try:
            listen_port = self.endpoint.config.port
            uvicorn.run(
                ZEN_SERVER_ENTRYPOINT,
                host="0.0.0.0",
                port=listen_port,
                log_level="info",
            )
        except KeyboardInterrupt:
            logger.info("ZenML Server stopped. Resuming normal execution.")
config_path()
classmethod
Path to the directory where the docker ZenML server files are located.
Returns:
Type | Description |
---|---|
str |
Path to the docker ZenML server runtime directory. |
Source code in zenml/zen_server/deploy/docker/docker_zen_server.py
@classmethod
def config_path(cls) -> str:
    """Return the directory holding the docker ZenML server files.

    Returns:
        Path to the docker ZenML server runtime directory.
    """
    path_parts = (get_global_config_directory(), "zen_server", "docker")
    return os.path.join(*path_parts)
get_service()
classmethod
Load and return the docker ZenML server service, if present.
Returns:
Type | Description |
---|---|
Optional[DockerZenServer] |
The docker ZenML server service or None, if the docker server deployment is not found. |
Source code in zenml/zen_server/deploy/docker/docker_zen_server.py
@classmethod
def get_service(cls) -> Optional["DockerZenServer"]:
    """Load and return the docker ZenML server service, if present.

    Returns:
        The docker ZenML server service or None, if the docker server
        deployment is not found.
    """
    from zenml.services import ServiceRegistry

    service_file = os.path.join(cls.config_path(), "service.json")
    try:
        with open(service_file, "r") as handle:
            loaded = ServiceRegistry().load_service_from_json(handle.read())
    except FileNotFoundError:
        # No persisted service state means there is no deployment.
        return None
    return cast(DockerZenServer, loaded)
provision(self)
Provision the service.
Source code in zenml/zen_server/deploy/docker/docker_zen_server.py
def provision(self) -> None:
    """Copy the global configuration, then provision the service."""
    # The configuration copy is written before the parent class provisions.
    self._copy_global_configuration()
    super().provision()
run(self)
Run the ZenML Server.
Exceptions:
Type | Description |
---|---|
ValueError |
if started with a global configuration that connects to another ZenML server. |
Source code in zenml/zen_server/deploy/docker/docker_zen_server.py
def run(self) -> None:
    """Run the ZenML Server in the current process.

    Raises:
        ValueError: if started with a global configuration that connects to
            another ZenML server.
    """
    import uvicorn  # type: ignore[import]

    active_store = GlobalConfiguration().store
    if active_store and active_store.type == StoreType.REST:
        raise ValueError(
            "The ZenML server cannot be started with REST store type."
        )
    logger.info(
        "Starting ZenML Server as blocking "
        "process... press CTRL+C once to stop it."
    )

    self.endpoint.prepare_for_start()

    try:
        listen_port = self.endpoint.config.port
        uvicorn.run(
            ZEN_SERVER_ENTRYPOINT,
            host="0.0.0.0",
            port=listen_port,
            log_level="info",
        )
    except KeyboardInterrupt:
        logger.info("ZenML Server stopped. Resuming normal execution.")
DockerZenServerConfig (ContainerServiceConfig)
pydantic-model
Docker Zen server configuration.
Attributes:
Name | Type | Description |
---|---|---|
server |
DockerServerDeploymentConfig |
The deployment configuration. |
Source code in zenml/zen_server/deploy/docker/docker_zen_server.py
class DockerZenServerConfig(ContainerServiceConfig):
"""Docker Zen server configuration.
Attributes:
server: The docker server deployment configuration carried by the
service configuration.
"""
server: DockerServerDeploymentConfig
exceptions
ZenML server deployment exceptions.
ServerDeploymentConfigurationError (ServerDeploymentError)
Raised when there is a ZenML server deployment configuration error.
Source code in zenml/zen_server/deploy/exceptions.py
class ServerDeploymentConfigurationError(ServerDeploymentError):
"""Raised when there is a ZenML server deployment configuration error."""
ServerDeploymentError (ZenMLBaseException)
Base exception class for all ZenML server deployment related errors.
Source code in zenml/zen_server/deploy/exceptions.py
class ServerDeploymentError(ZenMLBaseException):
"""Base exception class for all ZenML server deployment related errors.
The more specific server deployment exceptions in this module subclass it.
"""
ServerDeploymentExistsError (ServerDeploymentError)
Raised when trying to deploy a new ZenML server with the same name.
Source code in zenml/zen_server/deploy/exceptions.py
class ServerDeploymentExistsError(ServerDeploymentError):
"""Raised when trying to deploy a new ZenML server with the same name.
That is, the requested name is already used by a server deployment.
"""
ServerDeploymentNotFoundError (ServerDeploymentError)
Raised when trying to fetch a ZenML server deployment that doesn't exist.
Source code in zenml/zen_server/deploy/exceptions.py
class ServerDeploymentNotFoundError(ServerDeploymentError):
"""Raised when trying to fetch a ZenML server deployment that does not exist."""
ServerProviderNotFoundError (ServerDeploymentError)
Raised when using a ZenML server provider that doesn't exist.
Source code in zenml/zen_server/deploy/exceptions.py
class ServerProviderNotFoundError(ServerDeploymentError):
"""Raised when using a ZenML server provider that does not exist."""
local
special
ZenML Server Local Deployment.
local_provider
Zen Server local provider implementation.
LocalServerProvider (BaseServerProvider)
Local ZenML server provider.
Source code in zenml/zen_server/deploy/local/local_provider.py
class LocalServerProvider(BaseServerProvider):
    """Local ZenML server provider."""

    TYPE: ClassVar[ServerProviderType] = ServerProviderType.LOCAL
    CONFIG_TYPE: ClassVar[
        Type[ServerDeploymentConfig]
    ] = LocalServerDeploymentConfig

    @staticmethod
    def check_local_server_dependencies() -> None:
        """Check if local server dependencies are installed.

        Raises:
            RuntimeError: If the dependencies are not installed.
        """
        try:
            # Make sure the ZenML Server dependencies are installed
            import fastapi  # noqa
            import uvicorn  # type: ignore[import]  # noqa
        except ImportError as e:
            # Unable to import the ZenML Server dependencies. The original
            # import error is chained so the missing module stays visible.
            raise RuntimeError(
                "The local ZenML server provider is unavailable because the "
                "ZenML server requirements seem to be unavailable on your machine. "
                "This is probably because ZenML was installed without the optional "
                "ZenML Server dependencies. To install the missing dependencies "
                f'run `pip install "zenml[server]=={__version__}"`.'
            ) from e

    @classmethod
    def _get_service_configuration(
        cls,
        server_config: ServerDeploymentConfig,
    ) -> Tuple[
        ServiceConfig,
        ServiceEndpointConfig,
        ServiceEndpointHealthMonitorConfig,
    ]:
        """Construct the service configuration from a server deployment configuration.

        Args:
            server_config: server deployment configuration.

        Returns:
            The service, service endpoint and endpoint monitor configuration.
        """
        assert isinstance(server_config, LocalServerDeploymentConfig)

        return (
            LocalZenServerConfig(
                root_runtime_path=LocalZenServer.config_path(),
                singleton=True,
                name=server_config.name,
                blocking=server_config.blocking,
                server=server_config,
            ),
            LocalDaemonServiceEndpointConfig(
                protocol=ServiceEndpointProtocol.HTTP,
                ip_address=str(server_config.ip_address),
                port=server_config.port,
                allocate_port=False,
            ),
            HTTPEndpointHealthMonitorConfig(
                healthcheck_uri_path=ZEN_SERVER_HEALTHCHECK_URL_PATH,
                use_head_request=True,
            ),
        )

    def _create_service(
        self,
        config: ServerDeploymentConfig,
        timeout: Optional[int] = None,
    ) -> BaseService:
        """Create, start and return the local ZenML server deployment service.

        Args:
            config: The server deployment configuration.
            timeout: The timeout in seconds to wait until the service is
                running.

        Returns:
            The service instance.

        Raises:
            RuntimeError: If a local service is already running.
        """
        assert isinstance(config, LocalServerDeploymentConfig)

        if timeout is None:
            timeout = LOCAL_ZENML_SERVER_DEFAULT_TIMEOUT

        self.check_local_server_dependencies()
        existing_service = LocalZenServer.get_service()
        if existing_service:
            raise RuntimeError(
                f"A local ZenML server with name '{existing_service.config.name}' "
                f"is already running. Please stop it first before starting a "
                f"new one."
            )

        (
            service_config,
            endpoint_cfg,
            monitor_cfg,
        ) = self._get_service_configuration(config)
        endpoint = LocalDaemonServiceEndpoint(
            config=endpoint_cfg,
            monitor=HTTPEndpointHealthMonitor(
                config=monitor_cfg,
            ),
        )
        service = LocalZenServer(config=service_config, endpoint=endpoint)
        service.start(timeout=timeout)
        return service

    def _update_service(
        self,
        service: BaseService,
        config: ServerDeploymentConfig,
        timeout: Optional[int] = None,
    ) -> BaseService:
        """Update the local ZenML server deployment service.

        Args:
            service: The service instance.
            config: The new server deployment configuration.
            timeout: The timeout in seconds to wait until the updated service is
                running.

        Returns:
            The updated service instance.
        """
        if timeout is None:
            timeout = LOCAL_ZENML_SERVER_DEFAULT_TIMEOUT

        (
            new_config,
            new_endpoint_cfg,
            new_monitor_cfg,
        ) = self._get_service_configuration(config)

        assert service.endpoint
        assert service.endpoint.monitor

        # The configurations are swapped only while the service is stopped.
        service.stop(timeout=timeout)
        (
            service.config,
            service.endpoint.config,
            service.endpoint.monitor.config,
        ) = (
            new_config,
            new_endpoint_cfg,
            new_monitor_cfg,
        )
        service.start(timeout=timeout)
        return service

    def _start_service(
        self,
        service: BaseService,
        timeout: Optional[int] = None,
    ) -> BaseService:
        """Start the local ZenML server deployment service.

        Args:
            service: The service instance.
            timeout: The timeout in seconds to wait until the service is
                running.

        Returns:
            The updated service instance.
        """
        if timeout is None:
            timeout = LOCAL_ZENML_SERVER_DEFAULT_TIMEOUT

        service.start(timeout=timeout)
        return service

    def _stop_service(
        self,
        service: BaseService,
        timeout: Optional[int] = None,
    ) -> BaseService:
        """Stop the local ZenML server deployment service.

        Args:
            service: The service instance.
            timeout: The timeout in seconds to wait until the service is
                stopped.

        Returns:
            The updated service instance.
        """
        if timeout is None:
            timeout = LOCAL_ZENML_SERVER_DEFAULT_TIMEOUT

        service.stop(timeout=timeout)
        return service

    def _delete_service(
        self,
        service: BaseService,
        timeout: Optional[int] = None,
    ) -> None:
        """Remove the local ZenML server deployment service.

        Args:
            service: The service instance.
            timeout: The timeout in seconds to wait until the service is
                removed.
        """
        assert isinstance(service, LocalZenServer)

        if timeout is None:
            timeout = LOCAL_ZENML_SERVER_DEFAULT_TIMEOUT

        service.stop(timeout)
        shutil.rmtree(LocalZenServer.config_path())

    def _get_service(self, server_name: str) -> BaseService:
        """Get the local ZenML server deployment service.

        Args:
            server_name: The server deployment name.

        Returns:
            The service instance.

        Raises:
            KeyError: If the server deployment is not found.
        """
        service = LocalZenServer.get_service()
        if service is None:
            raise KeyError("The local ZenML server is not deployed.")

        if service.config.name != server_name:
            raise KeyError(
                "The local ZenML server is deployed but with a different name."
            )
        return service

    def _list_services(self) -> List[BaseService]:
        """Get all service instances for all deployed ZenML servers.

        Returns:
            A list of service instances.
        """
        service = LocalZenServer.get_service()
        if service:
            return [service]
        return []

    def _get_deployment_config(
        self, service: BaseService
    ) -> ServerDeploymentConfig:
        """Recreate the server deployment configuration from a service instance.

        Args:
            service: The service instance.

        Returns:
            The server deployment configuration.
        """
        server = cast(LocalZenServer, service)
        return server.config.server
CONFIG_TYPE (ServerDeploymentConfig)
pydantic-model
Local server deployment configuration.
Attributes:
Name | Type | Description |
---|---|---|
port |
int |
The TCP port number where the server is accepting connections. |
ip_address |
Union[ipaddress.IPv4Address, ipaddress.IPv6Address] |
The IP address where the server is reachable. |
blocking |
bool |
Run the server in blocking mode instead of using a daemon process. |
Source code in zenml/zen_server/deploy/local/local_provider.py
class LocalServerDeploymentConfig(ServerDeploymentConfig):
"""Local server deployment configuration.
Attributes:
port: The TCP port number where the server is accepting connections.
ip_address: The IP address where the server is reachable.
blocking: Run the server in blocking mode instead of using a daemon
process.
store: Optional store configuration for the server; `None` (the
default) means no custom store is supplied.
"""
port: int = 8237
ip_address: Union[
ipaddress.IPv4Address, ipaddress.IPv6Address
] = ipaddress.IPv4Address(DEFAULT_LOCAL_SERVICE_IP_ADDRESS)
blocking: bool = False
store: Optional[StoreConfiguration] = None
class Config:
"""Pydantic configuration."""
# Fields that are not declared on the model are rejected.
extra = "forbid"
Config
Pydantic configuration.
Source code in zenml/zen_server/deploy/local/local_provider.py
class Config:
"""Pydantic configuration."""
# Fields that are not declared on the model are rejected.
extra = "forbid"
check_local_server_dependencies()
staticmethod
Check if local server dependencies are installed.
Exceptions:
Type | Description |
---|---|
RuntimeError |
If the dependencies are not installed. |
Source code in zenml/zen_server/deploy/local/local_provider.py
@staticmethod
def check_local_server_dependencies() -> None:
    """Check if local server dependencies are installed.

    Raises:
        RuntimeError: If the dependencies are not installed.
    """
    try:
        # Make sure the ZenML Server dependencies are installed
        import fastapi  # noqa
        import uvicorn  # type: ignore[import]  # noqa
    except ImportError as e:
        # Unable to import the ZenML Server dependencies. The original
        # import error is chained so the missing module stays visible.
        raise RuntimeError(
            "The local ZenML server provider is unavailable because the "
            "ZenML server requirements seem to be unavailable on your machine. "
            "This is probably because ZenML was installed without the optional "
            "ZenML Server dependencies. To install the missing dependencies "
            f'run `pip install "zenml[server]=={__version__}"`.'
        ) from e
local_zen_server
Local ZenML server deployment service implementation.
LocalServerDeploymentConfig (ServerDeploymentConfig)
pydantic-model
Local server deployment configuration.
Attributes:
Name | Type | Description |
---|---|---|
port |
int |
The TCP port number where the server is accepting connections. |
ip_address |
Union[ipaddress.IPv4Address, ipaddress.IPv6Address] |
The IP address where the server is reachable. |
blocking |
bool |
Run the server in blocking mode instead of using a daemon process. |
Source code in zenml/zen_server/deploy/local/local_zen_server.py
class LocalServerDeploymentConfig(ServerDeploymentConfig):
"""Local server deployment configuration.
Attributes:
port: The TCP port number where the server is accepting connections.
ip_address: The IP address where the server is reachable.
blocking: Run the server in blocking mode instead of using a daemon
process.
store: Optional store configuration for the server; `None` (the
default) means no custom store is supplied.
"""
port: int = 8237
ip_address: Union[
ipaddress.IPv4Address, ipaddress.IPv6Address
] = ipaddress.IPv4Address(DEFAULT_LOCAL_SERVICE_IP_ADDRESS)
blocking: bool = False
store: Optional[StoreConfiguration] = None
class Config:
"""Pydantic configuration."""
# Fields that are not declared on the model are rejected.
extra = "forbid"
Config
Pydantic configuration.
Source code in zenml/zen_server/deploy/local/local_zen_server.py
class Config:
"""Pydantic configuration."""
# Fields that are not declared on the model are rejected.
extra = "forbid"
LocalZenServer (LocalDaemonService)
pydantic-model
Service daemon that can be used to start a local ZenML Server.
Attributes:
Name | Type | Description |
---|---|---|
config |
LocalZenServerConfig |
service configuration |
endpoint |
LocalDaemonServiceEndpoint |
optional service endpoint |
Source code in zenml/zen_server/deploy/local/local_zen_server.py
class LocalZenServer(LocalDaemonService):
    """Service daemon that can be used to start a local ZenML Server.

    Attributes:
        config: service configuration
        endpoint: optional service endpoint
    """

    SERVICE_TYPE = ServiceType(
        name="local_zenml_server",
        type="zen_server",
        flavor="local",
        description="Local ZenML server deployment",
    )

    config: LocalZenServerConfig
    endpoint: LocalDaemonServiceEndpoint

    @classmethod
    def config_path(cls) -> str:
        """Path to the directory where the local ZenML server files are located.

        Returns:
            Path to the local ZenML server runtime directory.
        """
        return os.path.join(
            get_global_config_directory(),
            "zen_server",
            "local",
        )

    @property
    def _global_config_path(self) -> str:
        """Path to the global configuration directory used by this server.

        Returns:
            Path to the global configuration directory used by this server.
        """
        return os.path.join(self.config_path(), ".zenconfig")

    def _copy_global_configuration(self) -> None:
        """Copy the global configuration to the local ZenML server location.

        The local ZenML server global configuration is a copy of the local
        global configuration. If a store configuration is explicitly set in
        the server configuration, it will be used. Otherwise, the store
        configuration is set to point to the local store.
        """
        gc = GlobalConfiguration()

        # this creates a copy of the global configuration and saves it to
        # the server configuration path. The store is set to point to the local
        # default database unless a custom store configuration is explicitly
        # supplied with the server configuration.
        gc.copy_configuration(
            config_path=self._global_config_path,
            store_config=self.config.server.store,
            empty_store=self.config.server.store is None,
        )

    @classmethod
    def get_service(cls) -> Optional["LocalZenServer"]:
        """Load and return the local ZenML server service, if present.

        Returns:
            The local ZenML server service or None, if the local server
            deployment is not found.
        """
        from zenml.services import ServiceRegistry

        config_filename = os.path.join(cls.config_path(), "service.json")
        try:
            with open(config_filename, "r") as f:
                return cast(
                    LocalZenServer,
                    ServiceRegistry().load_service_from_json(f.read()),
                )
        except FileNotFoundError:
            return None

    def _get_daemon_cmd(self) -> Tuple[List[str], Dict[str, str]]:
        """Get the command to start the daemon.

        Overrides the base class implementation to add the environment variable
        that forces the ZenML server to use the copied global config.

        Returns:
            The command to start the daemon and the environment variables to
            set for the command.
        """
        cmd, env = super()._get_daemon_cmd()
        env[ENV_ZENML_CONFIG_PATH] = self._global_config_path
        env[ENV_ZENML_SERVER_DEPLOYMENT_TYPE] = ServerDeploymentType.LOCAL
        # Set the local stores path to the same path used by the client. This
        # ensures that the server's store configuration is initialized with
        # the same path as the client.
        env[
            ENV_ZENML_LOCAL_STORES_PATH
        ] = GlobalConfiguration().local_stores_path
        env[ENV_ZENML_DISABLE_DATABASE_MIGRATION] = "True"

        return cmd, env

    def provision(self) -> None:
        """Provision the service."""
        self._copy_global_configuration()
        super().provision()

    def start(self, timeout: int = 0) -> None:
        """Start the service and optionally wait for it to become active.

        In blocking mode the server runs in the current process with the
        config path and local stores path environment variables overridden.
        Both variables are put back to their previous state (restored or
        removed) when the server exits.

        Args:
            timeout: amount of time to wait for the service to become active.
                If set to 0, the method will return immediately after checking
                the service status.
        """
        if not self.config.blocking:
            super().start(timeout)
            return

        self._copy_global_configuration()
        local_stores_path = GlobalConfiguration().local_stores_path
        GlobalConfiguration._reset_instance()
        Client._reset_instance()

        overrides = {
            ENV_ZENML_CONFIG_PATH: self._global_config_path,
            ENV_ZENML_LOCAL_STORES_PATH: local_stores_path,
        }
        # Remember the caller's values (None when unset) so that neither
        # variable is clobbered once the blocking server returns.
        previous = {name: os.environ.get(name) for name in overrides}
        os.environ.update(overrides)
        try:
            self.run()
        finally:
            for name, value in previous.items():
                if value is None:
                    # pop() tolerates the variable having been removed
                    # while the server was running.
                    os.environ.pop(name, None)
                else:
                    os.environ[name] = value
            GlobalConfiguration._reset_instance()
            Client._reset_instance()

    def run(self) -> None:
        """Run the ZenML Server.

        Raises:
            ValueError: if started with a global configuration that connects to
                another ZenML server.
        """
        import uvicorn  # type: ignore[import]

        gc = GlobalConfiguration()
        if gc.store and gc.store.type == StoreType.REST:
            raise ValueError(
                "The ZenML server cannot be started with REST store type."
            )
        logger.info(
            "Starting ZenML Server as blocking "
            "process... press CTRL+C once to stop it."
        )

        self.endpoint.prepare_for_start()

        try:
            uvicorn.run(
                ZEN_SERVER_ENTRYPOINT,
                host=self.endpoint.config.ip_address,
                port=self.endpoint.config.port,
                log_level="info",
            )
        except KeyboardInterrupt:
            logger.info("ZenML Server stopped. Resuming normal execution.")
config_path()
classmethod
Path to the directory where the local ZenML server files are located.
Returns:
Type | Description |
---|---|
str |
Path to the local ZenML server runtime directory. |
Source code in zenml/zen_server/deploy/local/local_zen_server.py
@classmethod
def config_path(cls) -> str:
    """Return the directory holding the local ZenML server files.

    Returns:
        Path to the local ZenML server runtime directory.
    """
    path_parts = (get_global_config_directory(), "zen_server", "local")
    return os.path.join(*path_parts)
get_service()
classmethod
Load and return the local ZenML server service, if present.
Returns:
Type | Description |
---|---|
Optional[LocalZenServer] |
The local ZenML server service or None, if the local server deployment is not found. |
Source code in zenml/zen_server/deploy/local/local_zen_server.py
@classmethod
def get_service(cls) -> Optional["LocalZenServer"]:
    """Load and return the local ZenML server service, if present.

    Returns:
        The local ZenML server service or None, if the local server
        deployment is not found.
    """
    from zenml.services import ServiceRegistry

    service_file = os.path.join(cls.config_path(), "service.json")
    try:
        with open(service_file, "r") as handle:
            loaded = ServiceRegistry().load_service_from_json(handle.read())
    except FileNotFoundError:
        # No persisted service state means there is no deployment.
        return None
    return cast(LocalZenServer, loaded)
provision(self)
Provision the service.
Source code in zenml/zen_server/deploy/local/local_zen_server.py
def provision(self) -> None:
    """Copy the global configuration, then provision the service."""
    # The configuration copy is written before the parent class provisions.
    self._copy_global_configuration()
    super().provision()
run(self)
Run the ZenML Server.
Exceptions:
Type | Description |
---|---|
ValueError |
if started with a global configuration that connects to another ZenML server. |
Source code in zenml/zen_server/deploy/local/local_zen_server.py
def run(self) -> None:
    """Run the ZenML Server in the current process.

    Raises:
        ValueError: if started with a global configuration that connects to
            another ZenML server.
    """
    import uvicorn  # type: ignore[import]

    active_store = GlobalConfiguration().store
    if active_store and active_store.type == StoreType.REST:
        raise ValueError(
            "The ZenML server cannot be started with REST store type."
        )
    logger.info(
        "Starting ZenML Server as blocking "
        "process... press CTRL+C once to stop it."
    )

    self.endpoint.prepare_for_start()

    try:
        endpoint_cfg = self.endpoint.config
        uvicorn.run(
            ZEN_SERVER_ENTRYPOINT,
            host=endpoint_cfg.ip_address,
            port=endpoint_cfg.port,
            log_level="info",
        )
    except KeyboardInterrupt:
        logger.info("ZenML Server stopped. Resuming normal execution.")
start(self, timeout=0)
Start the service and optionally wait for it to become active.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
timeout |
int |
amount of time to wait for the service to become active. If set to 0, the method will return immediately after checking the service status. |
0 |
Source code in zenml/zen_server/deploy/local/local_zen_server.py
def start(self, timeout: int = 0) -> None:
    """Start the service and optionally wait for it to become active.

    In blocking mode the server runs in the current process with the
    config path and local stores path environment variables overridden.
    Both variables are put back to their previous state (restored or
    removed) when the server exits.

    Args:
        timeout: amount of time to wait for the service to become active.
            If set to 0, the method will return immediately after checking
            the service status.
    """
    if not self.config.blocking:
        super().start(timeout)
        return

    self._copy_global_configuration()
    local_stores_path = GlobalConfiguration().local_stores_path
    GlobalConfiguration._reset_instance()
    Client._reset_instance()

    overrides = {
        ENV_ZENML_CONFIG_PATH: self._global_config_path,
        ENV_ZENML_LOCAL_STORES_PATH: local_stores_path,
    }
    # Remember the caller's values (None when unset) so that neither
    # variable is clobbered once the blocking server returns.
    previous = {name: os.environ.get(name) for name in overrides}
    os.environ.update(overrides)
    try:
        self.run()
    finally:
        for name, value in previous.items():
            if value is None:
                # pop() tolerates the variable having been removed while
                # the server was running.
                os.environ.pop(name, None)
            else:
                os.environ[name] = value
        GlobalConfiguration._reset_instance()
        Client._reset_instance()
LocalZenServerConfig (LocalDaemonServiceConfig)
pydantic-model
Local Zen server configuration.
Attributes:
Name | Type | Description |
---|---|---|
server |
LocalServerDeploymentConfig |
The deployment configuration. |
Source code in zenml/zen_server/deploy/local/local_zen_server.py
class LocalZenServerConfig(LocalDaemonServiceConfig):
    """Configuration of the local ZenML server service.

    Attributes:
        server: The deployment configuration.
    """

    server: LocalServerDeploymentConfig
terraform
special
ZenML Server Terraform Deployment.
providers
special
ZenML Server Terraform Providers.
aws_provider
Zen Server AWS Terraform deployer implementation.
AWSServerDeploymentConfig (TerraformServerDeploymentConfig)
pydantic-model
AWS server deployment configuration.
Attributes:
Name | Type | Description |
---|---|---|
region |
str |
The AWS region to deploy to. |
rds_name |
str |
The name of the RDS instance to create |
db_name |
str |
Name of RDS database to create. |
db_type |
str |
Type of RDS database to create. |
db_version |
str |
Version of RDS database to create. |
db_instance_class |
str |
Instance class of RDS database to create. |
db_allocated_storage |
int |
Allocated storage of RDS database to create. |
Source code in zenml/zen_server/deploy/terraform/providers/aws_provider.py
class AWSServerDeploymentConfig(TerraformServerDeploymentConfig):
    """Deployment settings for a ZenML server on AWS.

    Every attribute has a default value.

    Attributes:
        region: The AWS region to deploy to.
        rds_name: The name of the RDS instance to create.
        db_name: Name of RDS database to create.
        db_type: Type of RDS database to create.
        db_version: Version of RDS database to create.
        db_instance_class: Instance class of RDS database to create.
        db_allocated_storage: Allocated storage of RDS database to create.
    """

    region: str = "eu-west-1"
    rds_name: str = "zenmlserver"
    db_name: str = "zenmlserver"
    db_type: str = "mysql"
    db_version: str = "5.7.38"
    db_instance_class: str = "db.t3.micro"
    db_allocated_storage: int = 5
AWSServerProvider (TerraformServerProvider)
AWS ZenML server provider.
Source code in zenml/zen_server/deploy/terraform/providers/aws_provider.py
class AWSServerProvider(TerraformServerProvider):
    """ZenML server provider for AWS.

    Associates the AWS provider type with the AWS deployment
    configuration class.
    """

    TYPE: ClassVar[ServerProviderType] = ServerProviderType.AWS
    CONFIG_TYPE: ClassVar[Type[TerraformServerDeploymentConfig]] = (
        AWSServerDeploymentConfig
    )
CONFIG_TYPE (TerraformServerDeploymentConfig)
pydantic-model
AWS server deployment configuration.
Attributes:
Name | Type | Description |
---|---|---|
region |
str |
The AWS region to deploy to. |
rds_name |
str |
The name of the RDS instance to create |
db_name |
str |
Name of RDS database to create. |
db_type |
str |
Type of RDS database to create. |
db_version |
str |
Version of RDS database to create. |
db_instance_class |
str |
Instance class of RDS database to create. |
db_allocated_storage |
int |
Allocated storage of RDS database to create. |
Source code in zenml/zen_server/deploy/terraform/providers/aws_provider.py
class AWSServerDeploymentConfig(TerraformServerDeploymentConfig):
    """Deployment settings for a ZenML server on AWS.

    Every attribute has a default value.

    Attributes:
        region: The AWS region to deploy to.
        rds_name: The name of the RDS instance to create.
        db_name: Name of RDS database to create.
        db_type: Type of RDS database to create.
        db_version: Version of RDS database to create.
        db_instance_class: Instance class of RDS database to create.
        db_allocated_storage: Allocated storage of RDS database to create.
    """

    region: str = "eu-west-1"
    rds_name: str = "zenmlserver"
    db_name: str = "zenmlserver"
    db_type: str = "mysql"
    db_version: str = "5.7.38"
    db_instance_class: str = "db.t3.micro"
    db_allocated_storage: int = 5
azure_provider
Zen Server Azure Terraform deployer implementation.
AzureServerDeploymentConfig (TerraformServerDeploymentConfig)
pydantic-model
Azure server deployment configuration.
Attributes:
Name | Type | Description |
---|---|---|
resource_group |
str |
The Azure resource_group to deploy to. |
db_instance_name |
str |
The name of the Flexible MySQL instance to create |
db_name |
str |
Name of MySQL database to create. |
db_version |
str |
Version of MySQL database to create. |
db_sku_name |
str |
The sku_name for the database resource. |
db_disk_size |
int |
Allocated storage of MySQL database to create. |
Source code in zenml/zen_server/deploy/terraform/providers/azure_provider.py
class AzureServerDeploymentConfig(TerraformServerDeploymentConfig):
    """Deployment settings for a ZenML server on Azure.

    Every attribute has a default value.

    Attributes:
        resource_group: The Azure resource_group to deploy to.
        db_instance_name: The name of the Flexible MySQL instance to create.
        db_name: Name of MySQL database to create.
        db_version: Version of MySQL database to create.
        db_sku_name: The sku_name for the database resource.
        db_disk_size: Allocated storage of MySQL database to create.
    """

    resource_group: str = "zenml"
    db_instance_name: str = "zenmlserver"
    db_name: str = "zenmlserver"
    db_version: str = "5.7"
    db_sku_name: str = "B_Standard_B1s"
    db_disk_size: int = 20
AzureServerProvider (TerraformServerProvider)
Azure ZenML server provider.
Source code in zenml/zen_server/deploy/terraform/providers/azure_provider.py
class AzureServerProvider(TerraformServerProvider):
    """ZenML server provider for Azure.

    Associates the Azure provider type with the Azure deployment
    configuration class.
    """

    TYPE: ClassVar[ServerProviderType] = ServerProviderType.AZURE
    CONFIG_TYPE: ClassVar[Type[TerraformServerDeploymentConfig]] = (
        AzureServerDeploymentConfig
    )
CONFIG_TYPE (TerraformServerDeploymentConfig)
pydantic-model
Azure server deployment configuration.
Attributes:
Name | Type | Description |
---|---|---|
resource_group |
str |
The Azure resource_group to deploy to. |
db_instance_name |
str |
The name of the Flexible MySQL instance to create |
db_name |
str |
Name of MySQL database to create. |
db_version |
str |
Version of MySQL database to create. |
db_sku_name |
str |
The sku_name for the database resource. |
db_disk_size |
int |
Allocated storage of MySQL database to create. |
Source code in zenml/zen_server/deploy/terraform/providers/azure_provider.py
class AzureServerDeploymentConfig(TerraformServerDeploymentConfig):
    """Deployment settings for a ZenML server on Azure.

    Every attribute has a default value.

    Attributes:
        resource_group: The Azure resource_group to deploy to.
        db_instance_name: The name of the Flexible MySQL instance to create.
        db_name: Name of MySQL database to create.
        db_version: Version of MySQL database to create.
        db_sku_name: The sku_name for the database resource.
        db_disk_size: Allocated storage of MySQL database to create.
    """

    resource_group: str = "zenml"
    db_instance_name: str = "zenmlserver"
    db_name: str = "zenmlserver"
    db_version: str = "5.7"
    db_sku_name: str = "B_Standard_B1s"
    db_disk_size: int = 20
gcp_provider
Zen Server GCP Terraform deployer implementation.
GCPServerDeploymentConfig (TerraformServerDeploymentConfig)
pydantic-model
GCP server deployment configuration.
Attributes:
Name | Type | Description |
---|---|---|
project_id |
str |
The project in GCP to deploy the server to. |
region |
str |
The GCP region to deploy to. |
cloudsql_name |
str |
The name of the CloudSQL instance to create |
db_name |
str |
Name of CloudSQL database to create. |
db_instance_tier |
str |
Instance class of CloudSQL database to create. |
db_disk_size |
int |
Allocated storage of CloudSQL database to create. |
Source code in zenml/zen_server/deploy/terraform/providers/gcp_provider.py
class GCPServerDeploymentConfig(TerraformServerDeploymentConfig):
    """Deployment settings for a ZenML server on GCP.

    `project_id` has no default and must be supplied; every other
    attribute has a default value.

    Attributes:
        project_id: The project in GCP to deploy the server to.
        region: The GCP region to deploy to.
        cloudsql_name: The name of the CloudSQL instance to create.
        db_name: Name of CloudSQL database to create.
        db_instance_tier: Instance class of CloudSQL database to create.
        db_disk_size: Allocated storage of CloudSQL database to create.
    """

    project_id: str
    region: str = "europe-west3"
    cloudsql_name: str = "zenmlserver"
    db_name: str = "zenmlserver"
    db_instance_tier: str = "db-n1-standard-1"
    db_disk_size: int = 10
GCPServerProvider (TerraformServerProvider)
GCP ZenML server provider.
Source code in zenml/zen_server/deploy/terraform/providers/gcp_provider.py
class GCPServerProvider(TerraformServerProvider):
    """ZenML server provider for GCP.

    Associates the GCP provider type with the GCP deployment
    configuration class.
    """

    TYPE: ClassVar[ServerProviderType] = ServerProviderType.GCP
    CONFIG_TYPE: ClassVar[Type[TerraformServerDeploymentConfig]] = (
        GCPServerDeploymentConfig
    )
CONFIG_TYPE (TerraformServerDeploymentConfig)
pydantic-model
GCP server deployment configuration.
Attributes:
Name | Type | Description |
---|---|---|
project_id |
str |
The project in GCP to deploy the server to. |
region |
str |
The GCP region to deploy to. |
cloudsql_name |
str |
The name of the CloudSQL instance to create |
db_name |
str |
Name of CloudSQL database to create. |
db_instance_tier |
str |
Instance class of CloudSQL database to create. |
db_disk_size |
int |
Allocated storage of CloudSQL database to create. |
Source code in zenml/zen_server/deploy/terraform/providers/gcp_provider.py
class GCPServerDeploymentConfig(TerraformServerDeploymentConfig):
    """Deployment settings for a ZenML server on GCP.

    `project_id` has no default and must be supplied; every other
    attribute has a default value.

    Attributes:
        project_id: The project in GCP to deploy the server to.
        region: The GCP region to deploy to.
        cloudsql_name: The name of the CloudSQL instance to create.
        db_name: Name of CloudSQL database to create.
        db_instance_tier: Instance class of CloudSQL database to create.
        db_disk_size: Allocated storage of CloudSQL database to create.
    """

    project_id: str
    region: str = "europe-west3"
    cloudsql_name: str = "zenmlserver"
    db_name: str = "zenmlserver"
    db_instance_tier: str = "db-n1-standard-1"
    db_disk_size: int = 10
terraform_provider
Zen Server terraform deployer implementation.
TerraformServerProvider (BaseServerProvider)
Terraform ZenML server provider.
Source code in zenml/zen_server/deploy/terraform/providers/terraform_provider.py
class TerraformServerProvider(BaseServerProvider):
    """ZenML server provider backed by a Terraform service.

    The provider works with a single service instance, which it looks up
    through `TerraformZenServer.get_service()`.
    """

    CONFIG_TYPE: ClassVar[Type[ServerDeploymentConfig]] = (
        TerraformServerDeploymentConfig
    )

    @staticmethod
    def _get_server_recipe_root_path() -> str:
        """Get the server recipe root path.

        The Terraform recipe files for all terraform server providers are
        located in a folder relative to the
        `zenml.zen_server.deploy.terraform` Python module.

        Returns:
            The server recipe root path.
        """
        import zenml.zen_server.deploy.terraform as terraform_module

        module_dir = os.path.dirname(terraform_module.__file__)
        return os.path.join(module_dir, TERRAFORM_ZENML_SERVER_RECIPE_SUBPATH)

    @classmethod
    def _get_service_configuration(
        cls,
        server_config: ServerDeploymentConfig,
    ) -> Tuple[
        ServiceConfig,
        ServiceEndpointConfig,
        ServiceEndpointHealthMonitorConfig,
    ]:
        """Construct the service configuration from a server deployment configuration.

        Args:
            server_config: server deployment configuration.

        Returns:
            The service configuration, the endpoint configuration and the
            endpoint health monitor configuration, in that order.
        """
        assert isinstance(server_config, TerraformServerDeploymentConfig)

        # Each provider has its own recipe folder under the recipe root.
        service_cfg = TerraformZenServerConfig(
            name=server_config.name,
            root_runtime_path=TERRAFORM_ZENML_SERVER_CONFIG_PATH,
            singleton=True,
            directory_path=os.path.join(
                cls._get_server_recipe_root_path(),
                server_config.provider,
            ),
            log_level=server_config.log_level,
            variables_file_path=TERRAFORM_VALUES_FILE_PATH,
            server=server_config,
        )
        endpoint_cfg = ServiceEndpointConfig(
            protocol=ServiceEndpointProtocol.HTTP,
            allocate_port=False,
        )
        monitor_cfg = HTTPEndpointHealthMonitorConfig(
            healthcheck_uri_path=ZEN_SERVER_HEALTHCHECK_URL_PATH,
            use_head_request=True,
        )
        return service_cfg, endpoint_cfg, monitor_cfg

    def _create_service(
        self,
        config: ServerDeploymentConfig,
        timeout: Optional[int] = None,
    ) -> BaseService:
        """Create, start and return the terraform ZenML server deployment service.

        Args:
            config: The server deployment configuration.
            timeout: The timeout in seconds to wait until the service is
                running. Defaults to
                `TERRAFORM_ZENML_SERVER_DEFAULT_TIMEOUT` when omitted.

        Returns:
            The service instance.

        Raises:
            RuntimeError: If a terraform service is already running.
        """
        assert isinstance(config, TerraformServerDeploymentConfig)

        wait_time = (
            TERRAFORM_ZENML_SERVER_DEFAULT_TIMEOUT
            if timeout is None
            else timeout
        )

        existing_service = TerraformZenServer.get_service()
        if existing_service:
            raise RuntimeError(
                f"A terraform ZenML server with name '{existing_service.config.name}' "
                f"is already running. Please stop it first before starting a "
                f"new one."
            )

        # Only the service configuration is needed to build the service.
        service_config, _, _ = self._get_service_configuration(config)
        service = TerraformZenServer(config=service_config)
        service.start(timeout=wait_time)
        return service

    def _update_service(
        self,
        service: BaseService,
        config: ServerDeploymentConfig,
        timeout: Optional[int] = None,
    ) -> BaseService:
        """Update the terraform ZenML server deployment service.

        Args:
            service: The service instance.
            config: The new server deployment configuration.
            timeout: The timeout in seconds to wait until the updated
                service is running. Defaults to
                `TERRAFORM_ZENML_SERVER_DEFAULT_TIMEOUT` when omitted.

        Returns:
            The updated service instance.
        """
        wait_time = (
            TERRAFORM_ZENML_SERVER_DEFAULT_TIMEOUT
            if timeout is None
            else timeout
        )

        new_config, _, _ = self._get_service_configuration(config)
        assert isinstance(new_config, TerraformZenServerConfig)
        assert isinstance(service, TerraformZenServer)

        # preserve the server ID across updates
        new_config.server.server_id = service.config.server.server_id
        service.config = new_config
        service.start(timeout=wait_time)
        return service

    def _start_service(
        self,
        service: BaseService,
        timeout: Optional[int] = None,
    ) -> BaseService:
        """Start the terraform ZenML server deployment service.

        Args:
            service: The service instance.
            timeout: The timeout in seconds to wait until the service is
                running. Defaults to
                `TERRAFORM_ZENML_SERVER_DEFAULT_TIMEOUT` when omitted.

        Returns:
            The updated service instance.
        """
        wait_time = (
            TERRAFORM_ZENML_SERVER_DEFAULT_TIMEOUT
            if timeout is None
            else timeout
        )
        service.start(timeout=wait_time)
        return service

    def _stop_service(
        self,
        service: BaseService,
        timeout: Optional[int] = None,
    ) -> BaseService:
        """Stop the terraform ZenML server deployment service.

        Args:
            service: The service instance.
            timeout: The timeout in seconds to wait until the service is
                stopped. Defaults to
                `TERRAFORM_ZENML_SERVER_DEFAULT_TIMEOUT` when omitted.

        Returns:
            The updated service instance.
        """
        wait_time = (
            TERRAFORM_ZENML_SERVER_DEFAULT_TIMEOUT
            if timeout is None
            else timeout
        )
        service.stop(timeout=wait_time)
        return service

    def _delete_service(
        self,
        service: BaseService,
        timeout: Optional[int] = None,
    ) -> None:
        """Remove the terraform ZenML server deployment service.

        Removal is carried out by calling `stop()` on the service.

        Args:
            service: The service instance.
            timeout: The timeout in seconds to wait until the service is
                removed. Defaults to
                `TERRAFORM_ZENML_SERVER_DEFAULT_TIMEOUT` when omitted.
        """
        assert isinstance(service, TerraformZenServer)

        wait_time = (
            TERRAFORM_ZENML_SERVER_DEFAULT_TIMEOUT
            if timeout is None
            else timeout
        )
        service.stop(wait_time)

    def _get_service(self, server_name: str) -> BaseService:
        """Get the terraform ZenML server deployment service.

        Args:
            server_name: The server deployment name.

        Returns:
            The service instance.

        Raises:
            KeyError: If the server deployment is not found, or if it is
                deployed under a different name.
        """
        service = TerraformZenServer.get_service()
        if service is None:
            raise KeyError("The terraform ZenML server is not deployed.")

        if service.config.server.name != server_name:
            raise KeyError(
                "The terraform ZenML server is deployed but with a different name."
            )
        return service

    def _list_services(self) -> List[BaseService]:
        """Get all service instances for all deployed ZenML servers.

        Returns:
            A list of service instances: either empty or holding the one
            terraform service.
        """
        service = TerraformZenServer.get_service()
        return [service] if service else []

    def _get_deployment_config(
        self, service: BaseService
    ) -> ServerDeploymentConfig:
        """Recreate the server deployment configuration from a service instance.

        Args:
            service: The service instance.

        Returns:
            The server deployment configuration.
        """
        return cast(TerraformZenServer, service).config.server

    def _get_deployment_status(
        self, service: BaseService
    ) -> ServerDeploymentStatus:
        """Get the status of a server deployment from its service.

        The URL and CA certificate are only queried when the service is
        running. The deployment counts as connected when the global
        configuration store URL equals the server URL.

        Args:
            service: The server deployment service.

        Returns:
            The status of the server deployment.
        """
        gc = GlobalConfiguration()
        service = cast(TerraformZenServer, service)

        url: Optional[str] = None
        ca_crt = None
        if service.is_running:
            url = service.get_server_url()
            ca_crt = service.get_certificate()

        connected = (
            url is not None and gc.store is not None and gc.store.url == url
        )
        status = service.status
        return ServerDeploymentStatus(
            url=url,
            status=status.state,
            status_message=status.last_error,
            connected=connected,
            ca_crt=ca_crt,
        )
CONFIG_TYPE (ServerDeploymentConfig)
pydantic-model
Terraform server deployment configuration.
Attributes:
Name | Type | Description |
---|---|---|
log_level |
str |
The log level to set the terraform client to. Choose one of TRACE, DEBUG, INFO, WARN or ERROR (case insensitive). |
username |
str |
The username for the default ZenML server account. |
password |
str |
The password for the default ZenML server account. |
helm_chart |
str |
The path to the ZenML server helm chart to use for deployment. |
zenmlserver_image_tag |
str |
The tag to use for the zenml server docker image. |
namespace |
str |
The Kubernetes namespace to deploy the ZenML server to. |
kubectl_config_path |
str |
The path to the kubectl config file to use for deployment. |
ingress_tls |
bool |
Whether to use TLS for the ingress. |
ingress_tls_generate_certs |
bool |
Whether to generate self-signed TLS certificates for the ingress. |
ingress_tls_secret_name |
str |
The name of the Kubernetes secret to use for the ingress. |
ingress_path |
str |
The path to use for the ingress. |
create_ingress_controller |
bool |
Whether to deploy an nginx ingress controller as part of the deployment. |
ingress_controller_hostname |
str |
The ingress controller hostname to use for the ingress self-signed certificate and to compute the ZenML server URL. |
deploy_db |
bool |
Whether to create a SQL database service as part of the recipe. |
database_username |
str |
The username for the database. |
database_password |
str |
The password for the database. |
database_url |
str |
The URL of the RDS instance to use for the ZenML server. |
database_ssl_ca |
str |
The path to the SSL CA certificate to use for the database connection. |
database_ssl_cert |
str |
The path to the client SSL certificate to use for the database connection. |
database_ssl_key |
str |
The path to the client SSL key to use for the database connection. |
database_ssl_verify_server_cert |
bool |
Whether to verify the database server SSL certificate. |
Source code in zenml/zen_server/deploy/terraform/providers/terraform_provider.py
class TerraformServerDeploymentConfig(ServerDeploymentConfig):
    """Settings shared by all Terraform-based ZenML server deployments.

    `username` and `password` have no defaults and must be supplied. Extra
    fields that are not declared here are accepted (see `Config`).

    Attributes:
        log_level: The log level to set the terraform client to. Choose one
            of TRACE, DEBUG, INFO, WARN or ERROR (case insensitive).
        server_id: UUID of the server deployment. A new random UUID is
            generated when none is supplied.
        username: The username for the default ZenML server account.
        password: The password for the default ZenML server account.
        helm_chart: The path to the ZenML server helm chart to use for
            deployment.
        zenmlserver_image_tag: The tag to use for the zenml server docker
            image.
        zenmlinit_image_tag: Image tag, presumably for the `zenmlinit`
            docker image (TODO confirm against the recipe).
        namespace: The Kubernetes namespace to deploy the ZenML server to.
        kubectl_config_path: The path to the kubectl config file to use for
            deployment.
        ingress_tls: Whether to use TLS for the ingress.
        ingress_tls_generate_certs: Whether to generate self-signed TLS
            certificates for the ingress.
        ingress_tls_secret_name: The name of the Kubernetes secret to use
            for the ingress.
        ingress_path: The path to use for the ingress.
        create_ingress_controller: Whether to deploy an nginx ingress
            controller as part of the deployment.
        ingress_controller_hostname: The ingress controller hostname to use
            for the ingress self-signed certificate and to compute the
            ZenML server URL.
        deploy_db: Whether to create a SQL database service as part of the
            recipe.
        database_username: The username for the database.
        database_password: The password for the database.
        database_url: The URL of the RDS instance to use for the ZenML
            server.
        database_ssl_ca: The path to the SSL CA certificate to use for the
            database connection.
        database_ssl_cert: The path to the client SSL certificate to use
            for the database connection.
        database_ssl_key: The path to the client SSL key to use for the
            database connection.
        database_ssl_verify_server_cert: Whether to verify the database
            server SSL certificate.
    """

    log_level: str = "ERROR"

    server_id: UUID = Field(default_factory=uuid4)
    username: str
    password: str
    # The default is computed once, when the class body is executed.
    helm_chart: str = get_helm_chart_path()
    zenmlserver_image_tag: str = "latest"
    zenmlinit_image_tag: str = "latest"
    namespace: str = "zenmlserver"
    kubectl_config_path: str = os.path.join(str(Path.home()), ".kube", "config")
    ingress_tls: bool = True
    ingress_tls_generate_certs: bool = True
    ingress_tls_secret_name: str = "zenml-tls-certs"
    ingress_path: str = ""
    create_ingress_controller: bool = True
    ingress_controller_hostname: str = ""
    deploy_db: bool = True
    database_username: str = "user"
    database_password: str = ""
    database_url: str = ""
    database_ssl_ca: str = ""
    database_ssl_cert: str = ""
    database_ssl_key: str = ""
    database_ssl_verify_server_cert: bool = True

    class Config:
        """Pydantic configuration."""

        # Accept fields that are not declared on the model.
        extra = "allow"
Config
Pydantic configuration.
Source code in zenml/zen_server/deploy/terraform/providers/terraform_provider.py
class Config:
    """Pydantic configuration: undeclared extra fields are allowed."""

    extra = "allow"
terraform_zen_server
Service implementation for the ZenML terraform server deployment.
TerraformServerDeploymentConfig (ServerDeploymentConfig)
pydantic-model
Terraform server deployment configuration.
Attributes:
Name | Type | Description |
---|---|---|
log_level |
str |
The log level to set the terraform client to. Choose one of TRACE, DEBUG, INFO, WARN or ERROR (case insensitive). |
username |
str |
The username for the default ZenML server account. |
password |
str |
The password for the default ZenML server account. |
helm_chart |
str |
The path to the ZenML server helm chart to use for deployment. |
zenmlserver_image_tag |
str |
The tag to use for the zenml server docker image. |
namespace |
str |
The Kubernetes namespace to deploy the ZenML server to. |
kubectl_config_path |
str |
The path to the kubectl config file to use for deployment. |
ingress_tls |
bool |
Whether to use TLS for the ingress. |
ingress_tls_generate_certs |
bool |
Whether to generate self-signed TLS certificates for the ingress. |
ingress_tls_secret_name |
str |
The name of the Kubernetes secret to use for the ingress. |
ingress_path |
str |
The path to use for the ingress. |
create_ingress_controller |
bool |
Whether to deploy an nginx ingress controller as part of the deployment. |
ingress_controller_hostname |
str |
The ingress controller hostname to use for the ingress self-signed certificate and to compute the ZenML server URL. |
deploy_db |
bool |
Whether to create a SQL database service as part of the recipe. |
database_username |
str |
The username for the database. |
database_password |
str |
The password for the database. |
database_url |
str |
The URL of the RDS instance to use for the ZenML server. |
database_ssl_ca |
str |
The path to the SSL CA certificate to use for the database connection. |
database_ssl_cert |
str |
The path to the client SSL certificate to use for the database connection. |
database_ssl_key |
str |
The path to the client SSL key to use for the database connection. |
database_ssl_verify_server_cert |
bool |
Whether to verify the database server SSL certificate. |
Source code in zenml/zen_server/deploy/terraform/terraform_zen_server.py
class TerraformServerDeploymentConfig(ServerDeploymentConfig):
    """Settings shared by all Terraform-based ZenML server deployments.

    `username` and `password` have no defaults and must be supplied. Extra
    fields that are not declared here are accepted (see `Config`).

    Attributes:
        log_level: The log level to set the terraform client to. Choose one
            of TRACE, DEBUG, INFO, WARN or ERROR (case insensitive).
        server_id: UUID of the server deployment. A new random UUID is
            generated when none is supplied.
        username: The username for the default ZenML server account.
        password: The password for the default ZenML server account.
        helm_chart: The path to the ZenML server helm chart to use for
            deployment.
        zenmlserver_image_tag: The tag to use for the zenml server docker
            image.
        zenmlinit_image_tag: Image tag, presumably for the `zenmlinit`
            docker image (TODO confirm against the recipe).
        namespace: The Kubernetes namespace to deploy the ZenML server to.
        kubectl_config_path: The path to the kubectl config file to use for
            deployment.
        ingress_tls: Whether to use TLS for the ingress.
        ingress_tls_generate_certs: Whether to generate self-signed TLS
            certificates for the ingress.
        ingress_tls_secret_name: The name of the Kubernetes secret to use
            for the ingress.
        ingress_path: The path to use for the ingress.
        create_ingress_controller: Whether to deploy an nginx ingress
            controller as part of the deployment.
        ingress_controller_hostname: The ingress controller hostname to use
            for the ingress self-signed certificate and to compute the
            ZenML server URL.
        deploy_db: Whether to create a SQL database service as part of the
            recipe.
        database_username: The username for the database.
        database_password: The password for the database.
        database_url: The URL of the RDS instance to use for the ZenML
            server.
        database_ssl_ca: The path to the SSL CA certificate to use for the
            database connection.
        database_ssl_cert: The path to the client SSL certificate to use
            for the database connection.
        database_ssl_key: The path to the client SSL key to use for the
            database connection.
        database_ssl_verify_server_cert: Whether to verify the database
            server SSL certificate.
    """

    log_level: str = "ERROR"

    server_id: UUID = Field(default_factory=uuid4)
    username: str
    password: str
    # The default is computed once, when the class body is executed.
    helm_chart: str = get_helm_chart_path()
    zenmlserver_image_tag: str = "latest"
    zenmlinit_image_tag: str = "latest"
    namespace: str = "zenmlserver"
    kubectl_config_path: str = os.path.join(str(Path.home()), ".kube", "config")
    ingress_tls: bool = True
    ingress_tls_generate_certs: bool = True
    ingress_tls_secret_name: str = "zenml-tls-certs"
    ingress_path: str = ""
    create_ingress_controller: bool = True
    ingress_controller_hostname: str = ""
    deploy_db: bool = True
    database_username: str = "user"
    database_password: str = ""
    database_url: str = ""
    database_ssl_ca: str = ""
    database_ssl_cert: str = ""
    database_ssl_key: str = ""
    database_ssl_verify_server_cert: bool = True

    class Config:
        """Pydantic configuration."""

        # Accept fields that are not declared on the model.
        extra = "allow"
Config
Pydantic configuration.
Source code in zenml/zen_server/deploy/terraform/terraform_zen_server.py
class Config:
    """Pydantic configuration: undeclared extra fields are allowed."""

    extra = "allow"
TerraformZenServer (TerraformService)
pydantic-model
Service that can be used to start a terraform ZenServer.
Attributes:
Name | Type | Description |
---|---|---|
config |
TerraformZenServerConfig |
service configuration |
endpoint |
Optional[zenml.services.service_endpoint.BaseServiceEndpoint] |
service endpoint |
Source code in zenml/zen_server/deploy/terraform/terraform_zen_server.py
class TerraformZenServer(TerraformService):
    """Terraform service used to deploy a ZenML server.

    Attributes:
        config: service configuration
        endpoint: service endpoint
    """

    SERVICE_TYPE = ServiceType(
        name="terraform_zenml_server",
        type="zen_server",
        flavor="terraform",
        description="Terraform ZenML server deployment",
    )

    config: TerraformZenServerConfig

    @classmethod
    def get_service(cls) -> Optional["TerraformZenServer"]:
        """Load and return the terraform ZenML server service, if present.

        The service is deserialized from the JSON file at
        `TERRAFORM_ZENML_SERVER_CONFIG_FILENAME`.

        Returns:
            The terraform ZenML server service or None, if the terraform
            server deployment is not found.
        """
        from zenml.services import ServiceRegistry

        try:
            with open(TERRAFORM_ZENML_SERVER_CONFIG_FILENAME, "r") as config_file:
                loaded = ServiceRegistry().load_service_from_json(
                    config_file.read()
                )
                return cast(TerraformZenServer, loaded)
        except FileNotFoundError:
            # No configuration file means no deployment.
            return None

    def get_vars(self) -> Dict[str, Any]:
        """Get variables as a dictionary.

        As a side effect, the variables are also written as JSON to the
        variables file inside the service runtime path.

        Returns:
            A dictionary of variables to use for the Terraform deployment.
        """
        # keys that are not modeled as terraform deployment vars
        filter_vars = ["log_level", "provider"]
        # contents of the server deployment config, with UUIDs as strings
        tf_vars = {
            key: str(value) if isinstance(value, UUID) else value
            for key, value in self.config.server.dict().items()
            if key not in filter_vars
        }

        assert self.status.runtime_path
        variables_file = os.path.join(
            self.status.runtime_path, self.config.variables_file_path
        )
        with open(variables_file, "w") as fp:
            json.dump(tf_vars, fp, indent=4)

        return tf_vars

    def provision(self) -> None:
        """Provision the service and log the URL of the deployed server."""
        super().provision()
        logger.info(
            f"Your ZenML server is now deployed with URL:\n"
            f"{self.get_server_url()}"
        )

    def get_server_url(self) -> str:
        """Returns the deployed ZenML server's URL.

        The value is read from the terraform client output.

        Returns:
            The URL of the deployed ZenML server.
        """
        url_output = self.terraform_client.output(
            TERRAFORM_DEPLOYED_ZENSERVER_OUTPUT_URL, full_value=True
        )
        return str(url_output)

    def get_certificate(self) -> Optional[str]:
        """Returns the CA certificate configured for the ZenML server.

        The value is read from the terraform client output.

        Returns:
            The CA certificate configured for the ZenML server.
        """
        ca_crt_output = self.terraform_client.output(
            TERRAFORM_DEPLOYED_ZENSERVER_OUTPUT_CA_CRT, full_value=True
        )
        return cast(str, ca_crt_output)
get_certificate(self)
Returns the CA certificate configured for the ZenML server.
Returns:
Type | Description |
---|---|
Optional[str] |
The CA certificate configured for the ZenML server. |
Source code in zenml/zen_server/deploy/terraform/terraform_zen_server.py
def get_certificate(self) -> Optional[str]:
    """Returns the CA certificate configured for the ZenML server.

    The value is read from the terraform client output.

    Returns:
        The CA certificate configured for the ZenML server.
    """
    ca_crt_output = self.terraform_client.output(
        TERRAFORM_DEPLOYED_ZENSERVER_OUTPUT_CA_CRT, full_value=True
    )
    return cast(str, ca_crt_output)
get_server_url(self)
Returns the deployed ZenML server's URL.
Returns:
Type | Description |
---|---|
str |
The URL of the deployed ZenML server. |
Source code in zenml/zen_server/deploy/terraform/terraform_zen_server.py
def get_server_url(self) -> str:
    """Returns the deployed ZenML server's URL.

    The value is read from the terraform client output.

    Returns:
        The URL of the deployed ZenML server.
    """
    url_output = self.terraform_client.output(
        TERRAFORM_DEPLOYED_ZENSERVER_OUTPUT_URL, full_value=True
    )
    return str(url_output)
get_service()
classmethod
Load and return the terraform ZenML server service, if present.
Returns:
Type | Description |
---|---|
Optional[TerraformZenServer] |
The terraform ZenML server service or None, if the terraform server deployment is not found. |
Source code in zenml/zen_server/deploy/terraform/terraform_zen_server.py
@classmethod
def get_service(cls) -> Optional["TerraformZenServer"]:
    """Load and return the terraform ZenML server service, if present.

    The service is deserialized from the JSON file at
    `TERRAFORM_ZENML_SERVER_CONFIG_FILENAME`.

    Returns:
        The terraform ZenML server service or None, if the terraform
        server deployment is not found.
    """
    from zenml.services import ServiceRegistry

    try:
        with open(TERRAFORM_ZENML_SERVER_CONFIG_FILENAME, "r") as config_file:
            loaded = ServiceRegistry().load_service_from_json(
                config_file.read()
            )
            return cast(TerraformZenServer, loaded)
    except FileNotFoundError:
        # No configuration file means no deployment.
        return None
get_vars(self)
Get variables as a dictionary.
Returns:
Type | Description |
---|---|
Dict[str, Any] |
A dictionary of variables to use for the Terraform deployment. |
Source code in zenml/zen_server/deploy/terraform/terraform_zen_server.py
def get_vars(self) -> Dict[str, Any]:
    """Get variables as a dictionary.

    The variables are the contents of the server deployment configuration,
    minus the keys that are not modeled as terraform deployment variables,
    with UUID values converted to strings. As a side effect, the variables
    are also written as JSON to the variables file inside the service
    runtime path.

    Returns:
        A dictionary of variables to use for the Terraform deployment.
    """
    # keys that are not modeled as terraform deployment vars
    excluded_keys = ("log_level", "provider")

    # UUID objects cannot be serialized by `json.dump`, so they are
    # converted to their string form.
    tf_vars = {
        key: str(value) if isinstance(value, UUID) else value
        for key, value in self.config.server.dict().items()
        if key not in excluded_keys
    }

    assert self.status.runtime_path
    variables_file = os.path.join(
        self.status.runtime_path, self.config.variables_file_path
    )
    with open(variables_file, "w", encoding="utf-8") as fp:
        json.dump(tf_vars, fp, indent=4)

    return tf_vars
provision(self)
Provision the service.
Source code in zenml/zen_server/deploy/terraform/terraform_zen_server.py
def provision(self) -> None:
    """Provision the service and log the URL of the deployed server."""
    super().provision()
    logger.info(
        f"Your ZenML server is now deployed with URL:\n"
        f"{self.get_server_url()}"
    )
TerraformZenServerConfig (TerraformServiceConfig)
pydantic-model
Terraform Zen server configuration.
Attributes:
Name | Type | Description |
---|---|---|
server |
TerraformServerDeploymentConfig |
The deployment configuration. |
Source code in zenml/zen_server/deploy/terraform/terraform_zen_server.py
class TerraformZenServerConfig(TerraformServiceConfig):
    """Configuration of the terraform ZenML server service.

    Attributes:
        server: The deployment configuration.
        copy_terraform_files: Defaults to True. Presumably tells the
            service to copy the terraform recipe files before use
            (TODO confirm against `TerraformServiceConfig`).
    """

    server: TerraformServerDeploymentConfig
    copy_terraform_files: bool = True
get_helm_chart_path()
Get the ZenML server helm chart path.
The ZenML server helm chart files are located in a folder relative to the
zenml.zen_server.deploy
Python module.
Returns:
Type | Description |
---|---|
str |
The helm chart path. |
Source code in zenml/zen_server/deploy/terraform/terraform_zen_server.py
def get_helm_chart_path() -> str:
    """Return the location of the ZenML server helm chart.

    The helm chart files live in a sub-folder (`ZENML_HELM_CHART_SUBPATH`)
    of the directory that contains the `zenml.zen_server.deploy` Python
    module.

    Returns:
        The helm chart path.
    """
    # imported locally, exactly like the original implementation does
    import zenml.zen_server.deploy as deploy_module

    module_dir = os.path.dirname(deploy_module.__file__)
    return os.path.join(module_dir, ZENML_HELM_CHART_SUBPATH)
models
special
ZenML Server API Models.
These models are only used as REST API representations of the domain models in the context of different operations, where different fields can be omitted or even renamed depending on the REST endpoint where they are used. These are separate from the domain models and should provide conversion logic where needed.
This separation allows the domain models and REST API to evolve independently of each other.
base_models
Base REST API model definitions.
CreateRequest (BaseModel, Generic)
pydantic-model
Base model used for create requests.
Source code in zenml/zen_server/models/base_models.py
class CreateRequest(BaseModel, Generic[AnyModel]):
    """Generic base class for REST API create requests.

    Subclasses set `_MODEL_TYPE` to the domain model class that the request
    is converted into.
    """

    _MODEL_TYPE: Type[AnyModel]

    def to_model(self, **kwargs: Any) -> AnyModel:
        """Build the domain model described by this create request.

        Args:
            kwargs: Additional keyword arguments to pass to the model

        Returns:
            The created domain model.
        """
        # fields that are unset (None) are not forwarded to the domain model
        request_fields = self.dict(exclude_none=True)
        return self._MODEL_TYPE(**request_fields, **kwargs)

    @classmethod
    def from_model(
        cls, model: AnyModel, **kwargs: Any
    ) -> "CreateRequest[AnyModel]":
        """Build a create request out of an existing domain model.

        Args:
            model: The domain model to convert.
            kwargs: Additional keyword arguments to pass to the create request.

        Returns:
            The create request.
        """
        model_fields = model.dict()
        return cls(**model_fields, **kwargs)

    class Config:
        """Pydantic config."""

        # attributes with a leading underscore (e.g. `_MODEL_TYPE`) are
        # private attributes instead of model fields
        underscore_attrs_are_private = True
Config
Pydantic config.
Source code in zenml/zen_server/models/base_models.py
class Config:
    """Pydantic configuration of the create request base model."""

    # attributes with a leading underscore are private, not model fields
    underscore_attrs_are_private = True
from_model(model, **kwargs)
classmethod
Convert a domain model into a create request.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
model |
~AnyModel |
The domain model to convert. |
required |
kwargs |
Any |
Additional keyword arguments to pass to the create request. |
{} |
Returns:
Type | Description |
---|---|
CreateRequest[AnyModel] |
The create request. |
Source code in zenml/zen_server/models/base_models.py
@classmethod
def from_model(
    cls, model: AnyModel, **kwargs: Any
) -> "CreateRequest[AnyModel]":
    """Build a create request out of an existing domain model.

    Args:
        model: The domain model to convert.
        kwargs: Additional keyword arguments to pass to the create request.

    Returns:
        The create request.
    """
    model_fields = model.dict()
    return cls(**model_fields, **kwargs)
to_model(self, **kwargs)
Create a domain model from this create request.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
kwargs |
Any |
Additional keyword arguments to pass to the model |
{} |
Returns:
Type | Description |
---|---|
~AnyModel |
The created domain model. |
Source code in zenml/zen_server/models/base_models.py
def to_model(self, **kwargs: Any) -> AnyModel:
    """Build the domain model described by this create request.

    Args:
        kwargs: Additional keyword arguments to pass to the model

    Returns:
        The created domain model.
    """
    # fields that are unset (None) are not forwarded to the domain model
    request_fields = self.dict(exclude_none=True)
    return self._MODEL_TYPE(**request_fields, **kwargs)
CreateResponse (BaseModel, Generic)
pydantic-model
Base model used for create responses.
Source code in zenml/zen_server/models/base_models.py
class CreateResponse(BaseModel, Generic[AnyModel]):
    """Generic base class for REST API create responses.

    Subclasses set `_MODEL_TYPE` to the domain model class that the
    response is converted into.
    """

    _MODEL_TYPE: Type[AnyModel]

    @classmethod
    def from_model(
        cls, model: AnyModel, **kwargs: Any
    ) -> "CreateResponse[AnyModel]":
        """Build a create response out of a domain model.

        Args:
            model: The domain model to convert.
            kwargs: Additional keyword arguments to pass to the create response.

        Returns:
            The create response.
        """
        model_fields = model.dict()
        return cls(**model_fields, **kwargs)

    def to_model(self, **kwargs: Any) -> AnyModel:
        """Build the domain model described by this create response.

        Args:
            kwargs: Additional keyword arguments to pass to the model

        Returns:
            The created domain model.
        """
        # fields that are unset (None) are not forwarded to the domain model
        response_fields = self.dict(exclude_none=True)
        return self._MODEL_TYPE(**response_fields, **kwargs)

    class Config:
        """Pydantic config."""

        # attributes with a leading underscore (e.g. `_MODEL_TYPE`) are
        # private attributes instead of model fields
        underscore_attrs_are_private = True
Config
Pydantic config.
Source code in zenml/zen_server/models/base_models.py
class Config:
    """Pydantic configuration of the create response base model."""

    # attributes with a leading underscore are private, not model fields
    underscore_attrs_are_private = True
from_model(model, **kwargs)
classmethod
Convert a domain model into a create response.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
model |
~AnyModel |
The domain model to convert. |
required |
kwargs |
Any |
Additional keyword arguments to pass to the create response. |
{} |
Returns:
Type | Description |
---|---|
CreateResponse[AnyModel] |
The create response. |
Source code in zenml/zen_server/models/base_models.py
@classmethod
def from_model(
    cls, model: AnyModel, **kwargs: Any
) -> "CreateResponse[AnyModel]":
    """Build a create response out of a domain model.

    Args:
        model: The domain model to convert.
        kwargs: Additional keyword arguments to pass to the create response.

    Returns:
        The create response.
    """
    model_fields = model.dict()
    return cls(**model_fields, **kwargs)
to_model(self, **kwargs)
Create a domain model from this create response.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
kwargs |
Any |
Additional keyword arguments to pass to the model |
{} |
Returns:
Type | Description |
---|---|
~AnyModel |
The created domain model. |
Source code in zenml/zen_server/models/base_models.py
def to_model(self, **kwargs: Any) -> AnyModel:
    """Build the domain model described by this create response.

    Args:
        kwargs: Additional keyword arguments to pass to the model

    Returns:
        The created domain model.
    """
    # fields that are unset (None) are not forwarded to the domain model
    response_fields = self.dict(exclude_none=True)
    return self._MODEL_TYPE(**response_fields, **kwargs)
ProjectScopedCreateRequest (CreateRequest)
pydantic-model
Base model used for project scoped create requests.
Source code in zenml/zen_server/models/base_models.py
class ProjectScopedCreateRequest(CreateRequest[AnyModel]):
    """Create request base class for models that belong to a project.

    In addition to the generic create request, the conversion into a domain
    model requires the project and the user as explicit arguments.
    """

    def to_model(self, project: UUID, user: UUID, **kwargs: Any) -> AnyModel:  # type: ignore[override]
        """Build the project scoped domain model for this create request.

        Args:
            project: The project to create the model in.
            user: The user creating the model.
            kwargs: Additional keyword arguments to pass to the model

        Returns:
            The created domain model.
        """
        # project and user are handed to the parent as plain keyword args
        scope = {"project": project, "user": user}
        return super().to_model(**scope, **kwargs)
to_model(self, project, user, **kwargs)
Create a domain model from this create request.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
project |
UUID |
The project to create the model in. |
required |
user |
UUID |
The user creating the model. |
required |
kwargs |
Any |
Additional keyword arguments to pass to the model |
{} |
Returns:
Type | Description |
---|---|
~AnyModel |
The created domain model. |
Source code in zenml/zen_server/models/base_models.py
def to_model(self, project: UUID, user: UUID, **kwargs: Any) -> AnyModel:  # type: ignore[override]
    """Build the project scoped domain model for this create request.

    Args:
        project: The project to create the model in.
        user: The user creating the model.
        kwargs: Additional keyword arguments to pass to the model

    Returns:
        The created domain model.
    """
    # project and user are handed to the parent as plain keyword args
    scope = {"project": project, "user": user}
    return super().to_model(**scope, **kwargs)
UpdateRequest (BaseModel, Generic)
pydantic-model
Base model used for update requests.
Source code in zenml/zen_server/models/base_models.py
class UpdateRequest(BaseModel, Generic[AnyModel]):
    """Generic base class for REST API update requests.

    Subclasses set `_MODEL_TYPE` to the domain model class that the update
    request applies to.
    """

    _MODEL_TYPE: Type[AnyModel]

    def apply_to_model(self, model: AnyModel) -> AnyModel:
        """Copy all values set in this request onto a domain model.

        Args:
            model: The domain model to update.

        Returns:
            The updated domain model.
        """
        # only fields that carry a value (not None) overwrite the model
        changes = self.dict(exclude_none=True)
        for field_name in changes:
            setattr(model, field_name, changes[field_name])
        return model

    @classmethod
    def from_model(
        cls, model: AnyModel, **kwargs: Any
    ) -> "UpdateRequest[AnyModel]":
        """Build an update request out of a domain model.

        Args:
            model: The domain model to convert.
            kwargs: Additional keyword arguments to pass to the update request.

        Returns:
            The update request.
        """
        model_fields = model.dict()
        return cls(**model_fields, **kwargs)

    class Config:
        """Pydantic config."""

        # attributes with a leading underscore (e.g. `_MODEL_TYPE`) are
        # private attributes instead of model fields
        underscore_attrs_are_private = True
Config
Pydantic config.
Source code in zenml/zen_server/models/base_models.py
class Config:
    """Pydantic configuration of the update request base model."""

    # attributes with a leading underscore are private, not model fields
    underscore_attrs_are_private = True
apply_to_model(self, model)
Apply the update changes to a domain model.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
model |
~AnyModel |
The domain model to update. |
required |
Returns:
Type | Description |
---|---|
~AnyModel |
The updated domain model. |
Source code in zenml/zen_server/models/base_models.py
def apply_to_model(self, model: AnyModel) -> AnyModel:
    """Copy all values set in this request onto a domain model.

    Args:
        model: The domain model to update.

    Returns:
        The updated domain model.
    """
    # only fields that carry a value (not None) overwrite the model
    changes = self.dict(exclude_none=True)
    for field_name in changes:
        setattr(model, field_name, changes[field_name])
    return model
from_model(model, **kwargs)
classmethod
Convert a domain model into an update request.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
model |
~AnyModel |
The domain model to convert. |
required |
kwargs |
Any |
Additional keyword arguments to pass to the update request. |
{} |
Returns:
Type | Description |
---|---|
UpdateRequest[AnyModel] |
The update request. |
Source code in zenml/zen_server/models/base_models.py
@classmethod
def from_model(
    cls, model: AnyModel, **kwargs: Any
) -> "UpdateRequest[AnyModel]":
    """Build an update request out of a domain model.

    Args:
        model: The domain model to convert.
        kwargs: Additional keyword arguments to pass to the update request.

    Returns:
        The update request.
    """
    model_fields = model.dict()
    return cls(**model_fields, **kwargs)
UpdateResponse (BaseModel, Generic)
pydantic-model
Base model used for update responses.
Source code in zenml/zen_server/models/base_models.py
class UpdateResponse(BaseModel, Generic[AnyModel]):
    """Generic base class for REST API update responses.

    Subclasses set `_MODEL_TYPE` to the domain model class that the
    response is converted into.
    """

    _MODEL_TYPE: Type[AnyModel]

    @classmethod
    def from_model(
        cls, model: AnyModel, **kwargs: Any
    ) -> "UpdateResponse[AnyModel]":
        """Build an update response out of a domain model.

        Args:
            model: The domain model to convert.
            kwargs: Additional keyword arguments to pass to the update response.

        Returns:
            The update response.
        """
        model_fields = model.dict()
        return cls(**model_fields, **kwargs)

    def to_model(self, **kwargs: Any) -> AnyModel:
        """Build the domain model described by this update response.

        Args:
            kwargs: Additional keyword arguments to pass to the model

        Returns:
            The created domain model.
        """
        # fields that are unset (None) are not forwarded to the domain model
        response_fields = self.dict(exclude_none=True)
        return self._MODEL_TYPE(**response_fields, **kwargs)

    class Config:
        """Pydantic config."""

        # attributes with a leading underscore (e.g. `_MODEL_TYPE`) are
        # private attributes instead of model fields
        underscore_attrs_are_private = True
Config
Pydantic config.
Source code in zenml/zen_server/models/base_models.py
class Config:
    """Pydantic configuration of the update response base model."""

    # attributes with a leading underscore are private, not model fields
    underscore_attrs_are_private = True
from_model(model, **kwargs)
classmethod
Convert a domain model into an update response.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
model |
~AnyModel |
The domain model to convert. |
required |
kwargs |
Any |
Additional keyword arguments to pass to the update response. |
{} |
Returns:
Type | Description |
---|---|
UpdateResponse[AnyModel] |
The update response. |
Source code in zenml/zen_server/models/base_models.py
@classmethod
def from_model(
    cls, model: AnyModel, **kwargs: Any
) -> "UpdateResponse[AnyModel]":
    """Build an update response out of a domain model.

    Args:
        model: The domain model to convert.
        kwargs: Additional keyword arguments to pass to the update response.

    Returns:
        The update response.
    """
    model_fields = model.dict()
    return cls(**model_fields, **kwargs)
to_model(self, **kwargs)
Create a domain model from this update response.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
kwargs |
Any |
Additional keyword arguments to pass to the model |
{} |
Returns:
Type | Description |
---|---|
~AnyModel |
The created domain model. |
Source code in zenml/zen_server/models/base_models.py
def to_model(self, **kwargs: Any) -> AnyModel:
    """Build the domain model described by this update response.

    Args:
        kwargs: Additional keyword arguments to pass to the model

    Returns:
        The created domain model.
    """
    # fields that are unset (None) are not forwarded to the domain model
    response_fields = self.dict(exclude_none=True)
    return self._MODEL_TYPE(**response_fields, **kwargs)
component_models
Stack Component Models for the API endpoint definitions.
CreateComponentModel (BaseModel)
pydantic-model
Model used for all create operations on stack components.
Source code in zenml/zen_server/models/component_models.py
class CreateComponentModel(BaseModel):
    """Model used for all create operations on stack components."""

    name: str = Field(
        title="The name of the Stack Component.",
    )
    type: StackComponentType = Field(
        title="The type of the Stack Component.",
    )
    flavor: Optional[str] = Field(
        title="The flavor of the Stack Component.",
    )
    # Json representation of the configuration. The title used to read
    # "The id of the Stack Component.", which did not describe this field.
    configuration: Dict[str, Any] = Field(
        title="The configuration of the Stack Component.",
    )
    is_shared: bool = Field(
        default=False,
        title="Flag describing if this component is shared.",
    )

    def to_model(self, project: UUID, user: UUID) -> "ComponentModel":
        """Creates a component model from this create request.

        Args:
            project: Project context of the stack component.
            user: User context of the stack component.

        Returns:
            The created component model.
        """
        return ComponentModel(project=project, user=user, **self.dict())
to_model(self, project, user)
Creates a component model from this create request.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
project |
UUID |
Project context of the stack. |
required |
user |
UUID |
User context of the stack |
required |
Returns:
Type | Description |
---|---|
ComponentModel |
The created component model. |
Source code in zenml/zen_server/models/component_models.py
def to_model(self, project: UUID, user: UUID) -> "ComponentModel":
    """Creates a component model from this create request.

    Args:
        project: Project context of the stack component.
        user: User context of the stack component.

    Returns:
        The created component model.
    """
    request_fields = self.dict()
    return ComponentModel(project=project, user=user, **request_fields)
UpdateComponentModel (BaseModel)
pydantic-model
Model used for all update operations on stack components.
Source code in zenml/zen_server/models/component_models.py
class UpdateComponentModel(BaseModel):
    """Model used for all update operations on stack components.

    Every field is optional: only fields that carry a value (i.e. are not
    `None`) are applied to the component model by `apply_to_model`.
    """

    name: Optional[str] = Field(
        title="The name of the Stack Component.",
    )
    type: Optional[StackComponentType] = Field(
        title="The type of the Stack Component.",
    )
    flavor: Optional[str] = Field(
        title="The flavor of the Stack Component.",
    )
    # Json representation of the configuration. The title used to read
    # "The id of the Stack Component.", which did not describe this field.
    configuration: Optional[Dict[str, Any]] = Field(
        title="The configuration of the Stack Component.",
    )
    # The default must be `None` and not `False`: `apply_to_model` applies
    # every value that is not `None`, so a `False` default would silently
    # un-share the component on every update that omits this field.
    is_shared: Optional[bool] = Field(
        default=None,
        title="Flag describing if this component is shared.",
    )

    def apply_to_model(self, stack: "ComponentModel") -> "ComponentModel":
        """Applies user defined changes to a component model.

        Args:
            stack: Component model the changes will be applied to

        Returns:
            The updated component model
        """
        for key, value in self.dict().items():
            # fields left unset (None) do not overwrite the existing value
            if value is not None:
                setattr(stack, key, value)

        return stack
apply_to_model(self, stack)
Applies user defined changes to this model.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
stack |
ComponentModel |
Component model the changes will be applied to |
required |
Returns:
Type | Description |
---|---|
ComponentModel |
The updated component model |
Source code in zenml/zen_server/models/component_models.py
def apply_to_model(self, stack: "ComponentModel") -> "ComponentModel":
    """Applies user defined changes to a component model.

    Args:
        stack: Component model the changes will be applied to

    Returns:
        The updated component model
    """
    # fields left unset (None) do not overwrite the existing value
    changes = {
        field_name: new_value
        for field_name, new_value in self.dict().items()
        if new_value is not None
    }
    for field_name, new_value in changes.items():
        setattr(stack, field_name, new_value)
    return stack
pipeline_models
Pipeline Models for the API endpoint definitions.
CreatePipelineRequest (ProjectScopedCreateRequest)
pydantic-model
Pipeline model for create requests.
Source code in zenml/zen_server/models/pipeline_models.py
class CreatePipelineRequest(ProjectScopedCreateRequest[PipelineModel]):
    """Project scoped create request for pipelines."""

    # domain model class this request is converted into
    _MODEL_TYPE = PipelineModel

    name: str = Field(
        max_length=MODEL_NAME_FIELD_MAX_LENGTH,
        title="The name of the pipeline.",
    )
    docstring: Optional[str]
    spec: PipelineSpec
CreatePipelineRunRequest (ProjectScopedCreateRequest)
pydantic-model
Pipeline run model for create requests.
Source code in zenml/zen_server/models/pipeline_models.py
class CreatePipelineRunRequest(ProjectScopedCreateRequest[PipelineRunModel]):
    """Project scoped create request for pipeline runs."""

    # domain model class this request is converted into
    _MODEL_TYPE = PipelineRunModel

    id: Optional[UUID]
    name: str = Field(
        max_length=MODEL_NAME_FIELD_MAX_LENGTH,
        title="The name of the pipeline run.",
    )

    # optional references
    orchestrator_run_id: Optional[str]
    stack_id: Optional[UUID]
    pipeline_id: Optional[UUID]

    status: ExecutionStatus
    pipeline_configuration: Dict[str, Any]
    num_steps: int

    mlmd_id: Optional[int]
HydratedPipelineModel (PipelineModel)
pydantic-model
Pipeline model with User and Project fully hydrated.
Source code in zenml/zen_server/models/pipeline_models.py
class HydratedPipelineModel(PipelineModel):
    """Pipeline model with User and Project fully hydrated.

    In addition, the model carries the last pipeline runs and their
    statuses.
    """

    runs: List["PipelineRunModel"] = Field(
        title="A list of the last x Pipeline Runs."
    )
    status: List[ExecutionStatus] = Field(
        title="The status of the last x Pipeline Runs."
    )
    project: ProjectModel = Field(  # type: ignore[assignment]
        title="The project that contains this pipeline."
    )
    user: UserModel = Field(  # type: ignore[assignment]
        title="The user that created this pipeline.",
    )

    @classmethod
    def from_model(
        cls, pipeline_model: PipelineModel, num_runs: int = 3
    ) -> "HydratedPipelineModel":
        """Converts a pipeline model into a hydrated model.

        The project, the user and the runs are fetched from the store of
        the global configuration.

        Args:
            pipeline_model: The pipeline model to hydrate.
            num_runs: The number of runs to include. Values of zero or
                less include no runs.

        Returns:
            A hydrated model.
        """
        zen_store = GlobalConfiguration().zen_store

        project = zen_store.get_project(pipeline_model.project)
        user = zen_store.get_user(pipeline_model.user)
        runs = zen_store.list_runs(pipeline_id=pipeline_model.id)
        # `runs[-0:]` is the same as `runs[0:]` and would return *all* runs,
        # so the non-positive case has to be handled explicitly.
        last_x_runs = runs[-num_runs:] if num_runs > 0 else []
        status_last_x_runs = [run.status for run in last_x_runs]

        return cls(
            id=pipeline_model.id,
            name=pipeline_model.name,
            project=project,
            user=user,
            runs=last_x_runs,
            status=status_last_x_runs,
            docstring=pipeline_model.docstring,
            spec=pipeline_model.spec,
            created=pipeline_model.created,
            updated=pipeline_model.updated,
        )
from_model(pipeline_model, num_runs=3)
classmethod
Converts this model to a hydrated model.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
pipeline_model |
PipelineModel |
The pipeline model to hydrate. |
required |
num_runs |
int |
The number of runs to include. |
3 |
Returns:
Type | Description |
---|---|
HydratedPipelineModel |
A hydrated model. |
Source code in zenml/zen_server/models/pipeline_models.py
@classmethod
def from_model(
    cls, pipeline_model: PipelineModel, num_runs: int = 3
) -> "HydratedPipelineModel":
    """Converts a pipeline model into a hydrated model.

    The project, the user and the runs are fetched from the store of the
    global configuration.

    Args:
        pipeline_model: The pipeline model to hydrate.
        num_runs: The number of runs to include. Values of zero or less
            include no runs.

    Returns:
        A hydrated model.
    """
    zen_store = GlobalConfiguration().zen_store

    project = zen_store.get_project(pipeline_model.project)
    user = zen_store.get_user(pipeline_model.user)
    runs = zen_store.list_runs(pipeline_id=pipeline_model.id)
    # `runs[-0:]` is the same as `runs[0:]` and would return *all* runs,
    # so the non-positive case has to be handled explicitly.
    last_x_runs = runs[-num_runs:] if num_runs > 0 else []
    status_last_x_runs = [run.status for run in last_x_runs]

    return cls(
        id=pipeline_model.id,
        name=pipeline_model.name,
        project=project,
        user=user,
        runs=last_x_runs,
        status=status_last_x_runs,
        docstring=pipeline_model.docstring,
        spec=pipeline_model.spec,
        created=pipeline_model.created,
        updated=pipeline_model.updated,
    )
HydratedPipelineRunModel (PipelineRunModel)
pydantic-model
Pipeline run model with the pipeline, stack and user fully hydrated.
Source code in zenml/zen_server/models/pipeline_models.py
class HydratedPipelineRunModel(PipelineRunModel):
    """Pipeline run model with the pipeline, stack and user hydrated."""

    pipeline: Optional[PipelineModel] = Field(
        title="The pipeline this run belongs to."
    )
    stack: Optional[StackModel] = Field(
        title="The stack that was used for this run."
    )
    user: UserModel = Field(  # type: ignore[assignment]
        title="The user that ran this pipeline.",
    )
    status: ExecutionStatus = Field(title="The status of the run.")

    @classmethod
    def from_model(
        cls,
        run_model: PipelineRunModel,
    ) -> "HydratedPipelineRunModel":
        """Converts a pipeline run model into a hydrated model.

        The pipeline, stack and user are looked up in the store of the
        global configuration, each one only if the run model references it.

        Args:
            run_model: The run model to hydrate.

        Returns:
            A hydrated model.
        """
        store = GlobalConfiguration().zen_store

        # references that are not set on the run model stay `None`
        pipeline = (
            store.get_pipeline(run_model.pipeline_id)
            if run_model.pipeline_id
            else None
        )
        stack = (
            store.get_stack(run_model.stack_id) if run_model.stack_id else None
        )
        user = store.get_user(run_model.user) if run_model.user else None

        # the hydrated fields replace the plain values of the run model
        plain_fields = run_model.dict(exclude={"user", "pipeline", "stack"})
        return cls(
            **plain_fields,
            pipeline=pipeline,
            stack=stack,
            user=user,
        )
from_model(run_model)
classmethod
Converts this model to a hydrated model.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
run_model |
PipelineRunModel |
The run model to hydrate. |
required |
Returns:
Type | Description |
---|---|
HydratedPipelineRunModel |
A hydrated model. |
Source code in zenml/zen_server/models/pipeline_models.py
@classmethod
def from_model(
    cls,
    run_model: PipelineRunModel,
) -> "HydratedPipelineRunModel":
    """Converts a pipeline run model into a hydrated model.

    The pipeline, stack and user are looked up in the store of the global
    configuration, each one only if the run model references it.

    Args:
        run_model: The run model to hydrate.

    Returns:
        A hydrated model.
    """
    store = GlobalConfiguration().zen_store

    # references that are not set on the run model stay `None`
    pipeline = (
        store.get_pipeline(run_model.pipeline_id)
        if run_model.pipeline_id
        else None
    )
    stack = store.get_stack(run_model.stack_id) if run_model.stack_id else None
    user = store.get_user(run_model.user) if run_model.user else None

    # the hydrated fields replace the plain values of the run model
    plain_fields = run_model.dict(exclude={"user", "pipeline", "stack"})
    return cls(
        **plain_fields,
        pipeline=pipeline,
        stack=stack,
        user=user,
    )
UpdatePipelineRequest (UpdateRequest)
pydantic-model
Pipeline model for update requests.
Source code in zenml/zen_server/models/pipeline_models.py
class UpdatePipelineRequest(UpdateRequest[PipelineModel]):
    """Update request for pipelines."""

    # domain model class this request applies to
    _MODEL_TYPE = PipelineModel

    name: Optional[str] = Field(
        max_length=MODEL_NAME_FIELD_MAX_LENGTH,
        title="The name of the pipeline.",
    )
    docstring: Optional[str]
    # NOTE(review): unlike the other fields, `spec` is not optional here -
    # confirm that update requests are meant to always carry a spec.
    spec: PipelineSpec
projects_models
Project Models for the API endpoint definitions.
CreateProjectRequest (CreateRequest)
pydantic-model
Project model for create requests.
Source code in zenml/zen_server/models/projects_models.py
class CreateProjectRequest(CreateRequest[ProjectModel]):
    """Create request for projects."""

    # domain model class this request is converted into
    _MODEL_TYPE = ProjectModel

    name: str = Field(
        max_length=MODEL_NAME_FIELD_MAX_LENGTH,
        title="The unique name of the project.",
    )
    description: Optional[str] = Field(
        None,
        max_length=MODEL_DESCRIPTIVE_FIELD_MAX_LENGTH,
        title="The description of the project.",
    )
UpdateProjectRequest (UpdateRequest)
pydantic-model
Project model for update requests.
Source code in zenml/zen_server/models/projects_models.py
class UpdateProjectRequest(UpdateRequest[ProjectModel]):
    """Update request for projects; both fields are optional."""

    # domain model class this request applies to
    _MODEL_TYPE = ProjectModel

    name: Optional[str] = Field(
        None,
        max_length=MODEL_NAME_FIELD_MAX_LENGTH,
        title="The new name of the project.",
    )
    description: Optional[str] = Field(
        None,
        max_length=MODEL_DESCRIPTIVE_FIELD_MAX_LENGTH,
        title="The new description of the project.",
    )
stack_models
Stack Models for the API endpoint definitions.
CreateStackRequest (ProjectScopedCreateRequest)
pydantic-model
Stack model for create requests.
Source code in zenml/zen_server/models/stack_models.py
class CreateStackRequest(ProjectScopedCreateRequest[StackModel]):
    """Stack model for create requests."""

    # domain model class this request is converted into
    _MODEL_TYPE = StackModel

    name: str = Field(
        title="The stack name.", max_length=MODEL_NAME_FIELD_MAX_LENGTH
    )
    description: Optional[str] = Field(
        default=None,
        title="The description of the stack",
        max_length=MODEL_DESCRIPTIVE_FIELD_MAX_LENGTH,
    )
    # NOTE(review): the annotation is not `Optional` although the default is
    # `None` - kept unchanged to preserve the request schema.
    components: Dict[StackComponentType, List[UUID]] = Field(
        default=None,
        # the two literals are concatenated: the trailing space is required,
        # otherwise the title reads "... id's ofinstances ..."
        title=(
            "A mapping of stack component types to the id's of "
            "instances of components of this type."
        ),
    )
    is_shared: bool = Field(
        default=False,
        title="Flag describing if this stack is shared.",
    )
UpdateStackRequest (UpdateRequest)
pydantic-model
Stack model for update requests.
Source code in zenml/zen_server/models/stack_models.py
class UpdateStackRequest(UpdateRequest[StackModel]):
    """Stack model for update requests.

    All fields default to `None`.
    """

    # domain model class this request applies to
    _MODEL_TYPE = StackModel

    name: Optional[str] = Field(
        default=None,
        title="The stack name.",
        max_length=MODEL_NAME_FIELD_MAX_LENGTH,
    )
    # NOTE(review): literal 300 here, whereas `CreateStackRequest` uses
    # `MODEL_DESCRIPTIVE_FIELD_MAX_LENGTH` - confirm both are meant to match.
    description: Optional[str] = Field(
        default=None,
        title="The updated description of the stack",
        max_length=300,
    )
    components: Optional[Dict[StackComponentType, List[UUID]]] = Field(
        default=None,
        # the two literals are concatenated: the trailing space is required,
        # otherwise the title reads "... id's ofinstances ..."
        title=(
            "An updated mapping of stack component types to the id's of "
            "instances of components of this type."
        ),
    )
    is_shared: Optional[bool] = Field(
        default=None,
        title="Updated flag describing if this stack is shared.",
    )
user_management_models
REST API user management models implementation.
ActivateUserRequest (UpdateRequest)
pydantic-model
Model for user activation requests.
Source code in zenml/zen_server/models/user_management_models.py
class ActivateUserRequest(UpdateRequest[UserModel]):
    """Update request used to activate a user account."""

    # domain model class this request applies to
    _MODEL_TYPE = UserModel

    name: Optional[str] = Field(
        None,
        max_length=MODEL_NAME_FIELD_MAX_LENGTH,
        title="Unique username for the account.",
    )
    full_name: Optional[str] = Field(
        None,
        max_length=MODEL_NAME_FIELD_MAX_LENGTH,
        title="Full name for the account owner.",
    )
    password: SecretStr = Field(
        max_length=USER_PASSWORD_MAX_LENGTH, title="Account password."
    )
    activation_token: str = Field(
        min_length=USER_ACTIVATION_TOKEN_LENGTH,
        max_length=USER_ACTIVATION_TOKEN_LENGTH,
        title="Account activation token.",
    )

    def apply_to_model(self, model: UserModel) -> UserModel:
        """Copy the activation request values onto a user domain model.

        Args:
            model: The user domain model to update.

        Returns:
            The updated user domain model.
        """
        # the password is assigned separately below, from the attribute
        # instead of the `dict()` value; the activation token is skipped
        # intentionally, because it is validated separately
        handled_separately = ("activation_token", "password")
        for field_name, value in self.dict(exclude_none=True).items():
            if field_name not in handled_separately:
                setattr(model, field_name, value)
        model.password = self.password
        return model
apply_to_model(self, model)
Apply the update changes to a user domain model.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
model |
UserModel |
The user domain model to update. |
required |
Returns:
Type | Description |
---|---|
UserModel |
The updated user domain model. |
Source code in zenml/zen_server/models/user_management_models.py
def apply_to_model(self, model: UserModel) -> UserModel:
    """Copy the activation request values onto a user domain model.

    Args:
        model: The user domain model to update.

    Returns:
        The updated user domain model.
    """
    # the password is assigned separately below, from the attribute instead
    # of the `dict()` value; the activation token is skipped intentionally,
    # because it is validated separately
    handled_separately = ("activation_token", "password")
    for field_name, value in self.dict(exclude_none=True).items():
        if field_name not in handled_separately:
            setattr(model, field_name, value)
    model.password = self.password
    return model
CreateRoleRequest (CreateRequest)
pydantic-model
Model for role creation requests.
Source code in zenml/zen_server/models/user_management_models.py
class CreateRoleRequest(CreateRequest[RoleModel]):
    """Create request for roles."""

    # domain model class this request is converted into
    _MODEL_TYPE = RoleModel

    name: str = Field(
        max_length=MODEL_NAME_FIELD_MAX_LENGTH,
        title="The unique name of the role.",
    )
    permissions: Set[str]
CreateTeamRequest (CreateRequest)
pydantic-model
Model for team creation requests.
Source code in zenml/zen_server/models/user_management_models.py
class CreateTeamRequest(CreateRequest[TeamModel]):
    """Create request for teams."""

    # domain model class this request is converted into
    _MODEL_TYPE = TeamModel

    name: str = Field(
        max_length=MODEL_NAME_FIELD_MAX_LENGTH,
        title="The unique name of the team.",
    )
CreateUserRequest (CreateRequest)
pydantic-model
Model for user creation requests.
Source code in zenml/zen_server/models/user_management_models.py
class CreateUserRequest(CreateRequest[UserModel]):
    """Create request for user accounts."""

    # domain model class this request is converted into
    _MODEL_TYPE = UserModel

    name: str = Field(
        max_length=MODEL_NAME_FIELD_MAX_LENGTH,
        title="The unique username for the account.",
    )
    full_name: Optional[str] = Field(
        None,
        max_length=MODEL_NAME_FIELD_MAX_LENGTH,
        title="The full name for the account owner.",
    )
    password: Optional[str] = Field(
        None,
        max_length=USER_PASSWORD_MAX_LENGTH,
        title="Account password.",
    )

    @classmethod
    def from_model(cls, model: UserModel, **kwargs: Any) -> "CreateUserRequest":
        """Build a user create request out of a user domain model.

        Args:
            model: The user domain model to convert.
            kwargs: Additional keyword arguments to pass to the user create
                request.

        Returns:
            The user create request.
        """
        # the password is passed explicitly, taken from `get_password()`
        request = super().from_model(
            model, **kwargs, password=model.get_password()
        )
        return cast(CreateUserRequest, request)
from_model(model, **kwargs)
classmethod
Convert a user domain model into a user create request.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
model |
UserModel |
The user domain model to convert. |
required |
kwargs |
Any |
Additional keyword arguments to pass to the user create request. |
{} |
Returns:
Type | Description |
---|---|
CreateUserRequest |
The user create request. |
Source code in zenml/zen_server/models/user_management_models.py
@classmethod
def from_model(cls, model: UserModel, **kwargs: Any) -> "CreateUserRequest":
    """Build a user create request out of a user domain model.

    Args:
        model: The user domain model to convert.
        kwargs: Additional keyword arguments to pass to the user create
            request.

    Returns:
        The user create request.
    """
    # the password is passed explicitly, taken from `get_password()`
    request = super().from_model(
        model, **kwargs, password=model.get_password()
    )
    return cast(CreateUserRequest, request)
CreateUserResponse (UserModel, CreateResponse)
pydantic-model
Model for user creation responses.
Source code in zenml/zen_server/models/user_management_models.py
class CreateUserResponse(UserModel, CreateResponse[UserModel]):
    """Create response for user accounts, including the activation token."""

    # domain model class this response is converted into
    _MODEL_TYPE = UserModel

    activation_token: Optional[str] = Field(  # type: ignore[assignment]
        None, title="Account activation token."
    )

    @classmethod
    def from_model(
        cls, model: UserModel, **kwargs: Any
    ) -> "CreateUserResponse":
        """Build a user create response out of a user domain model.

        Args:
            model: The user domain model to convert.
            kwargs: Additional keyword arguments to pass to the user create
                response.

        Returns:
            The user create response.
        """
        # the token is passed explicitly, taken from `get_activation_token()`
        response = super().from_model(
            model, **kwargs, activation_token=model.get_activation_token()
        )
        return cast(CreateUserResponse, response)

    class Config:
        """Pydantic configuration class."""

        # Validate attributes when assigning them
        validate_assignment = True
        # attributes with a leading underscore are private, not model fields
        underscore_attrs_are_private = True
Config
Pydantic configuration class.
Source code in zenml/zen_server/models/user_management_models.py
class Config:
    """Pydantic configuration of the user create response."""

    # attributes with a leading underscore are private, not model fields
    underscore_attrs_are_private = True
    # Validate attributes when assigning them
    validate_assignment = True
from_model(model, **kwargs)
classmethod
Convert a user domain model into a user create response.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
model |
UserModel |
The user domain model to convert. |
required |
kwargs |
Any |
Additional keyword arguments to pass to the user create response. |
{} |
Returns:
Type | Description |
---|---|
CreateUserResponse |
The user create response. |
Source code in zenml/zen_server/models/user_management_models.py
@classmethod
def from_model(
    cls, model: UserModel, **kwargs: Any
) -> "CreateUserResponse":
    """Build a user create response out of a user domain model.

    Args:
        model: The user domain model to convert.
        kwargs: Additional keyword arguments to pass to the user create
            response.

    Returns:
        The user create response.
    """
    # the token is passed explicitly, taken from `get_activation_token()`
    response = super().from_model(
        model, **kwargs, activation_token=model.get_activation_token()
    )
    return cast(CreateUserResponse, response)
DeactivateUserResponse (UserModel, UpdateResponse)
pydantic-model
Model for user deactivation responses.
Source code in zenml/zen_server/models/user_management_models.py
class DeactivateUserResponse(UserModel, UpdateResponse[UserModel]):
    """Model for user deactivation responses.

    Extends the user model with a mandatory activation token that is
    returned to the caller when an account is deactivated.
    """

    _MODEL_TYPE = UserModel

    # Required here (`...`), unlike the optional token on the create response.
    activation_token: str = Field(..., title="Account activation token.")  # type: ignore[assignment]

    @classmethod
    def from_model(
        cls, model: UserModel, **kwargs: Any
    ) -> "DeactivateUserResponse":
        """Convert a domain model into a user deactivation response.

        Args:
            model: The domain model to convert.
            kwargs: Additional keyword arguments to pass to the user
                deactivation response.

        Returns:
            The user deactivation response.
        """
        # The token is read through the domain model's getter and passed to
        # the parent conversion explicitly.
        return cast(
            DeactivateUserResponse,
            super().from_model(
                model, **kwargs, activation_token=model.get_activation_token()
            ),
        )

    class Config:
        """Pydantic configuration class."""

        # Validate attributes when assigning them
        validate_assignment = True
        underscore_attrs_are_private = True
Config
Pydantic configuration class.
Source code in zenml/zen_server/models/user_management_models.py
class Config:
    """Pydantic settings applied to the enclosing model."""

    # Treat underscore-prefixed attributes as private rather than as fields.
    underscore_attrs_are_private = True
    # Re-validate a field every time a new value is assigned to it.
    validate_assignment = True
from_model(model, **kwargs)
classmethod
Convert a domain model into a user deactivation response.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
model |
UserModel |
The domain model to convert. |
required |
kwargs |
Any |
Additional keyword arguments to pass to the user deactivation response. |
{} |
Returns:
Type | Description |
---|---|
DeactivateUserResponse |
The user deactivation response. |
Source code in zenml/zen_server/models/user_management_models.py
@classmethod
def from_model(
    cls, model: UserModel, **kwargs: Any
) -> "DeactivateUserResponse":
    """Build a user deactivation response from a domain model.

    Args:
        model: The domain model to convert.
        kwargs: Additional keyword arguments to pass to the user
            deactivation response.

    Returns:
        The user deactivation response.
    """
    # The activation token is read through the domain model's getter and
    # handed to the parent conversion as an explicit keyword argument.
    response = super().from_model(
        model, **kwargs, activation_token=model.get_activation_token()
    )
    return cast(DeactivateUserResponse, response)
EmailOptInModel (BaseModel)
pydantic-model
Model for user email opt-in requests.
Source code in zenml/zen_server/models/user_management_models.py
class EmailOptInModel(BaseModel):
    """Model for user email opt-in requests.

    Carries an optional email address together with a required flag stating
    whether that email should be associated with the user.
    """

    # Optional; limited to MODEL_NAME_FIELD_MAX_LENGTH characters.
    email: Optional[str] = Field(
        default=None,
        title="Email address associated with the account.",
        max_length=MODEL_NAME_FIELD_MAX_LENGTH,
    )
    # No default is given, so the caller must always state the choice.
    email_opted_in: bool = Field(
        title="Whether or not to associate the email with the user"
    )
UpdateRoleRequest (UpdateRequest)
pydantic-model
Model for role update requests.
Source code in zenml/zen_server/models/user_management_models.py
class UpdateRoleRequest(UpdateRequest[RoleModel]):
    """Request body used to update an existing role."""

    _MODEL_TYPE = RoleModel

    # Optional: leaving it unset keeps the default of None.
    name: Optional[str] = Field(
        title="Updated role name.",
        max_length=MODEL_NAME_FIELD_MAX_LENGTH,
        default=None,
    )
    # NOTE(review): unlike `name`, this field has no default and is therefore
    # required on every update request - confirm that this is intended.
    permissions: Set[str]
UpdateTeamRequest (UpdateRequest)
pydantic-model
Model for team update requests.
Source code in zenml/zen_server/models/user_management_models.py
class UpdateTeamRequest(UpdateRequest[TeamModel]):
    """Request body used to update an existing team."""

    _MODEL_TYPE = TeamModel

    # Optional: leaving it unset keeps the default of None.
    name: Optional[str] = Field(
        title="Updated team name.",
        max_length=MODEL_NAME_FIELD_MAX_LENGTH,
        default=None,
    )
UpdateUserRequest (UpdateRequest)
pydantic-model
Model for user update requests.
Source code in zenml/zen_server/models/user_management_models.py
class UpdateUserRequest(UpdateRequest[UserModel]):
    """Model for user update requests.

    All fields are optional and default to None.
    """

    _MODEL_TYPE = UserModel

    name: Optional[str] = Field(
        default=None,
        title="Updated username for the account.",
        max_length=MODEL_NAME_FIELD_MAX_LENGTH,
    )
    full_name: Optional[str] = Field(
        default=None,
        title="Updated full name for the account owner.",
        max_length=MODEL_NAME_FIELD_MAX_LENGTH,
    )
    # Held as a SecretStr so the value is not printed in plain text.
    password: Optional[SecretStr] = Field(
        default=None,
        title="Updated account password.",
        max_length=USER_PASSWORD_MAX_LENGTH,
    )

    def apply_to_model(self, model: UserModel) -> UserModel:
        """Apply the update changes to a user domain model.

        Args:
            model: The user domain model to update.

        Returns:
            The updated user domain model.
        """
        user = super().apply_to_model(model)
        # The password is copied over explicitly, and only when one was
        # supplied. NOTE(review): presumably the parent implementation does
        # not carry it over - confirm.
        if self.password is not None:
            user.password = self.password
        return user

    @classmethod
    def from_model(cls, model: UserModel, **kwargs: Any) -> "UpdateUserRequest":
        """Convert a user domain model into an update request.

        Args:
            model: The user domain model to convert.
            kwargs: Additional keyword arguments to pass to the user update
                request.

        Returns:
            The update request.
        """
        # The password is read through the domain model's getter and passed
        # to the parent conversion explicitly.
        return cast(
            UpdateUserRequest,
            super().from_model(model, **kwargs, password=model.get_password()),
        )
apply_to_model(self, model)
Apply the update changes to a user domain model.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
model |
UserModel |
The user domain model to update. |
required |
Returns:
Type | Description |
---|---|
UserModel |
The updated user domain model. |
Source code in zenml/zen_server/models/user_management_models.py
def apply_to_model(self, model: UserModel) -> UserModel:
    """Merge the changes carried by this request into a user domain model.

    Args:
        model: The user domain model to update.

    Returns:
        The updated user domain model.
    """
    updated = super().apply_to_model(model)
    new_password = self.password
    # Only overwrite the stored password when the request supplies one.
    if new_password is not None:
        updated.password = new_password
    return updated
from_model(model, **kwargs)
classmethod
Convert a user domain model into an update request.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
model |
UserModel |
The user domain model to convert. |
required |
kwargs |
Any |
Additional keyword arguments to pass to the user update request. |
{} |
Returns:
Type | Description |
---|---|
UpdateUserRequest |
The update request. |
Source code in zenml/zen_server/models/user_management_models.py
@classmethod
def from_model(cls, model: UserModel, **kwargs: Any) -> "UpdateUserRequest":
    """Convert a user domain model into an update request.

    Args:
        model: The user domain model to convert.
        kwargs: Additional keyword arguments to pass to the user update
            request.

    Returns:
        The update request.
    """
    # The password is read through the domain model's getter and passed to
    # the parent conversion explicitly.
    return cast(
        UpdateUserRequest,
        super().from_model(model, **kwargs, password=model.get_password()),
    )
routers
special
Endpoint definitions.
artifacts_endpoints
Endpoint definitions for steps (and artifacts) of pipeline runs.
create_artifact(artifact, _=Security(oauth2_password_bearer_authentication))
Create a new artifact.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
artifact |
ArtifactModel |
The artifact to create. |
required |
Returns:
Type | Description |
---|---|
ArtifactModel |
The created artifact. |
Source code in zenml/zen_server/routers/artifacts_endpoints.py
@router.post(
    "",
    response_model=ArtifactModel,
    responses={401: error_response, 409: error_response, 422: error_response},
)
@handle_exceptions
def create_artifact(
    artifact: ArtifactModel,
    _: AuthContext = Security(authorize, scopes=[PermissionType.WRITE]),
) -> ArtifactModel:
    """Register a new artifact in the store.

    Requires the WRITE permission scope.

    Args:
        artifact: The artifact to create.

    Returns:
        The created artifact.
    """
    created_artifact = zen_store().create_artifact(artifact)
    return created_artifact
list_artifacts(artifact_uri=None, parent_step_id=None, _=Security(oauth2_password_bearer_authentication))
Get artifacts according to query filters.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
artifact_uri |
Optional[str] |
If specified, only artifacts with the given URI will be returned. |
None |
parent_step_id |
Optional[uuid.UUID] |
If specified, only artifacts for the given step run will be returned. |
None |
Returns:
Type | Description |
---|---|
List[zenml.models.pipeline_models.ArtifactModel] |
The artifacts according to query filters. |
Source code in zenml/zen_server/routers/artifacts_endpoints.py
@router.get(
    "",
    response_model=List[ArtifactModel],
    responses={401: error_response, 404: error_response, 422: error_response},
)
@handle_exceptions
def list_artifacts(
    artifact_uri: Optional[str] = None,
    parent_step_id: Optional[UUID] = None,
    _: AuthContext = Security(authorize, scopes=[PermissionType.READ]),
) -> List[ArtifactModel]:
    """List artifacts, optionally narrowed down by query filters.

    Requires the READ permission scope.

    Args:
        artifact_uri: If specified, only artifacts with the given URI will
            be returned.
        parent_step_id: If specified, only artifacts for the given step run
            will be returned.

    Returns:
        The artifacts according to query filters.
    """
    # Both filters are forwarded unchanged; None is passed when not given.
    artifacts = zen_store().list_artifacts(
        artifact_uri=artifact_uri,
        parent_step_id=parent_step_id,
    )
    return artifacts
auth_endpoints
Endpoint definitions for authentication (login).
PasswordRequestForm
OAuth2 password grant type request form.
This form is similar to fastapi.security.OAuth2PasswordRequestForm
, with
the single difference being that it also allows an empty password.
Source code in zenml/zen_server/routers/auth_endpoints.py
class PasswordRequestForm:
    """OAuth2 password grant type request form.

    This form is similar to `fastapi.security.OAuth2PasswordRequestForm`, with
    the single difference being that it also allows an empty password.
    """

    def __init__(
        self,
        grant_type: str = Form(None, regex="password"),
        username: str = Form(...),
        password: Optional[str] = Form(""),
        scope: str = Form(""),
        client_id: Optional[str] = Form(None),
        client_secret: Optional[str] = Form(None),
    ):
        """Initializes the form.

        Args:
            grant_type: The grant type.
            username: The username.
            password: The password.
            scope: The scope.
            client_id: The client ID.
            client_secret: The client secret.
        """
        self.grant_type = grant_type
        self.username = username
        self.password = password
        # Both the raw scope string and its whitespace-split list are kept as
        # attributes, exactly as before; only the duplicated assignments of
        # the other attributes were removed.
        self.scope = scope
        self.scopes = scope.split()
        self.client_id = client_id
        self.client_secret = client_secret
__init__(self, grant_type=Form(None), username=Form(Ellipsis), password=Form(), scope=Form(), client_id=Form(None), client_secret=Form(None))
special
Initializes the form.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
grant_type |
str |
The grant type. |
Form(None) |
username |
str |
The username. |
Form(Ellipsis) |
password |
Optional[str] |
The password. |
Form() |
scope |
str |
The scope. |
Form() |
client_id |
Optional[str] |
The client ID. |
Form(None) |
client_secret |
Optional[str] |
The client secret. |
Form(None) |
Source code in zenml/zen_server/routers/auth_endpoints.py
def __init__(
    self,
    grant_type: str = Form(None, regex="password"),
    username: str = Form(...),
    password: Optional[str] = Form(""),
    scope: str = Form(""),
    client_id: Optional[str] = Form(None),
    client_secret: Optional[str] = Form(None),
):
    """Initializes the form.

    Args:
        grant_type: The grant type.
        username: The username.
        password: The password.
        scope: The scope.
        client_id: The client ID.
        client_secret: The client secret.
    """
    self.grant_type = grant_type
    self.username = username
    self.password = password
    # Both the raw scope string and its whitespace-split list are kept as
    # attributes, exactly as before; only the duplicated assignments of the
    # other attributes were removed.
    self.scope = scope
    self.scopes = scope.split()
    self.client_id = client_id
    self.client_secret = client_secret
token(auth_form_data=Depends(NoneType))
Returns an access token for the given user.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
auth_form_data |
PasswordRequestForm |
The authentication form data. |
Depends(NoneType) |
Returns:
Type | Description |
---|---|
Dict[str, str] |
An access token. |
Exceptions:
Type | Description |
---|---|
HTTPException |
401 if not authorized to login. |
Source code in zenml/zen_server/routers/auth_endpoints.py
@router.post(
    LOGIN,
    responses={401: error_response},
)
def token(
    auth_form_data: PasswordRequestForm = Depends(),
) -> Dict[str, str]:
    """Log a user in and hand back a bearer access token.

    The token is generated with the union of the permissions of every role
    assigned to the user.

    Args:
        auth_form_data: The authentication form data.

    Returns:
        An access token.

    Raises:
        HTTPException: 401 if not authorized to login.
    """
    auth_context = authenticate_credentials(
        user_name_or_id=auth_form_data.username,
        password=auth_form_data.password,
    )
    if not auth_context:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )

    user = auth_context.user
    assignments = zen_store().list_role_assignments(
        user_name_or_id=user.id, project_name_or_id=None
    )
    # Collect the permission set of each assigned role, then merge them.
    per_role_permissions = [
        zen_store().get_role(assignment.role).permissions
        for assignment in assignments
    ]
    granted = set().union(*per_role_permissions)
    access_token = user.generate_access_token(
        permissions=[permission.value for permission in granted]
    )

    # The token endpoint must answer with a JSON object holding:
    #
    # * access_token - string containing the access token
    # * token_type - the token type (must be "bearer" in our case)
    return {"access_token": access_token, "token_type": "bearer"}
flavors_endpoints
Endpoint definitions for flavors.
delete_flavor(flavor_id, _=Security(oauth2_password_bearer_authentication))
Deletes a flavor.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
flavor_id |
UUID |
ID of the flavor. |
required |
Source code in zenml/zen_server/routers/flavors_endpoints.py
@router.delete(
    "/{flavor_id}",
    responses={401: error_response, 404: error_response, 422: error_response},
)
@handle_exceptions
def delete_flavor(
    flavor_id: UUID,
    _: AuthContext = Security(authorize, scopes=[PermissionType.WRITE]),
) -> None:
    """Remove the flavor with the given ID from the store.

    Requires the WRITE permission scope.

    Args:
        flavor_id: ID of the flavor.
    """
    store = zen_store()
    store.delete_flavor(flavor_id)
get_flavor(flavor_id, _=Security(oauth2_password_bearer_authentication))
Returns the requested flavor.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
flavor_id |
UUID |
ID of the flavor. |
required |
Returns:
Type | Description |
---|---|
FlavorModel |
The requested flavor. |
Source code in zenml/zen_server/routers/flavors_endpoints.py
@router.get(
    "/{flavor_id}",
    response_model=FlavorModel,
    responses={401: error_response, 404: error_response, 422: error_response},
)
@handle_exceptions
def get_flavor(
    flavor_id: UUID,
    _: AuthContext = Security(authorize, scopes=[PermissionType.READ]),
) -> FlavorModel:
    """Returns the requested flavor.

    Requires the READ permission scope.

    Args:
        flavor_id: ID of the flavor.

    Returns:
        The requested flavor.
    """
    return zen_store().get_flavor(flavor_id)
list_flavors(project_name_or_id=None, component_type=None, user_name_or_id=None, name=None, is_shared=None, _=Security(oauth2_password_bearer_authentication))
Returns all flavors.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
project_name_or_id |
Union[str, uuid.UUID] |
Name or ID of the project. |
None |
component_type |
Optional[zenml.enums.StackComponentType] |
Optionally filter by component type. |
None |
user_name_or_id |
Union[str, uuid.UUID] |
Optionally filter by name or ID of the user. |
None |
name |
Optional[str] |
Optionally filter by flavor name. |
None |
is_shared |
Optional[bool] |
Optionally filter by shared status of the flavor. |
None |
Returns:
Type | Description |
---|---|
List[zenml.models.flavor_models.FlavorModel] |
All flavors. |
Source code in zenml/zen_server/routers/flavors_endpoints.py
@router.get(
    "",
    response_model=List[FlavorModel],
    responses={401: error_response, 404: error_response, 422: error_response},
)
@handle_exceptions
def list_flavors(
    project_name_or_id: Optional[Union[str, UUID]] = None,
    component_type: Optional[StackComponentType] = None,
    user_name_or_id: Optional[Union[str, UUID]] = None,
    name: Optional[str] = None,
    is_shared: Optional[bool] = None,
    _: AuthContext = Security(authorize, scopes=[PermissionType.READ]),
) -> List[FlavorModel]:
    """List flavors, optionally narrowed down by query filters.

    Requires the READ permission scope. The flavors are returned exactly as
    the store delivers them.

    Args:
        project_name_or_id: Name or ID of the project.
        component_type: Optionally filter by component type.
        user_name_or_id: Optionally filter by name or ID of the user.
        name: Optionally filter by flavor name.
        is_shared: Optionally filter by shared status of the flavor.

    Returns:
        All flavors.
    """
    # Every filter is forwarded unchanged; None is passed when not given.
    return zen_store().list_flavors(
        project_name_or_id=project_name_or_id,
        component_type=component_type,
        user_name_or_id=user_name_or_id,
        name=name,
        is_shared=is_shared,
    )
update_flavor(flavor_id, flavor, _=Security(oauth2_password_bearer_authentication))
Updates a flavor.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
flavor_id |
UUID |
ID of the flavor. |
required |
flavor |
FlavorModel |
Flavor to use for the update. |
required |
Returns:
Type | Description |
---|---|
FlavorModel |
The updated flavor. |
Source code in zenml/zen_server/routers/flavors_endpoints.py
@router.put(
    "/{flavor_id}",
    response_model=FlavorModel,
    responses={401: error_response, 404: error_response, 422: error_response},
)
@handle_exceptions
def update_flavor(
    flavor_id: UUID,
    flavor: FlavorModel,
    _: AuthContext = Security(authorize, scopes=[PermissionType.WRITE]),
) -> FlavorModel:
    """Updates a flavor.

    Requires the WRITE permission scope.

    Args:
        flavor_id: ID of the flavor.
        flavor: Flavor to use for the update.

    Returns:
        The updated flavor.
    """
    # The ID from the URL path always overrides any ID sent in the body.
    flavor.id = flavor_id
    return zen_store().update_flavor(flavor=flavor)
metadata_config_endpoints
Endpoint definitions for metadata config.
get_metadata_config(_=Security(oauth2_password_bearer_authentication))
Gets the metadata config.
Returns:
Type | Description |
---|---|
str |
The metadata config. |
Source code in zenml/zen_server/routers/metadata_config_endpoints.py
@router.get(
    "",
    response_model=str,
    responses={401: error_response, 404: error_response, 422: error_response},
)
@handle_exceptions
def get_metadata_config(
    _: AuthContext = Security(authorize, scopes=[PermissionType.WRITE])
) -> str:
    """Fetch the metadata config and serialize it to JSON.

    Returns:
        The metadata config.
    """
    # NOTE(review): this read endpoint demands the WRITE scope, presumably
    # because the config is fetched with expanded certificates - confirm.
    from google.protobuf.json_format import MessageToJson

    metadata_config = zen_store().get_metadata_config(expand_certs=True)
    serialized = MessageToJson(metadata_config)
    return serialized
metadata_sync_endpoints
Endpoint definitions for metadata sync.
sync_runs(_=Security(oauth2_password_bearer_authentication))
Sync pipeline runs.
Source code in zenml/zen_server/routers/metadata_sync_endpoints.py
@router.get(
    "",
    response_model=None,
    responses={401: error_response, 404: error_response, 422: error_response},
)
@handle_exceptions
def sync_runs(
    _: AuthContext = Security(authorize, scopes=[PermissionType.READ])
) -> None:
    """Trigger a sync of pipeline runs in the store.

    Any failure is logged with its traceback and is not propagated to the
    caller.
    """
    try:
        store = zen_store()
        store._sync_runs()
    except Exception:
        logger.exception("Failed to sync pipeline runs.")
pipelines_endpoints
Endpoint definitions for pipelines.
delete_pipeline(pipeline_id, _=Security(oauth2_password_bearer_authentication))
Deletes a specific pipeline.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
pipeline_id |
UUID |
ID of the pipeline to delete. |
required |
Source code in zenml/zen_server/routers/pipelines_endpoints.py
@router.delete(
    "/{pipeline_id}",
    responses={401: error_response, 404: error_response, 422: error_response},
)
@handle_exceptions
def delete_pipeline(
    pipeline_id: UUID,
    _: AuthContext = Security(authorize, scopes=[PermissionType.WRITE]),
) -> None:
    """Deletes a specific pipeline.

    Requires the WRITE permission scope.

    Args:
        pipeline_id: ID of the pipeline to delete.
    """
    zen_store().delete_pipeline(pipeline_id=pipeline_id)
get_pipeline(pipeline_id, hydrated=False, _=Security(oauth2_password_bearer_authentication))
Gets a specific pipeline using its unique id.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
pipeline_id |
UUID |
ID of the pipeline to get. |
required |
hydrated |
bool |
Defines if stack components, users and projects will be included by reference (FALSE) or as model (TRUE) |
False |
Returns:
Type | Description |
---|---|
Union[zenml.zen_server.models.pipeline_models.HydratedPipelineModel, zenml.models.pipeline_models.PipelineModel] |
A specific pipeline object. |
Source code in zenml/zen_server/routers/pipelines_endpoints.py
@router.get(
    "/{pipeline_id}",
    response_model=Union[HydratedPipelineModel, PipelineModel],  # type: ignore[arg-type]
    responses={401: error_response, 404: error_response, 422: error_response},
)
@handle_exceptions
def get_pipeline(
    pipeline_id: UUID,
    hydrated: bool = False,
    _: AuthContext = Security(authorize, scopes=[PermissionType.READ]),
) -> Union[HydratedPipelineModel, PipelineModel]:
    """Fetch a single pipeline by its unique ID.

    Requires the READ permission scope.

    Args:
        pipeline_id: ID of the pipeline to get.
        hydrated: Defines if stack components, users and projects will be
            included by reference (FALSE) or as model (TRUE)

    Returns:
        A specific pipeline object.
    """
    found = zen_store().get_pipeline(pipeline_id=pipeline_id)
    if not hydrated:
        return found
    return HydratedPipelineModel.from_model(found)
get_pipeline_spec(pipeline_id, _=Security(oauth2_password_bearer_authentication))
Gets the spec of a specific pipeline using its unique id.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
pipeline_id |
UUID |
ID of the pipeline to get. |
required |
Returns:
Type | Description |
---|---|
PipelineSpec |
The spec of the pipeline. |
Source code in zenml/zen_server/routers/pipelines_endpoints.py
@router.get(
    "/{pipeline_id}" + PIPELINE_SPEC,
    response_model=PipelineSpec,
    responses={401: error_response, 404: error_response, 422: error_response},
)
@handle_exceptions
def get_pipeline_spec(
    pipeline_id: UUID,
    _: AuthContext = Security(authorize, scopes=[PermissionType.READ]),
) -> PipelineSpec:
    """Fetch a pipeline by its unique ID and return its `spec` attribute.

    Requires the READ permission scope.

    Args:
        pipeline_id: ID of the pipeline to get.

    Returns:
        The spec of the pipeline.
    """
    pipeline = zen_store().get_pipeline(pipeline_id)
    return pipeline.spec
list_pipeline_runs(pipeline_id, project_name_or_id=None, stack_id=None, run_name=None, user_name_or_id=None, component_id=None, hydrated=False, _=Security(oauth2_password_bearer_authentication))
Get pipeline runs according to query filters.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
pipeline_id |
UUID |
ID of the pipeline for which to list runs. |
required |
project_name_or_id |
Union[str, uuid.UUID] |
Name or ID of the project for which to filter runs. |
None |
stack_id |
Optional[uuid.UUID] |
ID of the stack for which to filter runs. |
None |
run_name |
Optional[str] |
Filter by run name if provided |
None |
user_name_or_id |
Union[str, uuid.UUID] |
If provided, only return runs for this user. |
None |
component_id |
Optional[uuid.UUID] |
Filter by ID of a component that was used in the run. |
None |
hydrated |
bool |
Defines if stack, user and pipeline will be included by reference (FALSE) or as model (TRUE) |
False |
Returns:
Type | Description |
---|---|
Union[List[zenml.zen_server.models.pipeline_models.HydratedPipelineRunModel], List[zenml.models.pipeline_models.PipelineRunModel]] |
The pipeline runs according to query filters. |
Source code in zenml/zen_server/routers/pipelines_endpoints.py
@router.get(
    "/{pipeline_id}" + RUNS,
    response_model=Union[  # type: ignore[arg-type]
        List[HydratedPipelineRunModel], List[PipelineRunModel]
    ],
    responses={401: error_response, 404: error_response, 422: error_response},
)
@handle_exceptions
def list_pipeline_runs(
    pipeline_id: UUID,
    project_name_or_id: Optional[Union[str, UUID]] = None,
    stack_id: Optional[UUID] = None,
    run_name: Optional[str] = None,
    user_name_or_id: Optional[Union[str, UUID]] = None,
    component_id: Optional[UUID] = None,
    hydrated: bool = False,
    _: AuthContext = Security(authorize, scopes=[PermissionType.READ]),
) -> Union[List[HydratedPipelineRunModel], List[PipelineRunModel]]:
    """List the runs of one pipeline, optionally narrowed down by filters.

    Requires the READ permission scope.

    Args:
        pipeline_id: ID of the pipeline for which to list runs.
        project_name_or_id: Name or ID of the project for which to filter runs.
        stack_id: ID of the stack for which to filter runs.
        run_name: Filter by run name if provided
        user_name_or_id: If provided, only return runs for this user.
        component_id: Filter by ID of a component that was used in the run.
        hydrated: Defines if stack, user and pipeline will be
            included by reference (FALSE) or as model (TRUE)

    Returns:
        The pipeline runs according to query filters.
    """
    # The pipeline ID from the URL path is always part of the query; the
    # remaining filters are forwarded unchanged.
    matching_runs = zen_store().list_runs(
        pipeline_id=pipeline_id,
        project_name_or_id=project_name_or_id,
        stack_id=stack_id,
        run_name=run_name,
        user_name_or_id=user_name_or_id,
        component_id=component_id,
    )
    if not hydrated:
        return matching_runs
    return [
        HydratedPipelineRunModel.from_model(pipeline_run)
        for pipeline_run in matching_runs
    ]
list_pipelines(project_name_or_id=None, user_name_or_id=None, name=None, hydrated=False, _=Security(oauth2_password_bearer_authentication))
Gets a list of pipelines.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
project_name_or_id |
Union[str, uuid.UUID] |
Name or ID of the project to get pipelines for. |
None |
user_name_or_id |
Union[str, uuid.UUID] |
Optionally filter by name or ID of the user. |
None |
name |
Optional[str] |
Optionally filter by pipeline name |
None |
hydrated |
bool |
Defines if stack components, users and projects will be included by reference (FALSE) or as model (TRUE) |
False |
Returns:
Type | Description |
---|---|
Union[List[zenml.zen_server.models.pipeline_models.HydratedPipelineModel], List[zenml.models.pipeline_models.PipelineModel]] |
List of pipeline objects. |
Source code in zenml/zen_server/routers/pipelines_endpoints.py
@router.get(
    "",
    response_model=Union[List[HydratedPipelineModel], List[PipelineModel]],  # type: ignore[arg-type]
    responses={401: error_response, 404: error_response, 422: error_response},
)
@handle_exceptions
def list_pipelines(
    project_name_or_id: Optional[Union[str, UUID]] = None,
    user_name_or_id: Optional[Union[str, UUID]] = None,
    name: Optional[str] = None,
    hydrated: bool = False,
    _: AuthContext = Security(authorize, scopes=[PermissionType.READ]),
) -> Union[List[HydratedPipelineModel], List[PipelineModel]]:
    """List pipelines, optionally narrowed down by query filters.

    Requires the READ permission scope.

    Args:
        project_name_or_id: Name or ID of the project to get pipelines for.
        user_name_or_id: Optionally filter by name or ID of the user.
        name: Optionally filter by pipeline name
        hydrated: Defines if stack components, users and projects will be
            included by reference (FALSE) or as model (TRUE)

    Returns:
        List of pipeline objects.
    """
    found = zen_store().list_pipelines(
        project_name_or_id=project_name_or_id,
        user_name_or_id=user_name_or_id,
        name=name,
    )
    if not hydrated:
        return found
    return [HydratedPipelineModel.from_model(item) for item in found]
update_pipeline(pipeline_id, pipeline_update, hydrated=False, _=Security(oauth2_password_bearer_authentication))
Updates the attribute on a specific pipeline using its unique id.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
pipeline_id |
UUID |
ID of the pipeline to update. |
required |
pipeline_update |
UpdatePipelineRequest |
the model containing the attributes to update. |
required |
hydrated |
bool |
Defines if stack components, users and projects will be included by reference (FALSE) or as model (TRUE) |
False |
Returns:
Type | Description |
---|---|
Union[zenml.zen_server.models.pipeline_models.HydratedPipelineModel, zenml.models.pipeline_models.PipelineModel] |
The updated pipeline object. |
Source code in zenml/zen_server/routers/pipelines_endpoints.py
@router.put(
    "/{pipeline_id}",
    response_model=Union[HydratedPipelineModel, PipelineModel],  # type: ignore[arg-type]
    responses={401: error_response, 404: error_response, 422: error_response},
)
@handle_exceptions
def update_pipeline(
    pipeline_id: UUID,
    pipeline_update: UpdatePipelineRequest,
    hydrated: bool = False,
    _: AuthContext = Security(authorize, scopes=[PermissionType.WRITE]),
) -> Union[HydratedPipelineModel, PipelineModel]:
    """Updates the attributes of a specific pipeline using its unique id.

    Requires the WRITE permission scope.

    Args:
        pipeline_id: ID of the pipeline to update.
        pipeline_update: the model containing the attributes to update.
        hydrated: Defines if stack components, users and projects will be
            included by reference (FALSE) or as model (TRUE)

    Returns:
        The updated pipeline object.
    """
    # The stored pipeline is fetched first so the request's changes are
    # applied on top of it before the result is written back.
    pipeline_in_db = zen_store().get_pipeline(pipeline_id)
    updated_pipeline = zen_store().update_pipeline(
        pipeline=pipeline_update.apply_to_model(pipeline_in_db)
    )
    if hydrated:
        return HydratedPipelineModel.from_model(updated_pipeline)
    return updated_pipeline
projects_endpoints
Endpoint definitions for projects.
create_flavor(project_name_or_id, flavor, auth_context=Security(oauth2_password_bearer_authentication))
Creates a stack component flavor.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
project_name_or_id |
Union[str, uuid.UUID] |
Name or ID of the project. |
required |
flavor |
FlavorModel |
Stack component flavor to register. |
required |
auth_context |
AuthContext |
Authentication context. |
Security(oauth2_password_bearer_authentication) |
Returns:
Type | Description |
---|---|
FlavorModel |
The created stack component flavor. |
Source code in zenml/zen_server/routers/projects_endpoints.py
@router.post(
    "/{project_name_or_id}" + FLAVORS,
    response_model=FlavorModel,
    responses={401: error_response, 409: error_response, 422: error_response},
)
@handle_exceptions
def create_flavor(
    project_name_or_id: Union[str, UUID],
    flavor: FlavorModel,
    auth_context: AuthContext = Security(
        authorize, scopes=[PermissionType.WRITE]
    ),
) -> FlavorModel:
    """Register a stack component flavor inside a project.

    Requires the WRITE permission scope.

    Args:
        project_name_or_id: Name or ID of the project.
        flavor: Stack component flavor to register.
        auth_context: Authentication context.

    Returns:
        The created stack component flavor.
    """
    target_project = zen_store().get_project(project_name_or_id)
    # Project and user on the submitted flavor are overwritten with the
    # project from the URL path and the authenticated user.
    flavor.project = target_project.id
    flavor.user = auth_context.user.id
    return zen_store().create_flavor(flavor=flavor)
create_pipeline(project_name_or_id, pipeline, hydrated=False, auth_context=Security(oauth2_password_bearer_authentication))
Creates a pipeline.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
project_name_or_id |
Union[str, uuid.UUID] |
Name or ID of the project. |
required |
pipeline |
CreatePipelineRequest |
Pipeline to create. |
required |
hydrated |
bool |
Defines if stack components, users and projects will be included by reference (FALSE) or as model (TRUE) |
False |
auth_context |
AuthContext |
Authentication context. |
Security(oauth2_password_bearer_authentication) |
Returns:
Type | Description |
---|---|
Union[zenml.zen_server.models.pipeline_models.HydratedPipelineModel, zenml.models.pipeline_models.PipelineModel] |
The created pipeline. |
Source code in zenml/zen_server/routers/projects_endpoints.py
@router.post(
    "/{project_name_or_id}" + PIPELINES,
    response_model=Union[HydratedPipelineModel, PipelineModel],  # type: ignore[arg-type]
    responses={401: error_response, 409: error_response, 422: error_response},
)
@handle_exceptions
def create_pipeline(
    project_name_or_id: Union[str, UUID],
    pipeline: CreatePipelineRequest,
    hydrated: bool = False,
    auth_context: AuthContext = Security(
        authorize, scopes=[PermissionType.WRITE]
    ),
) -> Union[HydratedPipelineModel, PipelineModel]:
    """Create a pipeline inside a project.

    Requires the WRITE permission scope.

    Args:
        project_name_or_id: Name or ID of the project.
        pipeline: Pipeline to create.
        hydrated: Defines if stack components, users and projects will be
            included by reference (FALSE) or as model (TRUE)
        auth_context: Authentication context.

    Returns:
        The created pipeline.
    """
    target_project = zen_store().get_project(project_name_or_id)
    # The domain model is built with the project from the URL path and the
    # authenticated user.
    new_pipeline = pipeline.to_model(
        project=target_project.id,
        user=auth_context.user.id,
    )
    stored = zen_store().create_pipeline(pipeline=new_pipeline)
    if not hydrated:
        return stored
    return HydratedPipelineModel.from_model(stored)
create_pipeline_run(project_name_or_id, pipeline_run, auth_context=Security(oauth2_password_bearer_authentication), get_if_exists=False)
Creates a pipeline run.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
project_name_or_id |
Union[str, uuid.UUID] |
Name or ID of the project. |
required |
pipeline_run |
CreatePipelineRunRequest |
Pipeline run to create. |
required |
auth_context |
AuthContext |
Authentication context. |
Security(oauth2_password_bearer_authentication) |
get_if_exists |
bool |
If a similar pipeline run already exists, return it instead of raising an error. |
False |
Returns:
Type | Description |
---|---|
PipelineRunModel |
The created pipeline run. |
Source code in zenml/zen_server/routers/projects_endpoints.py
@router.post(
    "/{project_name_or_id}" + RUNS,
    response_model=PipelineRunModel,
    responses={401: error_response, 409: error_response, 422: error_response},
)
@handle_exceptions
def create_pipeline_run(
    project_name_or_id: Union[str, UUID],
    pipeline_run: CreatePipelineRunRequest,
    auth_context: AuthContext = Security(
        authorize, scopes=[PermissionType.WRITE]
    ),
    get_if_exists: bool = False,
) -> PipelineRunModel:
    """Records a new pipeline run inside a project.

    Args:
        project_name_or_id: Name or ID of the project the run belongs to.
        pipeline_run: Request body describing the pipeline run to create.
        auth_context: Authentication context; its user is stored as the
            owner of the run.
        get_if_exists: When True, the store's get-or-create operation is
            used instead of a plain create.

    Returns:
        The pipeline run returned by the store.
    """
    target_project = zen_store().get_project(project_name_or_id)
    run_model = pipeline_run.to_model(
        project=target_project.id,
        user=auth_context.user.id,
    )

    # Pick the store operation once, then invoke it with the prepared model.
    store = zen_store()
    persist = store.get_or_create_run if get_if_exists else store.create_run
    return persist(pipeline_run=run_model)
create_project(project, _=Security(oauth2_password_bearer_authentication))
Creates a project based on the requestBody.
noqa: DAR401
Parameters:
Name | Type | Description | Default |
---|---|---|---|
project |
CreateProjectRequest |
Project to create. |
required |
Returns:
Type | Description |
---|---|
ProjectModel |
The created project. |
Source code in zenml/zen_server/routers/projects_endpoints.py
@router.post(
    "",
    response_model=ProjectModel,
    responses={401: error_response, 409: error_response, 422: error_response},
)
@handle_exceptions
def create_project(
    project: CreateProjectRequest,
    _: AuthContext = Security(authorize, scopes=[PermissionType.WRITE]),
) -> ProjectModel:
    """Creates a new project from the request body.

    # noqa: DAR401

    Args:
        project: Request body describing the project to create.

    Returns:
        The project as returned by the store after creation.
    """
    store = zen_store()
    created_project = store.create_project(project=project.to_model())
    return created_project
create_stack(project_name_or_id, stack, hydrated=False, auth_context=Security(oauth2_password_bearer_authentication))
Creates a stack for a particular project.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
project_name_or_id |
Union[str, uuid.UUID] |
Name or ID of the project. |
required |
stack |
CreateStackRequest |
Stack to register. |
required |
hydrated |
bool |
Defines if stack components, users and projects will be included by reference (FALSE) or as model (TRUE) |
False |
auth_context |
AuthContext |
The authentication context. |
Security(oauth2_password_bearer_authentication) |
Returns:
Type | Description |
---|---|
Union[zenml.models.stack_models.HydratedStackModel, zenml.models.stack_models.StackModel] |
The created stack. |
Source code in zenml/zen_server/routers/projects_endpoints.py
@router.post(
    "/{project_name_or_id}" + STACKS,
    response_model=Union[HydratedStackModel, StackModel],  # type: ignore[arg-type]
    responses={401: error_response, 409: error_response, 422: error_response},
)
@handle_exceptions
def create_stack(
    project_name_or_id: Union[str, UUID],
    stack: CreateStackRequest,
    hydrated: bool = False,
    auth_context: AuthContext = Security(
        authorize, scopes=[PermissionType.WRITE]
    ),
) -> Union[HydratedStackModel, StackModel]:
    """Registers a new stack inside a project.

    Args:
        project_name_or_id: Name or ID of the project that will own the
            stack.
        stack: Request body describing the stack to register.
        hydrated: When True, the stored stack is converted with
            `to_hydrated_model()` before being returned; when False, the
            plain model is returned.
        auth_context: Authentication context; its user becomes the owner of
            the new stack.

    Returns:
        The created stack, hydrated if requested.
    """
    owning_project = zen_store().get_project(project_name_or_id)
    stack_model = stack.to_model(
        project=owning_project.id,
        user=auth_context.user.id,
    )
    stored_stack = zen_store().create_stack(stack=stack_model)

    if not hydrated:
        return stored_stack
    return stored_stack.to_hydrated_model()
create_stack_component(project_name_or_id, component, hydrated=False, auth_context=Security(oauth2_password_bearer_authentication))
Creates a stack component.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
project_name_or_id |
Union[str, uuid.UUID] |
Name or ID of the project. |
required |
component |
CreateComponentModel |
Stack component to register. |
required |
hydrated |
bool |
Defines if stack components, users and projects will be included by reference (FALSE) or as model (TRUE) |
False |
auth_context |
AuthContext |
Authentication context. |
Security(oauth2_password_bearer_authentication) |
Returns:
Type | Description |
---|---|
Union[zenml.models.component_model.ComponentModel, zenml.models.component_model.HydratedComponentModel] |
The created stack component. |
Source code in zenml/zen_server/routers/projects_endpoints.py
@router.post(
    "/{project_name_or_id}" + STACK_COMPONENTS,
    response_model=Union[ComponentModel, HydratedComponentModel],  # type: ignore[arg-type]
    responses={401: error_response, 409: error_response, 422: error_response},
)
@handle_exceptions
def create_stack_component(
    project_name_or_id: Union[str, UUID],
    component: CreateComponentModel,
    hydrated: bool = False,
    auth_context: AuthContext = Security(
        authorize, scopes=[PermissionType.WRITE]
    ),
) -> Union[ComponentModel, HydratedComponentModel]:
    """Registers a new stack component inside a project.

    Args:
        project_name_or_id: Name or ID of the project that will own the
            component.
        component: Request body describing the stack component to register.
        hydrated: When True, the stored component is converted with
            `to_hydrated_model()` before being returned; when False, the
            plain model is returned.
        auth_context: Authentication context; its user becomes the owner of
            the new component.

    Returns:
        The created stack component, hydrated if requested.
    """
    owning_project = zen_store().get_project(project_name_or_id)
    component_model = component.to_model(
        project=owning_project.id,
        user=auth_context.user.id,
    )

    # TODO: [server] if possible it should validate here that the configuration
    #  conforms to the flavor
    stored_component = zen_store().create_stack_component(
        component=component_model,
    )

    if not hydrated:
        return stored_component
    return stored_component.to_hydrated_model()
delete_project(project_name_or_id, _=Security(oauth2_password_bearer_authentication))
Deletes a project.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
project_name_or_id |
Union[str, uuid.UUID] |
Name or ID of the project. |
required |
Source code in zenml/zen_server/routers/projects_endpoints.py
@router.delete(
    "/{project_name_or_id}",
    responses={401: error_response, 404: error_response, 422: error_response},
)
@handle_exceptions
def delete_project(
    project_name_or_id: Union[str, UUID],
    _: AuthContext = Security(authorize, scopes=[PermissionType.WRITE]),
) -> None:
    """Removes a project from the store.

    Args:
        project_name_or_id: Name or ID of the project to delete.
    """
    store = zen_store()
    store.delete_project(project_name_or_id=project_name_or_id)
get_project(project_name_or_id, _=Security(oauth2_password_bearer_authentication))
Get a project for given name.
noqa: DAR401
Parameters:
Name | Type | Description | Default |
---|---|---|---|
project_name_or_id |
Union[str, uuid.UUID] |
Name or ID of the project. |
required |
Returns:
Type | Description |
---|---|
ProjectModel |
The requested project. |
Source code in zenml/zen_server/routers/projects_endpoints.py
@router.get(
    "/{project_name_or_id}",
    response_model=ProjectModel,
    responses={401: error_response, 404: error_response, 422: error_response},
)
@handle_exceptions
def get_project(
    project_name_or_id: Union[str, UUID],
    _: AuthContext = Security(authorize, scopes=[PermissionType.READ]),
) -> ProjectModel:
    """Fetches a single project by name or ID.

    # noqa: DAR401

    Args:
        project_name_or_id: Name or ID of the project to look up.

    Returns:
        The requested project.
    """
    store = zen_store()
    requested_project = store.get_project(
        project_name_or_id=project_name_or_id
    )
    return requested_project
get_project_statistics(project_name_or_id, _=Security(oauth2_password_bearer_authentication))
Gets statistics of a project.
noqa: DAR401
Parameters:
Name | Type | Description | Default |
---|---|---|---|
project_name_or_id |
Union[str, uuid.UUID] |
Name or ID of the project to get statistics for. |
required |
Returns:
Type | Description |
---|---|
Dict[str, int] |
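The number of stacks, stack components, pipelines and pipeline runs in the project, keyed by "stacks", "components", "pipelines" and "runs". |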
Source code in zenml/zen_server/routers/projects_endpoints.py
@router.get(
    "/{project_name_or_id}" + STATISTICS,
    # The handler returns integer counts, so the response model must be
    # Dict[str, int]; declaring Dict[str, str] contradicted the return
    # annotation and the documented schema.
    response_model=Dict[str, int],
    responses={401: error_response, 404: error_response, 422: error_response},
)
@handle_exceptions
def get_project_statistics(
    project_name_or_id: Union[str, UUID],
    _: AuthContext = Security(authorize, scopes=[PermissionType.READ]),
) -> Dict[str, int]:
    """Gets statistics of a project.

    # noqa: DAR401

    Args:
        project_name_or_id: Name or ID of the project to get statistics for.

    Returns:
        A dictionary with the keys "stacks", "components", "pipelines" and
        "runs", each mapped to the number of such entities listed for the
        project.
    """
    # TODO: [server] instead of actually querying all the rows, we should
    #  use zen_store methods that just return counts
    # NOTE: a previous unfiltered `list_runs()` call whose result was
    # discarded has been dropped; the project-scoped call below is the one
    # that feeds the "runs" count.
    return {
        "stacks": len(
            zen_store().list_stacks(project_name_or_id=project_name_or_id)
        ),
        "components": len(
            zen_store().list_stack_components(
                project_name_or_id=project_name_or_id
            )
        ),
        "pipelines": len(
            zen_store().list_pipelines(project_name_or_id=project_name_or_id)
        ),
        "runs": len(
            zen_store().list_runs(project_name_or_id=project_name_or_id)
        ),
    }
get_role_assignments_for_project(project_name_or_id, user_name_or_id=None, team_name_or_id=None, _=Security(oauth2_password_bearer_authentication))
Returns a list of all role assignments within a project, optionally filtered by user or team.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
project_name_or_id |
Union[str, uuid.UUID] |
Name or ID of the project. |
required |
user_name_or_id |
Union[str, uuid.UUID] |
If provided, only list roles that are assigned to the given user. |
None |
team_name_or_id |
Union[str, uuid.UUID] |
If provided, only list roles that are assigned to the given team. |
None |
Returns:
Type | Description |
---|---|
List[zenml.models.user_management_models.RoleAssignmentModel] |
A list of the role assignments in the project that match the given user and team filters. |
Source code in zenml/zen_server/routers/projects_endpoints.py
@router.get(
    "/{project_name_or_id}" + ROLES,
    response_model=List[RoleAssignmentModel],
    responses={401: error_response, 404: error_response, 422: error_response},
)
@handle_exceptions
def get_role_assignments_for_project(
    project_name_or_id: Union[str, UUID],
    user_name_or_id: Optional[Union[str, UUID]] = None,
    team_name_or_id: Optional[Union[str, UUID]] = None,
    _: AuthContext = Security(authorize, scopes=[PermissionType.READ]),
) -> List[RoleAssignmentModel]:
    """Returns the role assignments that exist within a project.

    Args:
        project_name_or_id: Name or ID of the project.
        user_name_or_id: If provided, only list roles that are assigned to the
            given user.
        team_name_or_id: If provided, only list roles that are assigned to the
            given team.

    Returns:
        A list of the role assignments in the project that match the given
        user and team filters.
    """
    return zen_store().list_role_assignments(
        project_name_or_id=project_name_or_id,
        user_name_or_id=user_name_or_id,
        team_name_or_id=team_name_or_id,
    )
list_project_flavors(project_name_or_id=None, component_type=None, user_name_or_id=None, name=None, is_shared=None, _=Security(oauth2_password_bearer_authentication))
List stack components flavors of a certain type that are part of a project.
noqa: DAR401
Parameters:
Name | Type | Description | Default |
---|---|---|---|
component_type |
Optional[zenml.enums.StackComponentType] |
Type of the component. |
None |
project_name_or_id |
Union[str, uuid.UUID] |
Name or ID of the project. |
None |
user_name_or_id |
Union[str, uuid.UUID] |
Optionally filter by name or ID of the user. |
None |
name |
Optional[str] |
Optionally filter by flavor name. |
None |
is_shared |
Optional[bool] |
Optionally filter by shared status of the flavor. |
None |
Returns:
Type | Description |
---|---|
List[zenml.models.flavor_models.FlavorModel] |
All stack components of a certain type that are part of a project. |
Source code in zenml/zen_server/routers/projects_endpoints.py
@router.get(
    "/{project_name_or_id}" + FLAVORS,
    response_model=List[FlavorModel],
    responses={401: error_response, 404: error_response, 422: error_response},
)
@handle_exceptions
def list_project_flavors(
    project_name_or_id: Optional[Union[str, UUID]] = None,
    component_type: Optional[StackComponentType] = None,
    user_name_or_id: Optional[Union[str, UUID]] = None,
    name: Optional[str] = None,
    is_shared: Optional[bool] = None,
    _: AuthContext = Security(authorize, scopes=[PermissionType.READ]),
) -> List[FlavorModel]:
    """Lists the stack component flavors registered in a project.

    # noqa: DAR401

    Args:
        project_name_or_id: Name or ID of the project.
        component_type: Optionally filter by stack component type.
        user_name_or_id: Optionally filter by name or ID of the user.
        name: Optionally filter by flavor name.
        is_shared: Optionally filter by shared status of the flavor.

    Returns:
        The flavors returned by the store for the given filters.
    """
    # All filters are forwarded to the store unchanged.
    flavor_filters = dict(
        project_name_or_id=project_name_or_id,
        component_type=component_type,
        user_name_or_id=user_name_or_id,
        is_shared=is_shared,
        name=name,
    )
    return zen_store().list_flavors(**flavor_filters)
list_project_pipelines(project_name_or_id, user_name_or_id=None, name=None, hydrated=False, _=Security(oauth2_password_bearer_authentication))
Gets pipelines defined for a specific project.
noqa: DAR401
Parameters:
Name | Type | Description | Default |
---|---|---|---|
project_name_or_id |
Union[str, uuid.UUID] |
Name or ID of the project to get pipelines for. |
required |
user_name_or_id |
Union[str, uuid.UUID] |
Optionally filter by name or ID of the user. |
None |
name |
Optional[str] |
Optionally filter by pipeline name |
None |
hydrated |
bool |
Defines if stack components, users and projects will be included by reference (FALSE) or as model (TRUE) |
False |
Returns:
Type | Description |
---|---|
Union[List[zenml.zen_server.models.pipeline_models.HydratedPipelineModel], List[zenml.models.pipeline_models.PipelineModel]] |
All pipelines within the project. |
Source code in zenml/zen_server/routers/projects_endpoints.py
@router.get(
    "/{project_name_or_id}" + PIPELINES,
    response_model=Union[List[HydratedPipelineModel], List[PipelineModel]],  # type: ignore[arg-type]
    responses={401: error_response, 404: error_response, 422: error_response},
)
@handle_exceptions
def list_project_pipelines(
    project_name_or_id: Union[str, UUID],
    user_name_or_id: Optional[Union[str, UUID]] = None,
    name: Optional[str] = None,
    hydrated: bool = False,
    _: AuthContext = Security(authorize, scopes=[PermissionType.READ]),
) -> Union[List[HydratedPipelineModel], List[PipelineModel]]:
    """Lists the pipelines that belong to a project.

    # noqa: DAR401

    Args:
        project_name_or_id: Name or ID of the project to get pipelines for.
        user_name_or_id: Optionally filter by name or ID of the user.
        name: Optionally filter by pipeline name.
        hydrated: When True, every pipeline is converted to its hydrated
            model; when False, the plain models are returned.

    Returns:
        All matching pipelines within the project, hydrated if requested.
    """
    matching_pipelines = zen_store().list_pipelines(
        project_name_or_id=project_name_or_id,
        user_name_or_id=user_name_or_id,
        name=name,
    )

    if not hydrated:
        return matching_pipelines
    return [
        HydratedPipelineModel.from_model(plain_pipeline)
        for plain_pipeline in matching_pipelines
    ]
list_project_stack_components(project_name_or_id, user_name_or_id=None, type=None, name=None, flavor_name=None, is_shared=None, hydrated=False, _=Security(oauth2_password_bearer_authentication))
List stack components that are part of a specific project.
noqa: DAR401
Parameters:
Name | Type | Description | Default |
---|---|---|---|
project_name_or_id |
Union[str, uuid.UUID] |
Name or ID of the project. |
required |
user_name_or_id |
Union[str, uuid.UUID] |
Optionally filter by name or ID of the user. |
None |
name |
Optional[str] |
Optionally filter by component name |
None |
type |
Optional[str] |
Optionally filter by component type |
None |
flavor_name |
Optional[str] |
Optionally filter by flavor name |
None |
is_shared |
Optional[bool] |
Optionally filter by shared status of the component |
None |
hydrated |
bool |
Defines if users and projects will be included by reference (FALSE) or as model (TRUE) |
False |
Returns:
Type | Description |
---|---|
Union[List[zenml.models.component_model.ComponentModel], List[zenml.models.component_model.HydratedComponentModel]] |
All stack components part of the specified project. |
Source code in zenml/zen_server/routers/projects_endpoints.py
@router.get(
    "/{project_name_or_id}" + STACK_COMPONENTS,
    response_model=Union[List[ComponentModel], List[HydratedComponentModel]],  # type: ignore[arg-type]
    responses={401: error_response, 404: error_response, 422: error_response},
)
@handle_exceptions
def list_project_stack_components(
    project_name_or_id: Union[str, UUID],
    user_name_or_id: Optional[Union[str, UUID]] = None,
    type: Optional[str] = None,
    name: Optional[str] = None,
    flavor_name: Optional[str] = None,
    is_shared: Optional[bool] = None,
    hydrated: bool = False,
    _: AuthContext = Security(authorize, scopes=[PermissionType.READ]),
) -> Union[List[ComponentModel], List[HydratedComponentModel]]:
    """Lists the stack components that belong to a project.

    # noqa: DAR401

    Args:
        project_name_or_id: Name or ID of the project.
        user_name_or_id: Optionally filter by name or ID of the user.
        type: Optionally filter by component type.
        name: Optionally filter by component name.
        flavor_name: Optionally filter by flavor name.
        is_shared: Optionally filter by shared status of the component.
        hydrated: When True, every component is converted with
            `to_hydrated_model()`; when False, the plain models are returned.

    Returns:
        All matching stack components of the project, hydrated if requested.
    """
    matching_components = zen_store().list_stack_components(
        project_name_or_id=project_name_or_id,
        user_name_or_id=user_name_or_id,
        type=type,
        is_shared=is_shared,
        name=name,
        flavor_name=flavor_name,
    )

    if not hydrated:
        return matching_components
    return [
        plain_component.to_hydrated_model()
        for plain_component in matching_components
    ]
list_project_stacks(project_name_or_id, user_name_or_id=None, component_id=None, stack_name=None, is_shared=None, hydrated=False, _=Security(oauth2_password_bearer_authentication))
Get stacks that are part of a specific project.
noqa: DAR401
Parameters:
Name | Type | Description | Default |
---|---|---|---|
project_name_or_id |
Union[str, uuid.UUID] |
Name or ID of the project. |
required |
user_name_or_id |
Union[str, uuid.UUID] |
Optionally filter by name or ID of the user. |
None |
component_id |
Optional[uuid.UUID] |
Optionally filter by component that is part of the stack. |
None |
stack_name |
Optional[str] |
Optionally filter by stack name |
None |
is_shared |
Optional[bool] |
Optionally filter by shared status of the stack |
None |
hydrated |
bool |
Defines if stack components, users and projects will be included by reference (FALSE) or as model (TRUE) |
False |
Returns:
Type | Description |
---|---|
Union[List[zenml.models.stack_models.HydratedStackModel], List[zenml.models.stack_models.StackModel]] |
All stacks part of the specified project. |
Source code in zenml/zen_server/routers/projects_endpoints.py
@router.get(
    "/{project_name_or_id}" + STACKS,
    response_model=Union[List[HydratedStackModel], List[StackModel]],  # type: ignore[arg-type]
    responses={401: error_response, 404: error_response, 422: error_response},
)
@handle_exceptions
def list_project_stacks(
    project_name_or_id: Union[str, UUID],
    user_name_or_id: Optional[Union[str, UUID]] = None,
    component_id: Optional[UUID] = None,
    stack_name: Optional[str] = None,
    is_shared: Optional[bool] = None,
    hydrated: bool = False,
    _: AuthContext = Security(authorize, scopes=[PermissionType.READ]),
) -> Union[List[HydratedStackModel], List[StackModel]]:
    """Lists the stacks that belong to a project.

    # noqa: DAR401

    Args:
        project_name_or_id: Name or ID of the project.
        user_name_or_id: Optionally filter by name or ID of the user.
        component_id: Optionally filter by component that is part of the
            stack.
        stack_name: Optionally filter by stack name; forwarded to the store
            as its `name` filter.
        is_shared: Optionally filter by shared status of the stack.
        hydrated: When True, every stack is converted with
            `to_hydrated_model()`; when False, the plain models are returned.

    Returns:
        All matching stacks of the project, hydrated if requested.
    """
    matching_stacks = zen_store().list_stacks(
        project_name_or_id=project_name_or_id,
        user_name_or_id=user_name_or_id,
        component_id=component_id,
        is_shared=is_shared,
        name=stack_name,
    )

    if not hydrated:
        return matching_stacks
    return [plain_stack.to_hydrated_model() for plain_stack in matching_stacks]
list_projects(_=Security(oauth2_password_bearer_authentication))
Lists all projects in the organization.
Returns:
Type | Description |
---|---|
List[zenml.models.project_models.ProjectModel] |
A list of projects. |
Source code in zenml/zen_server/routers/projects_endpoints.py
@router.get(
    "",
    response_model=List[ProjectModel],
    responses={401: error_response, 404: error_response, 422: error_response},
)
@handle_exceptions
def list_projects(
    _: AuthContext = Security(authorize, scopes=[PermissionType.READ])
) -> List[ProjectModel]:
    """Lists every project known to the store.

    Returns:
        A list of projects.
    """
    store = zen_store()
    all_projects = store.list_projects()
    return all_projects
update_project(project_name_or_id, project_update, _=Security(oauth2_password_bearer_authentication))
Updates a project identified by name or ID.
noqa: DAR401
Parameters:
Name | Type | Description | Default |
---|---|---|---|
project_name_or_id |
Union[str, uuid.UUID] |
Name or ID of the project to update. |
required |
project_update |
UpdateProjectRequest |
the project to use to update |
required |
Returns:
Type | Description |
---|---|
ProjectModel |
The updated project. |
Source code in zenml/zen_server/routers/projects_endpoints.py
@router.put(
    "/{project_name_or_id}",
    response_model=ProjectModel,
    responses={401: error_response, 404: error_response, 422: error_response},
)
@handle_exceptions
def update_project(
    project_name_or_id: Union[str, UUID],
    project_update: UpdateProjectRequest,
    _: AuthContext = Security(authorize, scopes=[PermissionType.WRITE]),
) -> ProjectModel:
    """Updates a project identified by name or ID.

    # noqa: DAR401

    Args:
        project_name_or_id: Name or ID of the project to update.
        project_update: Request body with the changes to apply to the
            project.

    Returns:
        The updated project.
    """
    # Load the current state, apply the requested changes onto it, and hand
    # the resulting model back to the store.
    project_in_db = zen_store().get_project(project_name_or_id)
    return zen_store().update_project(
        project=project_update.apply_to_model(project_in_db),
    )
roles_endpoints
Endpoint definitions for roles and role assignment.
create_role(role, _=Security(oauth2_password_bearer_authentication))
Creates a role.
noqa: DAR401
Parameters:
Name | Type | Description | Default |
---|---|---|---|
role |
CreateRoleRequest |
Role to create. |
required |
Returns:
Type | Description |
---|---|
RoleModel |
The created role. |
Source code in zenml/zen_server/routers/roles_endpoints.py
@router.post(
    "",
    response_model=RoleModel,
    responses={401: error_response, 409: error_response, 422: error_response},
)
@handle_exceptions
def create_role(
    role: CreateRoleRequest,
    _: AuthContext = Security(authorize, scopes=[PermissionType.WRITE]),
) -> RoleModel:
    """Creates a new role from the request body.

    # noqa: DAR401

    Args:
        role: Request body describing the role to create.

    Returns:
        The role as returned by the store after creation.
    """
    store = zen_store()
    created_role = store.create_role(role=role.to_model())
    return created_role
delete_role(role_name_or_id, _=Security(oauth2_password_bearer_authentication))
Deletes a specific role.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
role_name_or_id |
Union[str, uuid.UUID] |
Name or ID of the role. |
required |
Source code in zenml/zen_server/routers/roles_endpoints.py
@router.delete(
    "/{role_name_or_id}",
    responses={401: error_response, 404: error_response, 422: error_response},
)
@handle_exceptions
def delete_role(
    role_name_or_id: Union[str, UUID],
    _: AuthContext = Security(authorize, scopes=[PermissionType.WRITE]),
) -> None:
    """Removes a role from the store.

    Args:
        role_name_or_id: Name or ID of the role to delete.
    """
    store = zen_store()
    store.delete_role(role_name_or_id=role_name_or_id)
get_role(role_name_or_id, _=Security(oauth2_password_bearer_authentication))
Returns a specific role.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
role_name_or_id |
Union[str, uuid.UUID] |
Name or ID of the role. |
required |
Returns:
Type | Description |
---|---|
RoleModel |
A specific role. |
Source code in zenml/zen_server/routers/roles_endpoints.py
@router.get(
    "/{role_name_or_id}",
    response_model=RoleModel,
    responses={401: error_response, 404: error_response, 422: error_response},
)
@handle_exceptions
def get_role(
    role_name_or_id: Union[str, UUID],
    _: AuthContext = Security(authorize, scopes=[PermissionType.READ]),
) -> RoleModel:
    """Fetches a single role by name or ID.

    Args:
        role_name_or_id: Name or ID of the role to look up.

    Returns:
        The requested role.
    """
    store = zen_store()
    requested_role = store.get_role(role_name_or_id=role_name_or_id)
    return requested_role
list_roles(_=Security(oauth2_password_bearer_authentication))
Returns a list of all roles.
Returns:
Type | Description |
---|---|
List[zenml.models.user_management_models.RoleModel] |
List of all roles. |
Source code in zenml/zen_server/routers/roles_endpoints.py
@router.get(
    "",
    response_model=List[RoleModel],
    responses={401: error_response, 404: error_response, 422: error_response},
)
@handle_exceptions
def list_roles(
    _: AuthContext = Security(authorize, scopes=[PermissionType.READ])
) -> List[RoleModel]:
    """Lists every role known to the store.

    Returns:
        List of all roles.
    """
    store = zen_store()
    all_roles = store.list_roles()
    return all_roles
update_role(role_name_or_id, role_update, _=Security(oauth2_password_bearer_authentication))
Updates a role.
noqa: DAR401
Parameters:
Name | Type | Description | Default |
---|---|---|---|
role_name_or_id |
Union[str, uuid.UUID] |
Name or ID of the role. |
required |
role_update |
UpdateRoleRequest |
Role update. |
required |
Returns:
Type | Description |
---|---|
RoleModel |
The updated role. |
Source code in zenml/zen_server/routers/roles_endpoints.py
@router.put(
    "/{role_name_or_id}",
    response_model=RoleModel,
    responses={401: error_response, 409: error_response, 422: error_response},
)
@handle_exceptions
def update_role(
    role_name_or_id: Union[str, UUID],
    role_update: UpdateRoleRequest,
    _: AuthContext = Security(authorize, scopes=[PermissionType.WRITE]),
) -> RoleModel:
    """Updates an existing role identified by name or ID.

    # noqa: DAR401

    Args:
        role_name_or_id: Name or ID of the role to update.
        role_update: Request body with the changes to apply to the role.

    Returns:
        The updated role.
    """
    # Load the stored role, then let the request apply its changes onto it.
    existing_role = zen_store().get_role(role_name_or_id)
    store = zen_store()
    return store.update_role(role=role_update.apply_to_model(existing_role))
runs_endpoints
Endpoint definitions for pipeline runs.
get_pipeline_configuration(run_id, _=Security(oauth2_password_bearer_authentication))
Get the pipeline configuration of a specific pipeline run using its ID.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
run_id |
UUID |
ID of the pipeline run to get. |
required |
Returns:
Type | Description |
---|---|
Dict[str, Any] |
The pipeline configuration of the pipeline run. |
Source code in zenml/zen_server/routers/runs_endpoints.py
@router.get(
    "/{run_id}" + PIPELINE_CONFIGURATION,
    response_model=Dict[str, Any],
    responses={401: error_response, 404: error_response, 422: error_response},
)
@handle_exceptions
def get_pipeline_configuration(
    run_id: UUID,
    _: AuthContext = Security(authorize, scopes=[PermissionType.READ]),
) -> Dict[str, Any]:
    """Returns the pipeline configuration stored on a pipeline run.

    Args:
        run_id: ID of the pipeline run whose configuration is requested.

    Returns:
        The `pipeline_configuration` attribute of the pipeline run.
    """
    requested_run = zen_store().get_run(run_name_or_id=run_id)
    return requested_run.pipeline_configuration
get_run(run_name_or_id, hydrated=False, _=Security(oauth2_password_bearer_authentication))
Get a specific pipeline run using its ID.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
run_name_or_id |
Union[str, uuid.UUID] |
Name or ID of the pipeline run to get. |
required |
hydrated |
bool |
Defines if stack, user and pipeline will be included by reference (FALSE) or as model (TRUE) |
False |
Returns:
Type | Description |
---|---|
Union[zenml.zen_server.models.pipeline_models.HydratedPipelineRunModel, zenml.models.pipeline_models.PipelineRunModel] |
The pipeline run. |
Source code in zenml/zen_server/routers/runs_endpoints.py
@router.get(
    "/{run_name_or_id}",
    response_model=Union[HydratedPipelineRunModel, PipelineRunModel],  # type: ignore[arg-type]
    responses={401: error_response, 404: error_response, 422: error_response},
)
@handle_exceptions
def get_run(
    run_name_or_id: Union[str, UUID],
    hydrated: bool = False,
    _: AuthContext = Security(authorize, scopes=[PermissionType.READ]),
) -> Union[HydratedPipelineRunModel, PipelineRunModel]:
    """Fetches a single pipeline run by name or ID.

    Args:
        run_name_or_id: Name or ID of the pipeline run to get.
        hydrated: When True, the run is converted to its hydrated model;
            when False, the plain model is returned.

    Returns:
        The pipeline run, hydrated if requested.
    """
    requested_run = zen_store().get_run(run_name_or_id=run_name_or_id)

    if not hydrated:
        return requested_run
    return HydratedPipelineRunModel.from_model(requested_run)
get_run_component_side_effects(run_id, component_id=None, _=Security(oauth2_password_bearer_authentication))
Get the component side-effects for a given pipeline run.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
run_id |
UUID |
ID of the pipeline run to use to get the component side-effects. |
required |
component_id |
Optional[uuid.UUID] |
ID of the component to use to get the component side-effects. |
None |
Returns:
Type | Description |
---|---|
Dict[str, Any] |
The component side-effects for a given pipeline run. |
Source code in zenml/zen_server/routers/runs_endpoints.py
@router.get(
    "/{run_id}" + COMPONENT_SIDE_EFFECTS,
    response_model=Dict,
    responses={401: error_response, 404: error_response, 422: error_response},
)
@handle_exceptions
def get_run_component_side_effects(
    run_id: UUID,
    component_id: Optional[UUID] = None,
    _: AuthContext = Security(authorize, scopes=[PermissionType.READ]),
) -> Dict[str, Any]:
    """Returns the component side-effects recorded for a pipeline run.

    Args:
        run_id: ID of the pipeline run to get the component side-effects
            for.
        component_id: Optional ID of a component, forwarded to the store
            together with the run ID.

    Returns:
        The component side-effects for the given pipeline run.
    """
    store = zen_store()
    side_effects = store.get_run_component_side_effects(
        run_id=run_id,
        component_id=component_id,
    )
    return side_effects
get_run_dag(run_id, _=Security(oauth2_password_bearer_authentication))
Get the DAG for a given pipeline run.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
run_id |
UUID |
ID of the pipeline run to use to get the DAG. |
required |
Returns:
Type | Description |
---|---|
LineageGraph |
The DAG for a given pipeline run. |
Source code in zenml/zen_server/routers/runs_endpoints.py
@router.get(
    "/{run_id}" + GRAPH,
    response_model=LineageGraph,
    responses={401: error_response, 404: error_response, 422: error_response},
)
@handle_exceptions
def get_run_dag(
    run_id: UUID,
    _: AuthContext = Security(authorize, scopes=[PermissionType.READ]),
) -> LineageGraph:
    """Builds the lineage graph (DAG) of a pipeline run.

    Args:
        run_id: ID of the pipeline run for which to build the DAG.

    Returns:
        The DAG for the given pipeline run.
    """
    # Imported at call time, as in the original listing.
    from zenml.post_execution.pipeline_run import PipelineRunView

    run_model = zen_store().get_run(run_name_or_id=run_id)

    # Start from an empty graph and populate it from a view of the run.
    lineage = LineageGraph()
    run_view = PipelineRunView(run_model)
    lineage.generate_run_nodes_and_edges(run_view)
    return lineage
get_run_status(run_id, _=Security(oauth2_password_bearer_authentication))
Get the status of a specific pipeline run.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
run_id |
UUID |
ID of the pipeline run for which to get the status. |
required |
Returns:
Type | Description |
---|---|
ExecutionStatus |
The status of the pipeline run. |
Source code in zenml/zen_server/routers/runs_endpoints.py
@router.get(
    "/{run_id}" + STATUS,
    response_model=ExecutionStatus,
    responses={401: error_response, 404: error_response, 422: error_response},
)
@handle_exceptions
def get_run_status(
    run_id: UUID,
    _: AuthContext = Security(authorize, scopes=[PermissionType.READ]),
) -> ExecutionStatus:
    """Returns the execution status of a pipeline run.

    Args:
        run_id: ID of the pipeline run for which to get the status.

    Returns:
        The `status` attribute of the pipeline run.
    """
    requested_run = zen_store().get_run(run_id)
    return requested_run.status
get_run_steps(run_id, _=Security(oauth2_password_bearer_authentication))
Get all steps for a given pipeline run.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
run_id |
UUID |
ID of the pipeline run for which to get the steps. |
required |
Returns:
Type | Description |
---|---|
List[zenml.models.pipeline_models.StepRunModel] |
The steps for a given pipeline run. |
Source code in zenml/zen_server/routers/runs_endpoints.py
@router.get(
    "/{run_id}" + STEPS,
    response_model=List[StepRunModel],
    responses={401: error_response, 404: error_response, 422: error_response},
)
@handle_exceptions
def get_run_steps(
    run_id: UUID,
    _: AuthContext = Security(authorize, scopes=[PermissionType.READ]),
) -> List[StepRunModel]:
    """List the steps that belong to a pipeline run.

    Args:
        run_id: ID of the pipeline run for which to get the steps.
        _: Security dependency on `authorize` with the READ scope; unused
            in the body.

    Returns:
        The step runs the store reports for the given pipeline run.
    """
    step_runs = zen_store().list_run_steps(run_id)
    return step_runs
list_runs(project_name_or_id=None, stack_id=None, run_name=None, user_name_or_id=None, component_id=None, pipeline_id=None, unlisted=False, hydrated=False, _=Security(oauth2_password_bearer_authentication))
Get pipeline runs according to query filters.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
project_name_or_id |
Union[str, uuid.UUID] |
Name or ID of the project for which to filter runs. |
None |
stack_id |
Optional[uuid.UUID] |
ID of the stack for which to filter runs. |
None |
run_name |
Optional[str] |
Filter by run name if provided |
None |
user_name_or_id |
Union[str, uuid.UUID] |
If provided, only return runs for this user. |
None |
component_id |
Optional[uuid.UUID] |
Filter by ID of a component that was used in the run. |
None |
pipeline_id |
Optional[uuid.UUID] |
ID of the pipeline for which to filter runs. |
None |
unlisted |
bool |
If True, only return unlisted runs that are not associated with any pipeline. |
False |
hydrated |
bool |
Defines if stack, user and pipeline will be included by reference (FALSE) or as model (TRUE) |
False |
Returns:
Type | Description |
---|---|
Union[List[zenml.zen_server.models.pipeline_models.HydratedPipelineRunModel], List[zenml.models.pipeline_models.PipelineRunModel]] |
The pipeline runs according to query filters. |
Source code in zenml/zen_server/routers/runs_endpoints.py
@router.get(
    "",
    response_model=Union[  # type: ignore[arg-type]
        List[HydratedPipelineRunModel], List[PipelineRunModel]
    ],
    responses={401: error_response, 404: error_response, 422: error_response},
)
@handle_exceptions
def list_runs(
    project_name_or_id: Optional[Union[str, UUID]] = None,
    stack_id: Optional[UUID] = None,
    run_name: Optional[str] = None,
    user_name_or_id: Optional[Union[str, UUID]] = None,
    component_id: Optional[UUID] = None,
    pipeline_id: Optional[UUID] = None,
    unlisted: bool = False,
    hydrated: bool = False,
    _: AuthContext = Security(authorize, scopes=[PermissionType.READ]),
) -> Union[List[HydratedPipelineRunModel], List[PipelineRunModel]]:
    """List pipeline runs that match the given query filters.

    Args:
        project_name_or_id: Name or ID of the project to filter runs by.
        stack_id: ID of the stack to filter runs by.
        run_name: Run name to filter by, if provided.
        user_name_or_id: If provided, only runs of this user are returned.
        component_id: ID of a component that was used in the run.
        pipeline_id: ID of the pipeline to filter runs by.
        unlisted: If True, only return unlisted runs that are not
            associated with any pipeline.
        hydrated: If True, every run is converted through
            `HydratedPipelineRunModel.from_model`; otherwise the store
            result is returned as is.
        _: Security dependency on `authorize` with the READ scope; unused
            in the body.

    Returns:
        The pipeline runs matching the filters, hydrated on request.
    """
    matching_runs = zen_store().list_runs(
        project_name_or_id=project_name_or_id,
        run_name=run_name,
        stack_id=stack_id,
        component_id=component_id,
        user_name_or_id=user_name_or_id,
        pipeline_id=pipeline_id,
        unlisted=unlisted,
    )

    # Plain models are handed back untouched unless hydration is requested.
    if not hydrated:
        return matching_runs

    return [
        HydratedPipelineRunModel.from_model(pipeline_run)
        for pipeline_run in matching_runs
    ]
update_run(run_id, run_model, _=Security(oauth2_password_bearer_authentication))
Updates a run.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
run_id |
UUID |
ID of the run. |
required |
run_model |
PipelineRunModel |
Run model to use for the update. |
required |
Returns:
Type | Description |
---|---|
PipelineRunModel |
The updated run model. |
Source code in zenml/zen_server/routers/runs_endpoints.py
@router.put(
    "/{run_id}",
    response_model=PipelineRunModel,
    responses={401: error_response, 404: error_response, 422: error_response},
)
@handle_exceptions
def update_run(
    run_id: UUID,
    run_model: PipelineRunModel,
    _: AuthContext = Security(authorize, scopes=[PermissionType.WRITE]),
) -> PipelineRunModel:
    """Update a pipeline run.

    Args:
        run_id: ID of the run to update.
        run_model: Run model carrying the new values.
        _: Security dependency on `authorize` with the WRITE scope; unused
            in the body.

    Returns:
        The run model returned by the store after the update.
    """
    # The ID from the URL path overrides whatever ID the payload carries.
    run_model.id = run_id
    return zen_store().update_run(run=run_model)
server_endpoints
Endpoint definitions for server information (info and version).
server_info()
Get information about the server.
Returns:
Type | Description |
---|---|
ServerModel |
Information about the server. |
Source code in zenml/zen_server/routers/server_endpoints.py
@router.get(
    INFO,
    response_model=ServerModel,
    responses={401: error_response, 404: error_response, 422: error_response},
)
@handle_exceptions
def server_info() -> ServerModel:
    """Return information about the server.

    Returns:
        The store information reported by `zen_store().get_store_info()`.
    """
    store_info = zen_store().get_store_info()
    return store_info
version()
Get version of the server.
Returns:
Type | Description |
---|---|
str |
String representing the version of the server. |
Source code in zenml/zen_server/routers/server_endpoints.py
@router.get("/version")
def version() -> str:
    """Return the version of the server.

    Returns:
        The value of `zenml.__version__`.
    """
    server_version = zenml.__version__
    return server_version
stack_components_endpoints
Endpoint definitions for stack components.
deregister_stack_component(component_id, _=Security(oauth2_password_bearer_authentication))
Deletes a stack component.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
component_id |
UUID |
ID of the stack component. |
required |
Source code in zenml/zen_server/routers/stack_components_endpoints.py
@router.delete(
    "/{component_id}",
    responses={401: error_response, 404: error_response, 422: error_response},
)
@handle_exceptions
def deregister_stack_component(
    component_id: UUID,
    _: AuthContext = Security(authorize, scopes=[PermissionType.WRITE]),
) -> None:
    """Delete a stack component from the store.

    Args:
        component_id: ID of the stack component to delete.
        _: Security dependency on `authorize` with the WRITE scope; unused
            in the body.
    """
    store = zen_store()
    store.delete_stack_component(component_id)
get_stack_component(component_id, hydrated=False, _=Security(oauth2_password_bearer_authentication))
Returns the requested stack component.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
component_id |
UUID |
ID of the stack component. |
required |
hydrated |
bool |
Defines if stack components, users and projects will be included by reference (FALSE) or as model (TRUE) |
False |
Returns:
Type | Description |
---|---|
Union[zenml.models.component_model.ComponentModel, zenml.models.component_model.HydratedComponentModel] |
The requested stack component. |
Source code in zenml/zen_server/routers/stack_components_endpoints.py
@router.get(
    "/{component_id}",
    response_model=Union[ComponentModel, HydratedComponentModel],  # type: ignore[arg-type]
    responses={401: error_response, 404: error_response, 422: error_response},
)
@handle_exceptions
def get_stack_component(
    component_id: UUID,
    hydrated: bool = False,
    _: AuthContext = Security(authorize, scopes=[PermissionType.READ]),
) -> Union[ComponentModel, HydratedComponentModel]:
    """Fetch a single stack component.

    Args:
        component_id: ID of the stack component.
        hydrated: If True, the component is converted with
            `to_hydrated_model()`; otherwise it is returned as fetched.
        _: Security dependency on `authorize` with the READ scope; unused
            in the body.

    Returns:
        The requested stack component, hydrated on request.
    """
    stack_component = zen_store().get_stack_component(component_id)
    return stack_component.to_hydrated_model() if hydrated else stack_component
get_stack_component_types(_=Security(oauth2_password_bearer_authentication))
Get a list of all stack component types.
Returns:
Type | Description |
---|---|
List[str] |
List of stack components. |
Source code in zenml/zen_server/routers/stack_components_endpoints.py
@types_router.get(
    "",
    response_model=List[str],
    responses={401: error_response, 404: error_response, 422: error_response},
)
@handle_exceptions
def get_stack_component_types(
    _: AuthContext = Security(authorize, scopes=[PermissionType.READ]),
) -> List[str]:
    """List all stack component types.

    Args:
        _: Security dependency on `authorize` with the READ scope; unused
            in the body.

    Returns:
        The result of `StackComponentType.values()`.
    """
    component_types = StackComponentType.values()
    return component_types
list_stack_components(project_name_or_id=None, user_name_or_id=None, type=None, name=None, flavor_name=None, is_shared=None, hydrated=False, _=Security(oauth2_password_bearer_authentication))
Get a list of all stack components for a specific type.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
project_name_or_id |
Union[str, uuid.UUID] |
Name or ID of the project |
None |
user_name_or_id |
Union[str, uuid.UUID] |
Optionally filter by name or ID of the user. |
None |
name |
Optional[str] |
Optionally filter by component name |
None |
type |
Optional[str] |
Optionally filter by component type |
None |
flavor_name |
Optional[str] |
Optionally filter by flavor |
None |
is_shared |
Optional[bool] |
Optionally filter by shared status of the component |
None |
hydrated |
bool |
Defines if users and projects will be included by reference (FALSE) or as model (TRUE) |
False |
Returns:
Type | Description |
---|---|
Union[List[zenml.models.component_model.ComponentModel], List[zenml.models.component_model.HydratedComponentModel]] |
List of stack components for a specific type. |
Source code in zenml/zen_server/routers/stack_components_endpoints.py
@router.get(
    "",
    response_model=Union[List[ComponentModel], List[HydratedComponentModel]],  # type: ignore[arg-type]
    responses={401: error_response, 404: error_response, 422: error_response},
)
@handle_exceptions
def list_stack_components(
    project_name_or_id: Optional[Union[str, UUID]] = None,
    user_name_or_id: Optional[Union[str, UUID]] = None,
    type: Optional[str] = None,
    name: Optional[str] = None,
    flavor_name: Optional[str] = None,
    is_shared: Optional[bool] = None,
    hydrated: bool = False,
    _: AuthContext = Security(authorize, scopes=[PermissionType.READ]),
) -> Union[List[ComponentModel], List[HydratedComponentModel]]:
    """List stack components that match the given filters.

    Args:
        project_name_or_id: Name or ID of the project.
        user_name_or_id: Optional filter by name or ID of the user.
        type: Optional filter by component type.
        name: Optional filter by component name.
        flavor_name: Optional filter by flavor.
        is_shared: Optional filter by shared status of the component.
        hydrated: If True, each component is converted with
            `to_hydrated_model()`; otherwise the store result is returned
            as is.
        _: Security dependency on `authorize` with the READ scope; unused
            in the body.

    Returns:
        The stack components matching the filters, hydrated on request.
    """
    matching_components = zen_store().list_stack_components(
        project_name_or_id=project_name_or_id,
        user_name_or_id=user_name_or_id,
        type=type,
        name=name,
        flavor_name=flavor_name,
        is_shared=is_shared,
    )

    # Plain models are handed back untouched unless hydration is requested.
    if not hydrated:
        return matching_components

    return [
        component.to_hydrated_model() for component in matching_components
    ]
update_stack_component(component_id, component_update, hydrated=False, _=Security(oauth2_password_bearer_authentication))
Updates a stack component.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
component_id |
UUID |
ID of the stack component. |
required |
component_update |
UpdateComponentModel |
Stack component to use to update. |
required |
hydrated |
bool |
Defines if stack components, users and projects will be included by reference (FALSE) or as model (TRUE) |
False |
Returns:
Type | Description |
---|---|
Union[zenml.models.component_model.ComponentModel, zenml.models.component_model.HydratedComponentModel] |
Updated stack component. |
Source code in zenml/zen_server/routers/stack_components_endpoints.py
@router.put(
    "/{component_id}",
    response_model=Union[ComponentModel, HydratedComponentModel],  # type: ignore[arg-type]
    responses={401: error_response, 404: error_response, 422: error_response},
)
@handle_exceptions
def update_stack_component(
    component_id: UUID,
    component_update: UpdateComponentModel,
    hydrated: bool = False,
    _: AuthContext = Security(authorize, scopes=[PermissionType.WRITE]),
) -> Union[ComponentModel, HydratedComponentModel]:
    """Update a stack component.

    Args:
        component_id: ID of the stack component to update.
        component_update: Update request applied to the stored component.
        hydrated: If True, the updated component is converted with
            `to_hydrated_model()`; otherwise it is returned as is.
        _: Security dependency on `authorize` with the WRITE scope; unused
            in the body.

    Returns:
        The updated stack component, hydrated on request.
    """
    # The update is applied on top of the component currently in the store.
    existing_component = zen_store().get_stack_component(component_id)
    saved_component = zen_store().update_stack_component(
        component=component_update.apply_to_model(existing_component)
    )
    return saved_component.to_hydrated_model() if hydrated else saved_component
stacks_endpoints
Endpoint definitions for stacks.
delete_stack(stack_id, _=Security(oauth2_password_bearer_authentication))
Deletes a stack.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
stack_id |
UUID |
ID of the stack. |
required |
Source code in zenml/zen_server/routers/stacks_endpoints.py
@router.delete(
    "/{stack_id}",
    responses={401: error_response, 404: error_response, 422: error_response},
)
@handle_exceptions
def delete_stack(
    stack_id: UUID,
    _: AuthContext = Security(authorize, scopes=[PermissionType.WRITE]),
) -> None:
    """Delete a stack from the store.

    Args:
        stack_id: ID of the stack to delete.
        _: Security dependency on `authorize` with the WRITE scope; unused
            in the body.
    """
    store = zen_store()
    store.delete_stack(stack_id)  # aka 'deregister_stack'
get_stack(stack_id, hydrated=False, _=Security(oauth2_password_bearer_authentication))
Returns the requested stack.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
stack_id |
UUID |
ID of the stack. |
required |
hydrated |
bool |
Defines if stack components, users and projects will be included by reference (FALSE) or as model (TRUE) |
False |
Returns:
Type | Description |
---|---|
Union[zenml.models.stack_models.HydratedStackModel, zenml.models.stack_models.StackModel] |
The requested stack. |
Source code in zenml/zen_server/routers/stacks_endpoints.py
@router.get(
    "/{stack_id}",
    response_model=Union[HydratedStackModel, StackModel],  # type: ignore[arg-type]
    responses={401: error_response, 404: error_response, 422: error_response},
)
@handle_exceptions
def get_stack(
    stack_id: UUID,
    hydrated: bool = False,
    _: AuthContext = Security(authorize, scopes=[PermissionType.READ]),
) -> Union[HydratedStackModel, StackModel]:
    """Fetch a single stack.

    Args:
        stack_id: ID of the stack.
        hydrated: If True, the stack is converted with
            `to_hydrated_model()`; otherwise it is returned as fetched.
        _: Security dependency on `authorize` with the READ scope; unused
            in the body.

    Returns:
        The requested stack, hydrated on request.
    """
    requested_stack = zen_store().get_stack(stack_id)
    return requested_stack.to_hydrated_model() if hydrated else requested_stack
list_stacks(project_name_or_id=None, user_name_or_id=None, component_id=None, name=None, is_shared=None, hydrated=False, _=Security(oauth2_password_bearer_authentication))
Returns all stacks.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
project_name_or_id |
Union[str, uuid.UUID] |
Name or ID of the project |
None |
user_name_or_id |
Union[str, uuid.UUID] |
Optionally filter by name or ID of the user. |
None |
component_id |
Optional[uuid.UUID] |
Optionally filter by component that is part of the stack. |
None |
name |
Optional[str] |
Optionally filter by stack name |
None |
is_shared |
Optional[bool] |
Optionally filter by shared status of the stack |
None |
hydrated |
bool |
Defines if stack components, users and projects will be included by reference (FALSE) or as model (TRUE) |
False |
Returns:
Type | Description |
---|---|
Union[List[zenml.models.stack_models.HydratedStackModel], List[zenml.models.stack_models.StackModel]] |
All stacks. |
Source code in zenml/zen_server/routers/stacks_endpoints.py
@router.get(
    "",
    response_model=Union[List[HydratedStackModel], List[StackModel]],  # type: ignore[arg-type]
    responses={401: error_response, 404: error_response, 422: error_response},
)
@handle_exceptions
def list_stacks(
    project_name_or_id: Optional[Union[str, UUID]] = None,
    user_name_or_id: Optional[Union[str, UUID]] = None,
    component_id: Optional[UUID] = None,
    name: Optional[str] = None,
    is_shared: Optional[bool] = None,
    hydrated: bool = False,
    _: AuthContext = Security(authorize, scopes=[PermissionType.READ]),
) -> Union[List[HydratedStackModel], List[StackModel]]:
    """List stacks that match the given filters.

    Args:
        project_name_or_id: Name or ID of the project.
        user_name_or_id: Optional filter by name or ID of the user.
        component_id: Optional filter by a component that is part of the
            stack.
        name: Optional filter by stack name.
        is_shared: Optional filter by shared status of the stack.
        hydrated: Forwarded to the store's `list_stacks`; selects whether
            stack components, users and projects are included by
            reference (False) or as models (True).
        _: Security dependency on `authorize` with the READ scope; unused
            in the body.

    Returns:
        The stacks returned by the store for these filters.
    """
    # Unlike the component listing, hydration is delegated to the store.
    matching_stacks = zen_store().list_stacks(
        project_name_or_id=project_name_or_id,
        user_name_or_id=user_name_or_id,
        component_id=component_id,
        is_shared=is_shared,
        name=name,
        hydrated=hydrated,
    )
    return matching_stacks
update_stack(stack_id, stack_update, hydrated=False, _=Security(oauth2_password_bearer_authentication))
Updates a stack.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
stack_id |
UUID |
ID of the stack. |
required |
stack_update |
UpdateStackRequest |
Stack to use for the update. |
required |
hydrated |
bool |
Defines if stack components, users and projects will be included by reference (FALSE) or as model (TRUE) |
False |
Returns:
Type | Description |
---|---|
Union[zenml.models.stack_models.HydratedStackModel, zenml.models.stack_models.StackModel] |
The updated stack. |
Source code in zenml/zen_server/routers/stacks_endpoints.py
@router.put(
    "/{stack_id}",
    response_model=Union[HydratedStackModel, StackModel],  # type: ignore[arg-type]
    responses={401: error_response, 404: error_response, 422: error_response},
)
@handle_exceptions
def update_stack(
    stack_id: UUID,
    stack_update: UpdateStackRequest,
    hydrated: bool = False,
    _: AuthContext = Security(authorize, scopes=[PermissionType.WRITE]),
) -> Union[HydratedStackModel, StackModel]:
    """Update a stack.

    Args:
        stack_id: ID of the stack to update.
        stack_update: Update request applied to the stored stack.
        hydrated: If True, the updated stack is converted with
            `to_hydrated_model()`; otherwise it is returned as is.
        _: Security dependency on `authorize` with the WRITE scope; unused
            in the body.

    Returns:
        The updated stack, hydrated on request.
    """
    # The update is applied on top of the stack currently in the store.
    existing_stack = zen_store().get_stack(stack_id)
    saved_stack = zen_store().update_stack(
        stack=stack_update.apply_to_model(existing_stack)
    )
    return saved_stack.to_hydrated_model() if hydrated else saved_stack
steps_endpoints
Endpoint definitions for steps (and artifacts) of pipeline runs.
create_run_step(step, _=Security(oauth2_password_bearer_authentication))
Create a run step.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
step |
StepRunModel |
The run step to create. |
required |
Returns:
Type | Description |
---|---|
StepRunModel |
The created run step. |
Source code in zenml/zen_server/routers/steps_endpoints.py
@router.post(
    "",
    response_model=StepRunModel,
    responses={401: error_response, 409: error_response, 422: error_response},
)
@handle_exceptions
def create_run_step(
    step: StepRunModel,
    _: AuthContext = Security(authorize, scopes=[PermissionType.WRITE]),
) -> StepRunModel:
    """Create a run step in the store.

    Args:
        step: The run step to create.
        _: Security dependency on `authorize` with the WRITE scope; unused
            in the body.

    Returns:
        The run step returned by the store after creation.
    """
    created_step = zen_store().create_run_step(step=step)
    return created_step
get_step(step_id, _=Security(oauth2_password_bearer_authentication))
Get one specific step.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
step_id |
UUID |
ID of the step to get. |
required |
Returns:
Type | Description |
---|---|
StepRunModel |
The step. |
Source code in zenml/zen_server/routers/steps_endpoints.py
@router.get(
    "/{step_id}",
    response_model=StepRunModel,
    responses={401: error_response, 404: error_response, 422: error_response},
)
@handle_exceptions
def get_step(
    step_id: UUID,
    _: AuthContext = Security(authorize, scopes=[PermissionType.READ]),
) -> StepRunModel:
    """Fetch a single run step.

    Args:
        step_id: ID of the step to fetch.
        _: Security dependency on `authorize` with the READ scope; unused
            in the body.

    Returns:
        The requested step run.
    """
    step_run = zen_store().get_run_step(step_id)
    return step_run
get_step_configuration(step_id, _=Security(oauth2_password_bearer_authentication))
Get the configuration of a specific step.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
step_id |
UUID |
ID of the step to get. |
required |
Returns:
Type | Description |
---|---|
Dict[str, Any] |
The step configuration. |
Source code in zenml/zen_server/routers/steps_endpoints.py
@router.get(
    "/{step_id}" + STEP_CONFIGURATION,
    response_model=Dict[str, Any],
    responses={401: error_response, 404: error_response, 422: error_response},
)
@handle_exceptions
def get_step_configuration(
    step_id: UUID,
    _: AuthContext = Security(authorize, scopes=[PermissionType.READ]),
) -> Dict[str, Any]:
    """Return the configuration of a run step.

    Args:
        step_id: ID of the step whose configuration is requested.
        _: Security dependency on `authorize` with the READ scope; unused
            in the body.

    Returns:
        The `step_configuration` attribute of the fetched step run.
    """
    step_run = zen_store().get_run_step(step_id)
    return step_run.step_configuration
get_step_inputs(step_id, _=Security(oauth2_password_bearer_authentication))
Get the inputs of a specific step.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
step_id |
UUID |
ID of the step for which to get the inputs. |
required |
Returns:
Type | Description |
---|---|
Dict[str, zenml.models.pipeline_models.ArtifactModel] |
All inputs of the step, mapping from input name to artifact model. |
Source code in zenml/zen_server/routers/steps_endpoints.py
@router.get(
    "/{step_id}" + INPUTS,
    response_model=Dict[str, ArtifactModel],
    responses={401: error_response, 404: error_response, 422: error_response},
)
@handle_exceptions
def get_step_inputs(
    step_id: UUID,
    _: AuthContext = Security(authorize, scopes=[PermissionType.READ]),
) -> Dict[str, ArtifactModel]:
    """Return the input artifacts of a run step.

    Args:
        step_id: ID of the step whose inputs are requested.
        _: Security dependency on `authorize` with the READ scope; unused
            in the body.

    Returns:
        All inputs of the step, mapping from input name to artifact model.
    """
    step_inputs = zen_store().get_run_step_inputs(step_id)
    return step_inputs
get_step_outputs(step_id, _=Security(oauth2_password_bearer_authentication))
Get the outputs of a specific step.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
step_id |
UUID |
ID of the step for which to get the outputs. |
required |
Returns:
Type | Description |
---|---|
Dict[str, zenml.models.pipeline_models.ArtifactModel] |
All outputs of the step, mapping from output name to artifact model. |
Source code in zenml/zen_server/routers/steps_endpoints.py
@router.get(
    "/{step_id}" + OUTPUTS,
    response_model=Dict[str, ArtifactModel],
    responses={401: error_response, 404: error_response, 422: error_response},
)
@handle_exceptions
def get_step_outputs(
    step_id: UUID,
    _: AuthContext = Security(authorize, scopes=[PermissionType.READ]),
) -> Dict[str, ArtifactModel]:
    """Return the output artifacts of a run step.

    Args:
        step_id: ID of the step whose outputs are requested.
        _: Security dependency on `authorize` with the READ scope; unused
            in the body.

    Returns:
        The artifacts listed with this step as parent, keyed by the
        artifact's `name`.
    """
    outputs_by_name: Dict[str, ArtifactModel] = {}
    # A later artifact with the same name replaces an earlier one.
    for output_artifact in zen_store().list_artifacts(parent_step_id=step_id):
        outputs_by_name[output_artifact.name] = output_artifact
    return outputs_by_name
get_step_status(step_id, _=Security(oauth2_password_bearer_authentication))
Get the status of a specific step.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
step_id |
UUID |
ID of the step for which to get the status. |
required |
Returns:
Type | Description |
---|---|
ExecutionStatus |
The status of the step. |
Source code in zenml/zen_server/routers/steps_endpoints.py
@router.get(
    "/{step_id}" + STATUS,
    response_model=ExecutionStatus,
    responses={401: error_response, 404: error_response, 422: error_response},
)
@handle_exceptions
def get_step_status(
    step_id: UUID,
    _: AuthContext = Security(authorize, scopes=[PermissionType.READ]),
) -> ExecutionStatus:
    """Return the execution status of a run step.

    Args:
        step_id: ID of the step whose status is requested.
        _: Security dependency on `authorize` with the READ scope; unused
            in the body.

    Returns:
        The `status` attribute of the fetched step run.
    """
    step_run = zen_store().get_run_step(step_id)
    return step_run.status
list_run_steps(run_id=None, _=Security(oauth2_password_bearer_authentication))
Get run steps according to query filters.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
run_id |
Optional[uuid.UUID] |
The ID of the pipeline run by which to filter. |
None |
Returns:
Type | Description |
---|---|
List[zenml.models.pipeline_models.StepRunModel] |
The run steps according to query filters. |
Source code in zenml/zen_server/routers/steps_endpoints.py
@router.get(
    "",
    response_model=List[StepRunModel],
    responses={401: error_response, 404: error_response, 422: error_response},
)
@handle_exceptions
def list_run_steps(
    run_id: Optional[UUID] = None,
    _: AuthContext = Security(authorize, scopes=[PermissionType.READ]),
) -> List[StepRunModel]:
    """List run steps according to the query filters.

    Args:
        run_id: ID of the pipeline run by which to filter, if provided.
        _: Security dependency on `authorize` with the READ scope; unused
            in the body.

    Returns:
        The run steps the store returns for the given filter.
    """
    step_runs = zen_store().list_run_steps(run_id=run_id)
    return step_runs
update_step(step_id, step_model, _=Security(oauth2_password_bearer_authentication))
Updates a step.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
step_id |
UUID |
ID of the step. |
required |
step_model |
StepRunModel |
Step model to use for the update. |
required |
Returns:
Type | Description |
---|---|
StepRunModel |
The updated step model. |
Source code in zenml/zen_server/routers/steps_endpoints.py
@router.put(
    "/{step_id}",
    response_model=StepRunModel,
    responses={401: error_response, 404: error_response, 422: error_response},
)
@handle_exceptions
def update_step(
    step_id: UUID,
    step_model: StepRunModel,
    _: AuthContext = Security(authorize, scopes=[PermissionType.WRITE]),
) -> StepRunModel:
    """Update a run step.

    Args:
        step_id: ID of the step to update.
        step_model: Step model carrying the new values.
        _: Security dependency on `authorize` with the WRITE scope; unused
            in the body.

    Returns:
        The step model returned by the store after the update.
    """
    # The ID from the URL path overrides whatever ID the payload carries.
    step_model.id = step_id
    return zen_store().update_run_step(step=step_model)
teams_endpoints
Endpoint definitions for teams and team membership.
create_team(team, _=Security(oauth2_password_bearer_authentication))
Creates a team.
noqa: DAR401
Parameters:
Name | Type | Description | Default |
---|---|---|---|
team |
CreateTeamRequest |
Team to create. |
required |
Returns:
Type | Description |
---|---|
TeamModel |
The created team. |
Source code in zenml/zen_server/routers/teams_endpoints.py
@router.post(
    "",
    response_model=TeamModel,
    responses={401: error_response, 409: error_response, 422: error_response},
)
@handle_exceptions
def create_team(
    team: CreateTeamRequest,
    _: AuthContext = Security(authorize, scopes=[PermissionType.WRITE]),
) -> TeamModel:
    """Create a team in the store.

    # noqa: DAR401

    Args:
        team: Creation request; converted with `to_model()` before it is
            handed to the store.
        _: Security dependency on `authorize` with the WRITE scope; unused
            in the body.

    Returns:
        The team returned by the store after creation.
    """
    created_team = zen_store().create_team(team=team.to_model())
    return created_team
delete_team(team_name_or_id, _=Security(oauth2_password_bearer_authentication))
Deletes a specific team.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
team_name_or_id |
Union[str, uuid.UUID] |
Name or ID of the team. |
required |
Source code in zenml/zen_server/routers/teams_endpoints.py
@router.delete(
    "/{team_name_or_id}",
    responses={401: error_response, 404: error_response, 422: error_response},
)
@handle_exceptions
def delete_team(
    team_name_or_id: Union[str, UUID],
    _: AuthContext = Security(authorize, scopes=[PermissionType.WRITE]),
) -> None:
    """Delete a team from the store.

    Args:
        team_name_or_id: Name or ID of the team to delete.
        _: Security dependency on `authorize` with the WRITE scope; unused
            in the body.
    """
    store = zen_store()
    store.delete_team(team_name_or_id=team_name_or_id)
get_role_assignments_for_team(team_name_or_id, project_name_or_id=None, _=Security(oauth2_password_bearer_authentication))
Returns a list of all roles that are assigned to a team.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
team_name_or_id |
Union[str, uuid.UUID] |
Name or ID of the team. |
required |
project_name_or_id |
Union[str, uuid.UUID] |
If provided, only list roles that are limited to the given project. |
None |
Returns:
Type | Description |
---|---|
List[zenml.models.user_management_models.RoleAssignmentModel] |
A list of all roles that are assigned to a team. |
Source code in zenml/zen_server/routers/teams_endpoints.py
@router.get(
    "/{team_name_or_id}" + ROLES,
    response_model=List[RoleAssignmentModel],
    responses={401: error_response, 404: error_response, 422: error_response},
)
@handle_exceptions
def get_role_assignments_for_team(
    team_name_or_id: Union[str, UUID],
    project_name_or_id: Optional[Union[str, UUID]] = None,
    _: AuthContext = Security(authorize, scopes=[PermissionType.READ]),
) -> List[RoleAssignmentModel]:
    """List the role assignments of a team.

    Args:
        team_name_or_id: Name or ID of the team.
        project_name_or_id: If provided, only list roles that are limited
            to the given project.
        _: Security dependency on `authorize` with the READ scope; unused
            in the body.

    Returns:
        The role assignments the store returns for the team.
    """
    team_role_assignments = zen_store().list_role_assignments(
        team_name_or_id=team_name_or_id,
        project_name_or_id=project_name_or_id,
    )
    return team_role_assignments
get_team(team_name_or_id, _=Security(oauth2_password_bearer_authentication))
Returns a specific team.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
team_name_or_id |
Union[str, uuid.UUID] |
Name or ID of the team. |
required |
Returns:
Type | Description |
---|---|
TeamModel |
A specific team. |
Source code in zenml/zen_server/routers/teams_endpoints.py
@router.get(
    "/{team_name_or_id}",
    response_model=TeamModel,
    responses={401: error_response, 404: error_response, 422: error_response},
)
@handle_exceptions
def get_team(
    team_name_or_id: Union[str, UUID],
    _: AuthContext = Security(authorize, scopes=[PermissionType.READ]),
) -> TeamModel:
    """Fetch a single team.

    Args:
        team_name_or_id: Name or ID of the team.
        _: Security dependency on `authorize` with the READ scope; unused
            in the body.

    Returns:
        The requested team.
    """
    requested_team = zen_store().get_team(team_name_or_id=team_name_or_id)
    return requested_team
list_teams(_=Security(oauth2_password_bearer_authentication))
Returns a list of all teams.
Returns:
Type | Description |
---|---|
List[zenml.models.user_management_models.TeamModel] |
List of all teams. |
Source code in zenml/zen_server/routers/teams_endpoints.py
@router.get(
    "",
    response_model=List[TeamModel],
    responses={401: error_response, 404: error_response, 422: error_response},
)
@handle_exceptions
def list_teams(
    _: AuthContext = Security(authorize, scopes=[PermissionType.READ]),
) -> List[TeamModel]:
    """List all teams.

    Args:
        _: Security dependency on `authorize` with the READ scope; unused
            in the body.

    Returns:
        The teams returned by the store.
    """
    all_teams = zen_store().list_teams()
    return all_teams
update_team(team_name_or_id, team_update, _=Security(oauth2_password_bearer_authentication))
Updates a team.
noqa: DAR401
Parameters:
Name | Type | Description | Default |
---|---|---|---|
team_name_or_id |
Union[str, uuid.UUID] |
Name or ID of the team. |
required |
team_update |
UpdateTeamRequest |
Team update. |
required |
Returns:
Type | Description |
---|---|
TeamModel |
The updated team. |
Source code in zenml/zen_server/routers/teams_endpoints.py
@router.put(
    "/{team_name_or_id}",
    response_model=TeamModel,
    responses={401: error_response, 409: error_response, 422: error_response},
)
@handle_exceptions
def update_team(
    team_name_or_id: Union[str, UUID],
    team_update: UpdateTeamRequest,
    _: AuthContext = Security(authorize, scopes=[PermissionType.WRITE]),
) -> TeamModel:
    """Update a team.

    # noqa: DAR401

    Args:
        team_name_or_id: Name or ID of the team to update.
        team_update: Update request applied to the stored team.
        _: Security dependency on `authorize` with the WRITE scope; unused
            in the body.

    Returns:
        The updated team.
    """
    # The update is applied on top of the team currently in the store.
    existing_team = zen_store().get_team(team_name_or_id)
    updated_team = zen_store().update_team(
        team=team_update.apply_to_model(existing_team)
    )
    return updated_team
users_endpoints
Endpoint definitions for users.
activate_user(user_name_or_id, user)
Activates a specific user.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
user_name_or_id |
Union[str, uuid.UUID] |
Name or ID of the user. |
required |
user |
ActivateUserRequest |
The user to use for the update. |
required |
Returns:
Type | Description |
---|---|
UserModel |
The updated user. |
Exceptions:
Type | Description |
---|---|
HTTPException |
If the user is not authorized to activate the user. |
Source code in zenml/zen_server/routers/users_endpoints.py
@activation_router.put(
    "/{user_name_or_id}" + ACTIVATE,
    response_model=UserModel,
    responses={401: error_response, 404: error_response, 422: error_response},
)
@handle_exceptions
def activate_user(
    user_name_or_id: Union[str, UUID], user: ActivateUserRequest
) -> UserModel:
    """Activates a specific user.

    The request is authenticated with the activation token carried in the
    request body rather than with a regular login.

    Args:
        user_name_or_id: Name or ID of the user.
        user: The user to use for the update.

    Returns:
        The updated user.

    Raises:
        HTTPException: If the user is not authorized to activate the user.
    """
    auth_context = authenticate_credentials(
        user_name_or_id=user_name_or_id,
        activation_token=user.activation_token,
    )
    if auth_context is None:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid authentication credentials",
        )

    activated_user = user.apply_to_model(auth_context.user)
    # Mark the account as active and clear the activation token on the model
    # before it is persisted.
    activated_user.active = True
    activated_user.activation_token = None
    return zen_store().update_user(activated_user)
assign_role(user_name_or_id, role_name_or_id, project_name_or_id=None, _=Security(oauth2_password_bearer_authentication))
Assign a role to a user for all resources within a given project or globally.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
role_name_or_id |
Union[str, uuid.UUID] |
The name or ID of the role to assign to the user. |
required |
user_name_or_id |
Union[str, uuid.UUID] |
Name or ID of the user to which to assign the role. |
required |
project_name_or_id |
Union[str, uuid.UUID] |
Name or ID of the project in which to assign the role to the user. If this is not provided, the role will be assigned globally. |
None |
Source code in zenml/zen_server/routers/users_endpoints.py
@router.post(
    "/{user_name_or_id}" + ROLES,
    responses={401: error_response, 409: error_response, 422: error_response},
)
@handle_exceptions
def assign_role(
    user_name_or_id: Union[str, UUID],
    role_name_or_id: Union[str, UUID],
    project_name_or_id: Optional[Union[str, UUID]] = None,
    _: AuthContext = Security(authorize, scopes=[PermissionType.WRITE]),
) -> None:
    """Assigns a role to a user, either within one project or globally.

    Args:
        role_name_or_id: The name or ID of the role to assign to the user.
        user_name_or_id: Name or ID of the user to which to assign the role.
        project_name_or_id: Name or ID of the project in which to assign the
            role to the user. If this is not provided, the role will be
            assigned globally.
    """
    store = zen_store()
    # `is_user=True` tells the store that the assignee is a user, not a team.
    store.assign_role(
        role_name_or_id=role_name_or_id,
        user_or_team_name_or_id=user_name_or_id,
        is_user=True,
        project_name_or_id=project_name_or_id,
    )
create_user(user, _=Security(oauth2_password_bearer_authentication))
Creates a user.
noqa: DAR401
Parameters:
Name | Type | Description | Default |
---|---|---|---|
user |
CreateUserRequest |
User to create. |
required |
Returns:
Type | Description |
---|---|
CreateUserResponse |
The created user. |
Source code in zenml/zen_server/routers/users_endpoints.py
@router.post(
    "",
    response_model=CreateUserResponse,
    responses={401: error_response, 409: error_response, 422: error_response},
)
@handle_exceptions
def create_user(
    user: CreateUserRequest,
    _: AuthContext = Security(authorize, scopes=[PermissionType.WRITE]),
) -> CreateUserResponse:
    """Creates a user.

    A user created with a password is active immediately. A user created
    without a password is inactive and receives an activation token that is
    returned to the client.

    # noqa: DAR401

    Args:
        user: User to create.

    Returns:
        The created user.
    """
    user_model = user.to_model()

    # Two ways of creating a new user:
    # 1. With a password: the account is immediately active.
    # 2. Without a password: the account is activated at a later time with
    #    an activation token.
    needs_activation = user.password is None
    user_model.active = not needs_activation
    token: Optional[SecretStr] = (
        user_model.generate_activation_token() if needs_activation else None
    )

    store = zen_store()
    new_user = store.create_user(user_model)

    # Every newly created user is assigned the store's admin role.
    store.assign_role(
        role_name_or_id=store._admin_role.id,
        user_or_team_name_or_id=new_user.id,
        is_user=True,
    )

    # Add back the original unhashed activation token, if generated, to
    # send it back to the client.
    new_user.activation_token = token
    return CreateUserResponse.from_model(new_user)
deactivate_user(user_name_or_id, _=Security(oauth2_password_bearer_authentication))
Deactivates a user and generates a new activation token for it.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
user_name_or_id |
Union[str, uuid.UUID] |
Name or ID of the user. |
required |
Returns:
Type | Description |
---|---|
DeactivateUserResponse |
The generated activation token. |
Source code in zenml/zen_server/routers/users_endpoints.py
@router.put(
    "/{user_name_or_id}" + DEACTIVATE,
    response_model=DeactivateUserResponse,
    responses={401: error_response, 404: error_response, 422: error_response},
)
@handle_exceptions
def deactivate_user(
    user_name_or_id: Union[str, UUID],
    _: AuthContext = Security(authorize, scopes=[PermissionType.WRITE]),
) -> DeactivateUserResponse:
    """Deactivates a user and issues a fresh activation token for it.

    Args:
        user_name_or_id: Name or ID of the user.

    Returns:
        The generated activation token.
    """
    store = zen_store()
    target_user = store.get_user(user_name_or_id)
    target_user.active = False
    plain_token = target_user.generate_activation_token()
    updated_user = store.update_user(user=target_user)

    # Add back the original unhashed activation token so that the client
    # receives it in the response.
    updated_user.activation_token = plain_token
    return DeactivateUserResponse.from_model(updated_user)
delete_user(user_name_or_id, auth_context=Security(oauth2_password_bearer_authentication))
Deletes a specific user.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
user_name_or_id |
Union[str, uuid.UUID] |
Name or ID of the user. |
required |
auth_context |
AuthContext |
The authentication context. |
Security(oauth2_password_bearer_authentication) |
Exceptions:
Type | Description |
---|---|
IllegalOperationError |
If the user is not authorized to delete the user. |
Source code in zenml/zen_server/routers/users_endpoints.py
@router.delete(
    "/{user_name_or_id}",
    responses={401: error_response, 404: error_response, 422: error_response},
)
@handle_exceptions
def delete_user(
    user_name_or_id: Union[str, UUID],
    auth_context: AuthContext = Security(
        authorize, scopes=[PermissionType.WRITE]
    ),
) -> None:
    """Deletes a specific user.

    A user may not delete the account that is being used to authenticate the
    request.

    Args:
        user_name_or_id: Name or ID of the user.
        auth_context: The authentication context.

    Raises:
        IllegalOperationError: If the user is not authorized to delete the user.
    """
    store = zen_store()
    target_user = store.get_user(user_name_or_id)

    # Self-deletion is detected by comparing user names.
    deleting_self = auth_context.user.name == target_user.name
    if deleting_self:
        raise IllegalOperationError(
            "You cannot delete the user account currently used to authenticate "
            "to the ZenML server. If you wish to delete this account, "
            "please authenticate with another account or contact your ZenML "
            "administrator."
        )

    store.delete_user(user_name_or_id=user_name_or_id)
email_opt_in_response(user_name_or_id, user_response, auth_context=Security(oauth2_password_bearer_authentication))
Sets the response of the user to the email prompt.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
user_name_or_id |
Union[str, uuid.UUID] |
Name or ID of the user. |
required |
user_response |
EmailOptInModel |
User Response to email prompt |
required |
auth_context |
AuthContext |
The authentication context of the user |
Security(oauth2_password_bearer_authentication) |
Returns:
Type | Description |
---|---|
UserModel |
The updated user. |
Exceptions:
Type | Description |
---|---|
NotAuthorizedError |
if the user does not have the required permissions |
Source code in zenml/zen_server/routers/users_endpoints.py
@router.put(
    "/{user_name_or_id}" + EMAIL_ANALYTICS,
    response_model=UserModel,
    responses={401: error_response, 404: error_response, 422: error_response},
)
@handle_exceptions
def email_opt_in_response(
    user_name_or_id: Union[str, UUID],
    user_response: EmailOptInModel,
    auth_context: AuthContext = Security(authorize, scopes=[PermissionType.ME]),
) -> UserModel:
    """Sets the response of the user to the email prompt.

    Users may only answer the prompt for their own account. The account in
    the URL may be addressed by either its ID or its name; both are matched
    against the authenticated user.

    Args:
        user_name_or_id: Name or ID of the user.
        user_response: User Response to email prompt
        auth_context: The authentication context of the user

    Returns:
        The updated user.

    Raises:
        NotAuthorizedError: if the user does not have the required
            permissions
    """
    current_user = auth_context.user
    requested = str(user_name_or_id)

    # The path parameter is a name *or* an ID, so accept either form of the
    # authenticated user's identity instead of the ID only.
    if requested not in (str(current_user.id), current_user.name):
        raise NotAuthorizedError(
            "Users can not opt in on behalf of another " "user."
        )

    # Always address the store by the authenticated user's own ID so that the
    # update can never land on a different account than the one that was
    # authorized above.
    return zen_store().user_email_opt_in(
        user_name_or_id=current_user.id,
        email=user_response.email,
        user_opt_in_response=user_response.email_opted_in,
    )
get_current_user(auth_context=Security(oauth2_password_bearer_authentication))
Returns the model of the authenticated user.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
auth_context |
AuthContext |
The authentication context. |
Security(oauth2_password_bearer_authentication) |
Returns:
Type | Description |
---|---|
UserModel |
The model of the authenticated user. |
Source code in zenml/zen_server/routers/users_endpoints.py
@current_user_router.get(
    "/current-user",
    response_model=UserModel,
    responses={401: error_response, 404: error_response, 422: error_response},
)
@handle_exceptions
def get_current_user(
    auth_context: AuthContext = Security(
        authorize, scopes=[PermissionType.READ]
    ),
) -> UserModel:
    """Returns the user that authenticated the current request.

    Args:
        auth_context: The authentication context.

    Returns:
        The model of the authenticated user.
    """
    current_user = auth_context.user
    return current_user
get_role_assignments_for_user(user_name_or_id, project_name_or_id=None, role_name_or_id=None, _=Security(oauth2_password_bearer_authentication))
Returns a list of all roles that are assigned to a user.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
user_name_or_id |
Union[str, uuid.UUID] |
Name or ID of the user. |
required |
project_name_or_id |
Union[str, uuid.UUID] |
If provided, only list roles that are limited to the given project. |
None |
role_name_or_id |
Union[str, uuid.UUID] |
If provided, only list assignments of the given role |
None |
Returns:
Type | Description |
---|---|
List[zenml.models.user_management_models.RoleAssignmentModel] |
A list of all roles that are assigned to a user. |
Source code in zenml/zen_server/routers/users_endpoints.py
@router.get(
    "/{user_name_or_id}" + ROLES,
    response_model=List[RoleAssignmentModel],
    responses={401: error_response, 404: error_response, 422: error_response},
)
@handle_exceptions
def get_role_assignments_for_user(
    user_name_or_id: Union[str, UUID],
    project_name_or_id: Optional[Union[str, UUID]] = None,
    role_name_or_id: Optional[Union[str, UUID]] = None,
    _: AuthContext = Security(authorize, scopes=[PermissionType.READ]),
) -> List[RoleAssignmentModel]:
    """Lists the role assignments of a user.

    Args:
        user_name_or_id: Name or ID of the user.
        project_name_or_id: If provided, only list roles that are limited to
            the given project.
        role_name_or_id: If provided, only list assignments of the given
            role

    Returns:
        A list of all roles that are assigned to a user.
    """
    store = zen_store()
    return store.list_role_assignments(
        user_name_or_id=user_name_or_id,
        project_name_or_id=project_name_or_id,
        role_name_or_id=role_name_or_id,
    )
get_user(user_name_or_id, _=Security(oauth2_password_bearer_authentication))
Returns a specific user.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
user_name_or_id |
Union[str, uuid.UUID] |
Name or ID of the user. |
required |
Returns:
Type | Description |
---|---|
UserModel |
A specific user. |
Source code in zenml/zen_server/routers/users_endpoints.py
@router.get(
    "/{user_name_or_id}",
    response_model=UserModel,
    responses={401: error_response, 404: error_response, 422: error_response},
)
@handle_exceptions
def get_user(
    user_name_or_id: Union[str, UUID],
    _: AuthContext = Security(authorize, scopes=[PermissionType.READ]),
) -> UserModel:
    """Fetches a single user by name or ID.

    Args:
        user_name_or_id: Name or ID of the user.

    Returns:
        A specific user.
    """
    store = zen_store()
    return store.get_user(user_name_or_id=user_name_or_id)
list_users(_=Security(oauth2_password_bearer_authentication))
Returns a list of all users.
Returns:
Type | Description |
---|---|
List[zenml.models.user_management_models.UserModel] |
A list of all users. |
Source code in zenml/zen_server/routers/users_endpoints.py
@router.get(
    "",
    response_model=List[UserModel],
    responses={401: error_response, 404: error_response, 422: error_response},
)
@handle_exceptions
def list_users(
    _: AuthContext = Security(authorize, scopes=[PermissionType.READ])
) -> List[UserModel]:
    """Lists every user registered in the store.

    The caller must be authorized with the READ permission scope.

    Returns:
        A list of all users.
    """
    store = zen_store()
    return store.list_users()
unassign_role(user_name_or_id, role_name_or_id, project_name_or_id=None, _=Security(oauth2_password_bearer_authentication))
Remove a user's role within a project or globally.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
user_name_or_id |
Union[str, uuid.UUID] |
Name or ID of the user. |
required |
role_name_or_id |
Union[str, uuid.UUID] |
Name or ID of the role. |
required |
project_name_or_id |
Union[str, uuid.UUID] |
Name or ID of the project. If this is not provided, the role will be revoked globally. |
None |
Source code in zenml/zen_server/routers/users_endpoints.py
@router.delete(
    "/{user_name_or_id}" + ROLES + "/{role_name_or_id}",
    responses={401: error_response, 404: error_response, 422: error_response},
)
@handle_exceptions
def unassign_role(
    user_name_or_id: Union[str, UUID],
    role_name_or_id: Union[str, UUID],
    project_name_or_id: Optional[Union[str, UUID]] = None,
    _: AuthContext = Security(authorize, scopes=[PermissionType.WRITE]),
) -> None:
    """Removes a user's role within a project or globally.

    Args:
        user_name_or_id: Name or ID of the user.
        role_name_or_id: Name or ID of the role.
        project_name_or_id: Name or ID of the project. If this is not
            provided, the role will be revoked globally.
    """
    store = zen_store()
    # `is_user=True` tells the store that the assignee is a user, not a team.
    store.revoke_role(
        role_name_or_id=role_name_or_id,
        user_or_team_name_or_id=user_name_or_id,
        is_user=True,
        project_name_or_id=project_name_or_id,
    )
update_myself(user, auth_context=Security(oauth2_password_bearer_authentication))
Updates the currently authenticated user.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
user |
UpdateUserRequest |
The user to use for the update. |
required |
auth_context |
AuthContext |
The authentication context. |
Security(oauth2_password_bearer_authentication) |
Returns:
Type | Description |
---|---|
UserModel |
The updated user. |
Source code in zenml/zen_server/routers/users_endpoints.py
@current_user_router.put(
    "/current-user",
    response_model=UserModel,
    responses={401: error_response, 404: error_response, 422: error_response},
)
@handle_exceptions
def update_myself(
    user: UpdateUserRequest,
    auth_context: AuthContext = Security(authorize, scopes=[PermissionType.ME]),
) -> UserModel:
    """Updates the currently authenticated user.

    The update is applied to the user taken from the authentication context,
    so a caller can only modify their own account through this endpoint.

    Args:
        user: The user to use for the update.
        auth_context: The authentication context.

    Returns:
        The updated user.
    """
    updated_model = user.apply_to_model(auth_context.user)
    store = zen_store()
    return store.update_user(updated_model)
update_user(user_name_or_id, user, _=Security(oauth2_password_bearer_authentication))
Updates a specific user.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
user_name_or_id |
Union[str, uuid.UUID] |
Name or ID of the user. |
required |
user |
UpdateUserRequest |
The user to use for the update. |
required |
Returns:
Type | Description |
---|---|
UserModel |
The updated user. |
Source code in zenml/zen_server/routers/users_endpoints.py
@router.put(
    "/{user_name_or_id}",
    response_model=UserModel,
    responses={401: error_response, 404: error_response, 422: error_response},
)
@handle_exceptions
def update_user(
    user_name_or_id: Union[str, UUID],
    user: UpdateUserRequest,
    _: AuthContext = Security(authorize, scopes=[PermissionType.WRITE]),
) -> UserModel:
    """Updates a specific user.

    The stored user is fetched first, the requested changes are applied on
    top of it, and the result is written back to the store.

    Args:
        user_name_or_id: Name or ID of the user.
        user: The user to use for the update.

    Returns:
        The updated user.
    """
    store = zen_store()
    stored_user = store.get_user(user_name_or_id)
    updated_model = user.apply_to_model(stored_user)
    return store.update_user(updated_model)
utils
Util functions for the ZenML Server.
ErrorModel (BaseModel)
pydantic-model
Base class for error responses.
Source code in zenml/zen_server/utils.py
class ErrorModel(BaseModel):
    """Base class for error responses."""

    # Error payload; typed as `Any`, so no particular structure is enforced
    # by this model.
    detail: Any
conflict(error)
Convert an Exception to a HTTP 409 response.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
error |
Exception |
Exception to convert. |
required |
Returns:
Type | Description |
---|---|
HTTPException |
HTTPException with status code 409. |
Source code in zenml/zen_server/utils.py
def conflict(error: Exception) -> HTTPException:
    """Builds an HTTP 409 (Conflict) exception from an arbitrary exception.

    The exception is returned, not raised; the caller decides when to raise.

    Args:
        error: Exception to convert.

    Returns:
        HTTPException with status code 409.
    """
    detail = error_detail(error)
    return HTTPException(status_code=409, detail=detail)
error_detail(error)
Convert an Exception to API representation.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
error |
Exception |
Exception to convert. |
required |
Returns:
Type | Description |
---|---|
List[str] |
List of strings representing the error. |
Source code in zenml/zen_server/utils.py
def error_detail(error: Exception) -> List[str]:
    """Converts an exception into its API representation.

    The first element is the exception's class name; it is followed by the
    string form of each of the exception's positional arguments, in order.

    Args:
        error: Exception to convert.

    Returns:
        List of strings representing the error.
    """
    return [type(error).__name__, *map(str, error.args)]
forbidden(error)
Convert an Exception to a HTTP 403 response.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
error |
Exception |
Exception to convert. |
required |
Returns:
Type | Description |
---|---|
HTTPException |
HTTPException with status code 403. |
Source code in zenml/zen_server/utils.py
def forbidden(error: Exception) -> HTTPException:
    """Builds an HTTP 403 (Forbidden) exception from an arbitrary exception.

    The exception is returned, not raised; the caller decides when to raise.

    Args:
        error: Exception to convert.

    Returns:
        HTTPException with status code 403.
    """
    detail = error_detail(error)
    return HTTPException(status_code=403, detail=detail)
handle_exceptions(func)
Decorator to handle exceptions in the API.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
func |
~F |
Function to decorate. |
required |
Returns:
Type | Description |
---|---|
~F |
Decorated function. |
Source code in zenml/zen_server/utils.py
def handle_exceptions(func: F) -> F:
    """Decorator that translates exceptions raised by API handlers.

    Each handled exception is logged and re-raised as the HTTPException
    built by the matching helper:

    * NotAuthorizedError -> `not_authorized`
    * KeyError -> `not_found`
    * StackExistsError, StackComponentExistsError, EntityExistsError ->
      `conflict`
    * IllegalOperationError -> `forbidden`
    * ValueError -> `unprocessable`

    Any other exception propagates unchanged.

    Args:
        func: Function to decorate.

    Returns:
        Decorated function.
    """

    @wraps(func)
    def wrapper(*args: Any, **kwargs: Any) -> Any:
        # The order of the except clauses is significant: the first matching
        # clause wins, so more specific mappings are listed before ValueError.
        try:
            result = func(*args, **kwargs)
        except NotAuthorizedError as exc:
            logger.exception("Authorization error")
            raise not_authorized(exc) from exc
        except KeyError as exc:
            logger.exception("Entity not found")
            raise not_found(exc) from exc
        except (
            StackExistsError,
            StackComponentExistsError,
            EntityExistsError,
        ) as exc:
            logger.exception("Entity already exists")
            raise conflict(exc) from exc
        except IllegalOperationError as exc:
            logger.exception("Illegal operation")
            raise forbidden(exc) from exc
        except ValueError as exc:
            logger.exception("Validation error")
            raise unprocessable(exc) from exc
        return result

    return cast(F, wrapper)
initialize_zen_store()
Initialize the ZenML Store.
Exceptions:
Type | Description |
---|---|
ValueError |
If the ZenML Store is using a REST back-end. |
Source code in zenml/zen_server/utils.py
def initialize_zen_store() -> None:
    """Initialize the ZenML Store used by the server.

    The store is taken from the global configuration. It is validated before
    it is published, so a rejected store is never left installed in the
    module-level `_zen_store` and is never mutated.

    Raises:
        ValueError: If the ZenML Store is using a REST back-end.
    """
    global _zen_store

    logger.debug("Initializing ZenML Store for FastAPI...")
    store = GlobalConfiguration().zen_store

    # Validate first: previously the REST store was assigned to the global
    # (and had its analytics flag changed) before this check raised, which
    # left an unusable store reachable through `zen_store()`.
    if store.type == StoreType.REST:
        raise ValueError(
            "Server cannot be started with a REST store type. Make sure you "
            "configure ZenML to use a non-networked store backend "
            "when trying to start the ZenML Server."
        )

    # We override track_analytics=False because we do not
    # want to track anything server side.
    store.track_analytics = False
    _zen_store = store
not_authorized(error)
Convert an Exception to a HTTP 401 response.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
error |
Exception |
Exception to convert. |
required |
Returns:
Type | Description |
---|---|
HTTPException |
HTTPException with status code 401. |
Source code in zenml/zen_server/utils.py
def not_authorized(error: Exception) -> HTTPException:
    """Builds an HTTP 401 (Unauthorized) exception from an arbitrary exception.

    The exception is returned, not raised; the caller decides when to raise.

    Args:
        error: Exception to convert.

    Returns:
        HTTPException with status code 401.
    """
    detail = error_detail(error)
    return HTTPException(status_code=401, detail=detail)
not_found(error)
Convert an Exception to a HTTP 404 response.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
error |
Exception |
Exception to convert. |
required |
Returns:
Type | Description |
---|---|
HTTPException |
HTTPException with status code 404. |
Source code in zenml/zen_server/utils.py
def not_found(error: Exception) -> HTTPException:
    """Builds an HTTP 404 (Not Found) exception from an arbitrary exception.

    The exception is returned, not raised; the caller decides when to raise.

    Args:
        error: Exception to convert.

    Returns:
        HTTPException with status code 404.
    """
    detail = error_detail(error)
    return HTTPException(status_code=404, detail=detail)
unprocessable(error)
Convert an Exception to an HTTP 422 response.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
error |
Exception |
Exception to convert. |
required |
Returns:
Type | Description |
---|---|
HTTPException |
HTTPException with status code 422. |
Source code in zenml/zen_server/utils.py
def unprocessable(error: Exception) -> HTTPException:
    """Builds an HTTP 422 (Unprocessable Entity) exception from an exception.

    The exception is returned, not raised; the caller decides when to raise.

    Args:
        error: Exception to convert.

    Returns:
        HTTPException with status code 422.
    """
    detail = error_detail(error)
    return HTTPException(status_code=422, detail=detail)
zen_store()
Return the initialized ZenML Store.
Returns:
Type | Description |
---|---|
BaseZenStore |
The ZenML Store. |
Exceptions:
Type | Description |
---|---|
RuntimeError |
If the ZenML Store has not been initialized. |
Source code in zenml/zen_server/utils.py
def zen_store() -> BaseZenStore:
    """Returns the ZenML Store held in the module-level `_zen_store`.

    This function does not initialize anything itself; it only hands out
    the store that was previously set.

    Returns:
        The ZenML Store.

    Raises:
        RuntimeError: If the ZenML Store has not been initialized.
    """
    # Read-only access to the module-level variable.
    store = _zen_store
    if store is None:
        raise RuntimeError("ZenML Store not initialized")
    return store
zen_server_api
Zen Server API.
catch_all(request, file_path)
Dashboard endpoint.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
request |
Request |
Request object. |
required |
file_path |
str |
Path to a file in the dashboard root folder. |
required |
Returns:
Type | Description |
---|---|
Any |
The ZenML dashboard. |
Exceptions:
Type | Description |
---|---|
HTTPException |
404 error if requested a non-existent static file or if the dashboard files are not included. |
Source code in zenml/zen_server/zen_server_api.py
@app.get("/{file_path:path}", include_in_schema=False)
def catch_all(request: Request, file_path: str) -> Any:
    """Dashboard endpoint for every path not matched by another route.

    Known root-level static files are served directly. A single-segment path
    without query parameters that is not such a file yields a 404. Everything
    else is answered with the dashboard's `index.html`.

    Args:
        request: Request object.
        file_path: Path to a file in the dashboard root folder.

    Returns:
        The ZenML dashboard.

    Raises:
        HTTPException: 404 error if requested a non-existent static file or if
            the dashboard files are not included.
    """
    dashboard_root = relative_path(DASHBOARD_DIRECTORY)

    # some static files need to be served directly from the root dashboard
    # directory
    if file_path and file_path in root_static_files:
        logger.debug(f"Returning static file: {file_path}")
        return FileResponse(os.path.join(dashboard_root, file_path))

    # A path without any "/" consists of exactly one segment.
    is_single_segment = "/" not in file_path
    if is_single_segment and not request.query_params:
        logger.debug(f"Requested non-existent static file: {file_path}")
        raise HTTPException(status_code=404)

    index_file = os.path.join(dashboard_root, "index.html")
    if not os.path.isfile(index_file):
        raise HTTPException(status_code=404)

    # everything else is directed to the index.html file that hosts the
    # single-page application
    return templates.TemplateResponse("index.html", {"request": request})
dashboard(request)
Dashboard endpoint.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
request |
Request |
Request object. |
required |
Returns:
Type | Description |
---|---|
Any |
The ZenML dashboard. |
Exceptions:
Type | Description |
---|---|
HTTPException |
If the dashboard files are not included. |
Source code in zenml/zen_server/zen_server_api.py
@app.get("/", include_in_schema=False)
def dashboard(request: Request) -> Any:
    """Dashboard endpoint for the root URL.

    Args:
        request: Request object.

    Returns:
        The ZenML dashboard.

    Raises:
        HTTPException: If the dashboard files are not included.
    """
    index_file = os.path.join(relative_path(DASHBOARD_DIRECTORY), "index.html")
    if os.path.isfile(index_file):
        return templates.TemplateResponse("index.html", {"request": request})

    # No index.html in the dashboard directory: respond with 404.
    raise HTTPException(status_code=404)
get_root_static_files()
Get the list of static files in the root dashboard directory.
These files are static files that are not in the /static subdirectory that need to be served as static files under the root URL path.
Returns:
Type | Description |
---|---|
List[str] |
List of static files in the root directory. |
Source code in zenml/zen_server/zen_server_api.py
def get_root_static_files() -> List[str]:
    """Collects the static files located directly in the dashboard root.

    These files are static files that are not in the /static subdirectory
    that need to be served as static files under the root URL path.
    `index.html` is excluded because it is served separately, and only
    regular files are included.

    Returns:
        List of static files in the root directory.
    """
    root_path = relative_path(DASHBOARD_DIRECTORY)
    if not os.path.isdir(root_path):
        return []

    return [
        entry
        for entry in os.listdir(root_path)
        if entry != "index.html" and isfile(os.path.join(root_path, entry))
    ]
health()
Get health status of the server.
Returns:
Type | Description |
---|---|
str |
String representing the health status of the server. |
Source code in zenml/zen_server/zen_server_api.py
@app.head(HEALTH, include_in_schema=False)
@app.get(HEALTH)
def health() -> str:
    """Get health status of the server.

    Registered for both HEAD and GET requests on the `HEALTH` path; the HEAD
    route is excluded from the generated schema.

    Returns:
        String representing the health status of the server (always "OK").
    """
    return "OK"
initialize()
Initialize the ZenML server.
Source code in zenml/zen_server/zen_server_api.py
@app.on_event("startup")
def initialize() -> None:
    """Initialize the ZenML server.

    Runs as a startup event handler: the store is initialized first, then
    the pipeline run sync function is invoked.
    """
    # IMPORTANT: this needs to be done before the fastapi app starts, to avoid
    # race conditions
    initialize_zen_store()
    # NOTE(review): `sync_pipeline_runs_on_schedule` is itself registered as a
    # startup handler and is wrapped by `repeat_every`; confirm that this
    # direct call is needed and that its result does not have to be awaited.
    sync_pipeline_runs_on_schedule()
invalid_api(invalid_api_path)
Invalid API endpoint.
All API endpoints that are not defined in the API routers will be redirected to this endpoint and will return a 404 error.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
invalid_api_path |
str |
Invalid API path. |
required |
Exceptions:
Type | Description |
---|---|
HTTPException |
404 error. |
Source code in zenml/zen_server/zen_server_api.py
@app.get(
    API + "/{invalid_api_path:path}", status_code=404, include_in_schema=False
)
def invalid_api(invalid_api_path: str) -> None:
    """Invalid API endpoint.

    All API endpoints that are not defined in the API routers will be
    redirected to this endpoint and will return a 404 error.

    Args:
        invalid_api_path: Invalid API path.

    Raises:
        HTTPException: 404 error.
    """
    # Log the offending path at debug level, then always fail with 404.
    logger.debug(f"Invalid API path requested: {invalid_api_path}")
    raise HTTPException(status_code=404)
relative_path(rel)
Get the absolute path of a path relative to the ZenML server module.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
rel |
str |
Relative path. |
required |
Returns:
Type | Description |
---|---|
str |
Absolute path. |
Source code in zenml/zen_server/zen_server_api.py
def relative_path(rel: str) -> str:
    """Resolves a path relative to the directory containing this module.

    Args:
        rel: Relative path.

    Returns:
        `rel` joined onto the directory of this module's file.
    """
    module_dir = os.path.dirname(__file__)
    return os.path.join(module_dir, rel)
sync_pipeline_runs_on_schedule()
Sync pipeline runs.
Source code in zenml/zen_server/zen_server_api.py
@app.on_event("startup")
@repeat_every(
    seconds=float(os.getenv(ENV_ZENML_SERVER_METADATA_SYNC_PERIOD, 30)),
    wait_first=True,
)
def sync_pipeline_runs_on_schedule() -> None:
    """Sync pipeline runs.

    Registered as a startup handler and wrapped by `repeat_every`. The period
    in seconds is read from the `ENV_ZENML_SERVER_METADATA_SYNC_PERIOD`
    environment variable and defaults to 30. Failures are logged and do not
    propagate to the caller.
    """
    logger.info("Syncing pipeline runs in server schedule...")
    try:
        store = zen_store()
        store._sync_runs()
    except Exception:
        # Deliberately broad: a failed sync is logged rather than raised.
        logger.exception("Failed to sync pipeline runs.")