Zen Service
zenml.zen_service
special
Zen Service
The Zen Service is a simple webserver to let you collaborate on stacks via
the network. It can be spun up in a background daemon from the command line
using zenml service up
and managed from the same command line group.
Using the Zen Service's stacks in your project just requires setting up a
profile with rest
store-type pointed to the url of the service.
zen_service
ZenService (LocalDaemonService)
pydantic-model
ZenService daemon that can be used to start a local ZenService server.
Attributes:
Name | Type | Description |
---|---|---|
config |
ZenServiceConfig |
service configuration |
endpoint |
ZenServiceEndpoint |
optional service endpoint |
Source code in zenml/zen_service/zen_service.py
class ZenService(LocalDaemonService):
"""ZenService daemon that can be used to start a local ZenService server.
Attributes:
config: service configuration
endpoint: optional service endpoint
"""
SERVICE_TYPE = ServiceType(
name="zen_service",
type="zenml",
flavor="zenml",
description="ZenService to manage stacks, users and pipelines",
)
config: ZenServiceConfig
endpoint: ZenServiceEndpoint
def __init__(
self,
config: Union[ZenServiceConfig, Dict[str, Any]],
**attrs: Any,
) -> None:
# ensure that the endpoint is created before the service is initialized
if isinstance(config, ZenServiceConfig) and "endpoint" not in attrs:
endpoint_uri_path = ZEN_SERVICE_URL_PATH
healthcheck_uri_path = ZEN_SERVICE_HEALTHCHECK_URL_PATH
use_head_request = True
endpoint = ZenServiceEndpoint(
config=ZenServiceEndpointConfig(
protocol=ServiceEndpointProtocol.HTTP,
port=config.port,
zen_service_uri_path=endpoint_uri_path,
),
monitor=HTTPEndpointHealthMonitor(
config=HTTPEndpointHealthMonitorConfig(
healthcheck_uri_path=healthcheck_uri_path,
use_head_request=use_head_request,
)
),
)
attrs["endpoint"] = endpoint
super().__init__(config=config, **attrs)
def run(self) -> None:
if self.config.store_profile_configuration.store_type == StoreType.REST:
raise ValueError(
"Service cannot be started with REST store type. Make sure you "
"specify a profile with a non-networked persistence backend "
"when trying to start the Zen Service. (use command line flag "
"`--profile=$PROFILE_NAME` or set the env variable "
f"{ENV_ZENML_PROFILE_NAME} to specify the use of a profile "
"other than the currently active one)"
)
logger.info(
"Starting ZenService as blocking "
"process... press CTRL+C once to stop it."
)
self.endpoint.prepare_for_start()
# this is the only way to pass information into the FastAPI app??
os.environ[
"ZENML_PROFILE_CONFIGURATION"
] = self.config.store_profile_configuration.json()
try:
uvicorn.run(
ZEN_SERVICE_ENTRYPOINT,
host=ZEN_SERVICE_IP,
port=self.endpoint.status.port,
log_level="info",
)
except KeyboardInterrupt:
logger.info("Zen service stopped. Resuming normal execution.")
@property
def zen_service_uri(self) -> Optional[str]:
"""Get the URI where the service is running.
Returns:
The URI where the service can be contacted for requests,
or None, if the service isn't running.
"""
if not self.is_running:
return None
return self.endpoint.endpoint_uri
zen_service_uri: Optional[str]
property
readonly
Get the URI where the service is running.
Returns:
Type | Description |
---|---|
Optional[str] |
The URI where the service can be contacted for requests, or None, if the service isn't running. |
run(self)
Run the service daemon process associated with this service.
Subclasses must implement this method to provide the service daemon
functionality. This method will be executed in the context of the
running daemon, not in the context of the process that calls the
start
method.
Source code in zenml/zen_service/zen_service.py
def run(self) -> None:
if self.config.store_profile_configuration.store_type == StoreType.REST:
raise ValueError(
"Service cannot be started with REST store type. Make sure you "
"specify a profile with a non-networked persistence backend "
"when trying to start the Zen Service. (use command line flag "
"`--profile=$PROFILE_NAME` or set the env variable "
f"{ENV_ZENML_PROFILE_NAME} to specify the use of a profile "
"other than the currently active one)"
)
logger.info(
"Starting ZenService as blocking "
"process... press CTRL+C once to stop it."
)
self.endpoint.prepare_for_start()
# this is the only way to pass information into the FastAPI app??
os.environ[
"ZENML_PROFILE_CONFIGURATION"
] = self.config.store_profile_configuration.json()
try:
uvicorn.run(
ZEN_SERVICE_ENTRYPOINT,
host=ZEN_SERVICE_IP,
port=self.endpoint.status.port,
log_level="info",
)
except KeyboardInterrupt:
logger.info("Zen service stopped. Resuming normal execution.")
ZenServiceConfig (LocalDaemonServiceConfig)
pydantic-model
Zen Service deployment configuration.
Attributes:
Name | Type | Description |
---|---|---|
port |
int |
Port at which the service is running |
store_profile_configuration |
ProfileConfiguration |
ProfileConfiguration describing where the service should persist its data. |
Source code in zenml/zen_service/zen_service.py
class ZenServiceConfig(LocalDaemonServiceConfig):
"""Zen Service deployment configuration.
Attributes:
port: Port at which the service is running
store_profile_configuration: ProfileConfiguration describing where
the service should persist its data.
"""
port: int = 8000
store_profile_configuration: ProfileConfiguration = Field(
default_factory=lambda: Repository().active_profile
)
ZenServiceEndpoint (LocalDaemonServiceEndpoint)
pydantic-model
A service endpoint exposed by the Zen service daemon.
Attributes:
Name | Type | Description |
---|---|---|
config |
ZenServiceEndpointConfig |
service endpoint configuration |
monitor |
HTTPEndpointHealthMonitor |
optional service endpoint health monitor |
Source code in zenml/zen_service/zen_service.py
class ZenServiceEndpoint(LocalDaemonServiceEndpoint):
"""A service endpoint exposed by the Zen service daemon.
Attributes:
config: service endpoint configuration
monitor: optional service endpoint health monitor
"""
config: ZenServiceEndpointConfig
monitor: HTTPEndpointHealthMonitor
@property
def endpoint_uri(self) -> Optional[str]:
uri = self.status.uri
if not uri:
return None
return f"{uri}{self.config.zen_service_uri_path}"
ZenServiceEndpointConfig (LocalDaemonServiceEndpointConfig)
pydantic-model
Zen Service endpoint configuration.
Attributes:
Name | Type | Description |
---|---|---|
zen_service_uri_path |
str |
URI path for the zenml service |
Source code in zenml/zen_service/zen_service.py
class ZenServiceEndpointConfig(LocalDaemonServiceEndpointConfig):
"""Zen Service endpoint configuration.
Attributes:
zen_service_uri_path: URI path for the zenml service
"""
zen_service_uri_path: str
zen_service_api
add_user_to_team(name, user)
async
Adds a user to a team.
Source code in zenml/zen_service/zen_service_api.py
@authed.post(TEAMS + "/{name}/users", responses={404: error_response})
async def add_user_to_team(name: str, user: User) -> None:
"""Adds a user to a team."""
try:
zen_store.add_user_to_team(team_name=name, user_name=user.name)
except KeyError as error:
raise not_found(error) from error
assign_role(data)
async
Assigns a role.
Source code in zenml/zen_service/zen_service_api.py
@authed.post(
ROLE_ASSIGNMENTS,
responses={404: error_response},
)
async def assign_role(data: Dict[str, Any]) -> None:
"""Assigns a role."""
role_name = data["role_name"]
entity_name = data["entity_name"]
project_name = data.get("project_name")
is_user = data.get("is_user", True)
try:
zen_store.assign_role(
role_name=role_name,
entity_name=entity_name,
project_name=project_name,
is_user=is_user,
)
except KeyError as error:
raise not_found(error) from error
authorize(credentials=Depends(HTTPBasic))
Authorizes any request to the ZenService.
Right now this method only checks if the username provided as part of http basic auth credentials is registered in the ZenStore.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
credentials |
HTTPBasicCredentials |
HTTP basic auth credentials passed to the request. |
Depends(HTTPBasic) |
Source code in zenml/zen_service/zen_service_api.py
def authorize(credentials: HTTPBasicCredentials = Depends(security)) -> None:
"""Authorizes any request to the ZenService.
Right now this method only checks if the username provided as part of http
basic auth credentials is registered in the ZenStore.
Args:
credentials: HTTP basic auth credentials passed to the request.
"""
try:
zen_store.get_user(credentials.username)
except KeyError:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username.",
)
conflict(error)
Convert an Exception to a HTTP 409 response.
Source code in zenml/zen_service/zen_service_api.py
def conflict(error: Exception) -> HTTPException:
"""Convert an Exception to a HTTP 409 response."""
return HTTPException(status_code=409, detail=error_detail(error))
create_project(project)
async
Creates a project.
Source code in zenml/zen_service/zen_service_api.py
@authed.post(
PROJECTS,
response_model=Project,
responses={409: error_response},
)
async def create_project(project: Project) -> Project:
"""Creates a project."""
try:
return zen_store.create_project(
project_name=project.name, description=project.description
)
except EntityExistsError as error:
raise conflict(error) from error
create_role(role)
async
Creates a role.
Source code in zenml/zen_service/zen_service_api.py
@authed.post(
ROLES,
response_model=Role,
responses={409: error_response},
)
async def create_role(role: Role) -> Role:
"""Creates a role."""
try:
return zen_store.create_role(role.name)
except EntityExistsError as error:
raise conflict(error) from error
create_team(team)
async
Creates a team.
Source code in zenml/zen_service/zen_service_api.py
@authed.post(
TEAMS,
response_model=Team,
responses={409: error_response},
)
async def create_team(team: Team) -> Team:
"""Creates a team."""
try:
return zen_store.create_team(team.name)
except EntityExistsError as error:
raise conflict(error) from error
create_user(user)
async
Creates a user.
Source code in zenml/zen_service/zen_service_api.py
@authed.post(
USERS,
response_model=User,
responses={409: error_response},
)
async def create_user(user: User) -> User:
"""Creates a user."""
try:
return zen_store.create_user(user.name)
except EntityExistsError as error:
raise conflict(error) from error
delete_project(name)
async
Deletes a project.
Source code in zenml/zen_service/zen_service_api.py
@authed.delete(PROJECTS + "/{name}", responses={404: error_response})
async def delete_project(name: str) -> None:
"""Deletes a project."""
try:
zen_store.delete_project(project_name=name)
except KeyError as error:
raise not_found(error) from error
delete_role(name)
async
Deletes a role.
Source code in zenml/zen_service/zen_service_api.py
@authed.delete(ROLES + "/{name}", responses={404: error_response})
async def delete_role(name: str) -> None:
"""Deletes a role."""
try:
zen_store.delete_role(role_name=name)
except KeyError as error:
raise not_found(error) from error
delete_team(name)
async
Deletes a team.
Source code in zenml/zen_service/zen_service_api.py
@authed.delete(TEAMS + "/{name}", responses={404: error_response})
async def delete_team(name: str) -> None:
"""Deletes a team."""
try:
zen_store.delete_team(team_name=name)
except KeyError as error:
raise not_found(error) from error
delete_user(name)
async
Deletes a user.
Source code in zenml/zen_service/zen_service_api.py
@authed.delete(USERS + "/{name}", responses={404: error_response})
async def delete_user(name: str) -> None:
"""Deletes a user."""
try:
zen_store.delete_user(user_name=name)
except KeyError as error:
raise not_found(error) from error
deregister_stack(name)
async
Deregisters a stack.
Source code in zenml/zen_service/zen_service_api.py
@authed.delete(STACKS + "/{name}", responses={404: error_response})
async def deregister_stack(name: str) -> None:
"""Deregisters a stack."""
try:
zen_store.deregister_stack(name)
except KeyError as error:
raise not_found(error) from error
deregister_stack_component(component_type, name)
async
Deregisters a stack component.
Source code in zenml/zen_service/zen_service_api.py
@authed.delete(
STACK_COMPONENTS + "/{component_type}/{name}",
responses={404: error_response, 409: error_response},
)
async def deregister_stack_component(
component_type: StackComponentType, name: str
) -> None:
"""Deregisters a stack component."""
try:
return zen_store.deregister_stack_component(component_type, name=name)
except KeyError as error:
raise not_found(error) from error
except ValueError as error:
raise conflict(error) from error
error_detail(error)
Convert an Exception to API representation.
Source code in zenml/zen_service/zen_service_api.py
def error_detail(error: Exception) -> List[str]:
"""Convert an Exception to API representation."""
return [type(error).__name__] + [str(a) for a in error.args]
get_project(name)
async
Gets a specific project.
Source code in zenml/zen_service/zen_service_api.py
@authed.get(PROJECTS + "/{name}", responses={404: error_response})
async def get_project(name: str) -> Project:
"""Gets a specific project."""
try:
return zen_store.get_project(project_name=name)
except KeyError as error:
raise not_found(error) from error
get_role(name)
async
Gets a specific role.
Source code in zenml/zen_service/zen_service_api.py
@authed.get(ROLES + "/{name}", responses={404: error_response})
async def get_role(name: str) -> Role:
"""Gets a specific role."""
try:
return zen_store.get_role(role_name=name)
except KeyError as error:
raise not_found(error) from error
get_stack(name)
async
Returns the requested stack.
Source code in zenml/zen_service/zen_service_api.py
@authed.get(
STACKS + "/{name}",
response_model=StackWrapper,
responses={404: error_response},
)
async def get_stack(name: str) -> StackWrapper:
"""Returns the requested stack."""
try:
return zen_store.get_stack(name)
except KeyError as error:
raise not_found(error) from error
get_stack_component(component_type, name)
async
Returns the requested stack component.
Source code in zenml/zen_service/zen_service_api.py
@authed.get(
STACK_COMPONENTS + "/{component_type}/{name}",
response_model=StackComponentWrapper,
responses={404: error_response},
)
async def get_stack_component(
component_type: StackComponentType, name: str
) -> StackComponentWrapper:
"""Returns the requested stack component."""
try:
return zen_store.get_stack_component(component_type, name=name)
except KeyError as error:
raise not_found(error) from error
get_stack_components(component_type)
async
Returns all stack components for the requested type.
Source code in zenml/zen_service/zen_service_api.py
@authed.get(
STACK_COMPONENTS + "/{component_type}",
response_model=List[StackComponentWrapper],
)
async def get_stack_components(
component_type: StackComponentType,
) -> List[StackComponentWrapper]:
"""Returns all stack components for the requested type."""
return zen_store.get_stack_components(component_type)
get_stack_configuration(name)
async
Returns the configuration for the requested stack.
Source code in zenml/zen_service/zen_service_api.py
@authed.get(
STACK_CONFIGURATIONS + "/{name}",
response_model=Dict[StackComponentType, str],
responses={404: error_response},
)
async def get_stack_configuration(name: str) -> Dict[StackComponentType, str]:
"""Returns the configuration for the requested stack."""
try:
return zen_store.get_stack_configuration(name)
except KeyError as error:
raise not_found(error) from error
get_team(name)
async
Gets a specific team.
Source code in zenml/zen_service/zen_service_api.py
@authed.get(TEAMS + "/{name}", responses={404: error_response})
async def get_team(name: str) -> Team:
"""Gets a specific team."""
try:
return zen_store.get_team(team_name=name)
except KeyError as error:
raise not_found(error) from error
get_user(name)
async
Gets a specific user.
Source code in zenml/zen_service/zen_service_api.py
@authed.get(USERS + "/{name}", responses={404: error_response})
async def get_user(name: str) -> User:
"""Gets a specific user."""
try:
return zen_store.get_user(user_name=name)
except KeyError as error:
raise not_found(error) from error
is_empty()
async
Returns whether stacks are registered or not.
Source code in zenml/zen_service/zen_service_api.py
@authed.get(IS_EMPTY, response_model=bool)
async def is_empty() -> bool:
"""Returns whether stacks are registered or not."""
return zen_store.is_empty
not_found(error)
Convert an Exception to a HTTP 404 response.
Source code in zenml/zen_service/zen_service_api.py
def not_found(error: Exception) -> HTTPException:
"""Convert an Exception to a HTTP 404 response."""
return HTTPException(status_code=404, detail=error_detail(error))
projects()
async
Returns all projects.
Source code in zenml/zen_service/zen_service_api.py
@authed.get(PROJECTS, response_model=List[Project])
async def projects() -> List[Project]:
"""Returns all projects."""
return zen_store.projects
register_stack(stack)
async
Registers a stack.
Source code in zenml/zen_service/zen_service_api.py
@authed.post(
STACKS,
response_model=Dict[str, str],
responses={409: error_response},
)
async def register_stack(stack: StackWrapper) -> Dict[str, str]:
"""Registers a stack."""
try:
return zen_store.register_stack(stack)
except (StackExistsError, StackComponentExistsError) as error:
raise conflict(error) from error
register_stack_component(component)
async
Registers a stack component.
Source code in zenml/zen_service/zen_service_api.py
@authed.post(STACK_COMPONENTS, responses={409: error_response})
async def register_stack_component(
component: StackComponentWrapper,
) -> None:
"""Registers a stack component."""
try:
zen_store.register_stack_component(component)
except StackComponentExistsError as error:
raise conflict(error) from error
remove_user_from_team(team_name, user_name)
async
Removes a user from a team.
Source code in zenml/zen_service/zen_service_api.py
@authed.delete(
TEAMS + "/{team_name}/users/{user_name}", responses={404: error_response}
)
async def remove_user_from_team(team_name: str, user_name: str) -> None:
"""Removes a user from a team."""
try:
zen_store.remove_user_from_team(
team_name=team_name, user_name=user_name
)
except KeyError as error:
raise not_found(error) from error
revoke_role(data)
async
Revokes a role.
Source code in zenml/zen_service/zen_service_api.py
@authed.delete(ROLE_ASSIGNMENTS, responses={404: error_response})
async def revoke_role(data: Dict[str, Any]) -> None:
"""Revokes a role."""
role_name = data["role_name"]
entity_name = data["entity_name"]
project_name = data.get("project_name")
is_user = data.get("is_user", True)
try:
zen_store.revoke_role(
role_name=role_name,
entity_name=entity_name,
project_name=project_name,
is_user=is_user,
)
except KeyError as error:
raise not_found(error) from error
role_assignments()
async
Returns all role assignments.
Source code in zenml/zen_service/zen_service_api.py
@authed.get(ROLE_ASSIGNMENTS, response_model=List[RoleAssignment])
async def role_assignments() -> List[RoleAssignment]:
"""Returns all role assignments."""
return zen_store.role_assignments
role_assignments_for_team(name, project_name=None)
async
Gets all role assignments for a team.
Source code in zenml/zen_service/zen_service_api.py
@authed.get(
TEAMS + "/{name}/role_assignments",
response_model=List[RoleAssignment],
responses={404: error_response},
)
async def role_assignments_for_team(
name: str, project_name: Optional[str] = None
) -> List[RoleAssignment]:
"""Gets all role assignments for a team."""
try:
return zen_store.get_role_assignments_for_team(
team_name=name, project_name=project_name
)
except KeyError as error:
raise not_found(error) from error
role_assignments_for_user(name, project_name=None)
async
Returns all role assignments for a user.
Source code in zenml/zen_service/zen_service_api.py
@authed.get(
USERS + "/{name}/role_assignments",
response_model=List[RoleAssignment],
responses={404: error_response},
)
async def role_assignments_for_user(
name: str, project_name: Optional[str] = None
) -> List[RoleAssignment]:
"""Returns all role assignments for a user."""
try:
return zen_store.get_role_assignments_for_user(
user_name=name, project_name=project_name, include_team_roles=False
)
except KeyError as error:
raise not_found(error) from error
roles()
async
Returns all roles.
Source code in zenml/zen_service/zen_service_api.py
@authed.get(ROLES, response_model=List[Role])
async def roles() -> List[Role]:
"""Returns all roles."""
return zen_store.roles
service_info()
async
Returns the profile configuration for this service.
Source code in zenml/zen_service/zen_service_api.py
@authed.get("/", response_model=ProfileConfiguration)
async def service_info() -> ProfileConfiguration:
"""Returns the profile configuration for this service."""
return profile
stack_configurations()
async
Returns configurations for all stacks.
Source code in zenml/zen_service/zen_service_api.py
@authed.get(
STACK_CONFIGURATIONS,
response_model=Dict[str, Dict[StackComponentType, str]],
)
async def stack_configurations() -> Dict[str, Dict[StackComponentType, str]]:
"""Returns configurations for all stacks."""
return zen_store.stack_configurations
stacks()
async
Returns all stacks.
Source code in zenml/zen_service/zen_service_api.py
@authed.get(STACKS, response_model=List[StackWrapper])
async def stacks() -> List[StackWrapper]:
"""Returns all stacks."""
return zen_store.stacks
teams()
async
Returns all teams.
Source code in zenml/zen_service/zen_service_api.py
@authed.get(TEAMS, response_model=List[Team])
async def teams() -> List[Team]:
"""Returns all teams."""
return zen_store.teams
teams_for_user(name)
async
Returns all teams for a user.
Source code in zenml/zen_service/zen_service_api.py
@authed.get(
USERS + "/{name}/teams",
response_model=List[Team],
responses={404: error_response},
)
async def teams_for_user(name: str) -> List[Team]:
"""Returns all teams for a user."""
try:
return zen_store.get_teams_for_user(user_name=name)
except KeyError as error:
raise not_found(error) from error
update_stack(stack, name)
async
Updates a stack.
Source code in zenml/zen_service/zen_service_api.py
@authed.put(
STACKS + "/{name}",
response_model=Dict[str, str],
responses={404: error_response},
)
async def update_stack(stack: StackWrapper, name: str) -> Dict[str, str]:
"""Updates a stack."""
try:
return zen_store.update_stack(name, stack)
except DoesNotExistException as error:
raise not_found(error) from error
update_stack_component(name, component_type, component)
async
Updates a stack component.
Source code in zenml/zen_service/zen_service_api.py
@authed.put(
STACK_COMPONENTS + "/{component_type}/{name}",
response_model=Dict[str, str],
responses={404: error_response},
)
async def update_stack_component(
name: str,
component_type: StackComponentType,
component: StackComponentWrapper,
) -> Dict[str, str]:
"""Updates a stack component."""
try:
return zen_store.update_stack_component(name, component_type, component)
except KeyError as error:
raise not_found(error) from error
users()
async
Returns all users.
Source code in zenml/zen_service/zen_service_api.py
@authed.get(USERS, response_model=List[User])
async def users() -> List[User]:
"""Returns all users."""
return zen_store.users
users_for_team(name)
async
Returns all users for a team.
Source code in zenml/zen_service/zen_service_api.py
@authed.get(
TEAMS + "/{name}/users",
response_model=List[User],
responses={404: error_response},
)
async def users_for_team(name: str) -> List[User]:
"""Returns all users for a team."""
try:
return zen_store.get_users_for_team(team_name=name)
except KeyError as error:
raise not_found(error) from error