Skip to content

Zen Service

zenml.zen_service special

Zen Service

The Zen Service is a simple webserver to let you collaborate on stacks via the network. It can be spun up in a background daemon from the command line using zenml service up and managed from the same command line group.

Using the Zen Service's stacks in your project just requires setting up a profile with rest store-type pointed to the url of the service.

zen_service

ZenService (LocalDaemonService) pydantic-model

ZenService daemon that can be used to start a local ZenService server.

Attributes:

Name Type Description
config ZenServiceConfig

service configuration

endpoint ZenServiceEndpoint

optional service endpoint

Source code in zenml/zen_service/zen_service.py
class ZenService(LocalDaemonService):
    """ZenService daemon that can be used to start a local ZenService server.

    Attributes:
        config: service configuration
        endpoint: optional service endpoint
    """

    SERVICE_TYPE = ServiceType(
        name="zen_service",
        type="zenml",
        flavor="zenml",
        description="ZenService to manage stacks, users and pipelines",
    )

    config: ZenServiceConfig
    endpoint: ZenServiceEndpoint

    def __init__(
        self,
        config: Union[ZenServiceConfig, Dict[str, Any]],
        **attrs: Any,
    ) -> None:
        # ensure that the endpoint is created before the service is initialized
        if isinstance(config, ZenServiceConfig) and "endpoint" not in attrs:

            endpoint_uri_path = ZEN_SERVICE_URL_PATH
            healthcheck_uri_path = ZEN_SERVICE_HEALTHCHECK_URL_PATH
            use_head_request = True

            endpoint = ZenServiceEndpoint(
                config=ZenServiceEndpointConfig(
                    protocol=ServiceEndpointProtocol.HTTP,
                    port=config.port,
                    zen_service_uri_path=endpoint_uri_path,
                ),
                monitor=HTTPEndpointHealthMonitor(
                    config=HTTPEndpointHealthMonitorConfig(
                        healthcheck_uri_path=healthcheck_uri_path,
                        use_head_request=use_head_request,
                    )
                ),
            )
            attrs["endpoint"] = endpoint
        super().__init__(config=config, **attrs)

    def run(self) -> None:
        if self.config.store_profile_configuration.store_type == StoreType.REST:
            raise ValueError(
                "Service cannot be started with REST store type. Make sure you "
                "specify a profile with a non-networked persistence backend "
                "when trying to start the Zen Service. (use command line flag "
                "`--profile=$PROFILE_NAME` or set the env variable "
                f"{ENV_ZENML_PROFILE_NAME} to specify the use of a profile "
                "other than the currently active one)"
            )
        logger.info(
            "Starting ZenService as blocking "
            "process... press CTRL+C once to stop it."
        )

        self.endpoint.prepare_for_start()

        # this is the only way to pass information into the FastAPI app??
        os.environ[
            "ZENML_PROFILE_CONFIGURATION"
        ] = self.config.store_profile_configuration.json()

        try:
            uvicorn.run(
                ZEN_SERVICE_ENTRYPOINT,
                host=ZEN_SERVICE_IP,
                port=self.endpoint.status.port,
                log_level="info",
            )
        except KeyboardInterrupt:
            logger.info("Zen service stopped. Resuming normal execution.")

    @property
    def zen_service_uri(self) -> Optional[str]:
        """Get the URI where the service is running.

        Returns:
            The URI where the service can be contacted for requests,
             or None, if the service isn't running.
        """
        if not self.is_running:
            return None
        return self.endpoint.endpoint_uri
zen_service_uri: Optional[str] property readonly

Get the URI where the service is running.

Returns:

Type Description
Optional[str]

The URI where the service can be contacted for requests, or None, if the service isn't running.

run(self)

Run the service daemon process associated with this service.

Subclasses must implement this method to provide the service daemon functionality. This method will be executed in the context of the running daemon, not in the context of the process that calls the start method.

Source code in zenml/zen_service/zen_service.py
def run(self) -> None:
    if self.config.store_profile_configuration.store_type == StoreType.REST:
        raise ValueError(
            "Service cannot be started with REST store type. Make sure you "
            "specify a profile with a non-networked persistence backend "
            "when trying to start the Zen Service. (use command line flag "
            "`--profile=$PROFILE_NAME` or set the env variable "
            f"{ENV_ZENML_PROFILE_NAME} to specify the use of a profile "
            "other than the currently active one)"
        )
    logger.info(
        "Starting ZenService as blocking "
        "process... press CTRL+C once to stop it."
    )

    self.endpoint.prepare_for_start()

    # this is the only way to pass information into the FastAPI app??
    os.environ[
        "ZENML_PROFILE_CONFIGURATION"
    ] = self.config.store_profile_configuration.json()

    try:
        uvicorn.run(
            ZEN_SERVICE_ENTRYPOINT,
            host=ZEN_SERVICE_IP,
            port=self.endpoint.status.port,
            log_level="info",
        )
    except KeyboardInterrupt:
        logger.info("Zen service stopped. Resuming normal execution.")

ZenServiceConfig (LocalDaemonServiceConfig) pydantic-model

Zen Service deployment configuration.

Attributes:

Name Type Description
port int

Port at which the service is running

store_profile_configuration ProfileConfiguration

ProfileConfiguration describing where the service should persist its data.

Source code in zenml/zen_service/zen_service.py
class ZenServiceConfig(LocalDaemonServiceConfig):
    """Zen Service deployment configuration.

    Attributes:
        port: Port at which the service is running
        store_profile_configuration: ProfileConfiguration describing where
            the service should persist its data.
    """

    port: int = 8000
    store_profile_configuration: ProfileConfiguration = Field(
        default_factory=lambda: Repository().active_profile
    )

ZenServiceEndpoint (LocalDaemonServiceEndpoint) pydantic-model

A service endpoint exposed by the Zen service daemon.

Attributes:

Name Type Description
config ZenServiceEndpointConfig

service endpoint configuration

monitor HTTPEndpointHealthMonitor

optional service endpoint health monitor

Source code in zenml/zen_service/zen_service.py
class ZenServiceEndpoint(LocalDaemonServiceEndpoint):
    """A service endpoint exposed by the Zen service daemon.

    Attributes:
        config: service endpoint configuration
        monitor: optional service endpoint health monitor
    """

    config: ZenServiceEndpointConfig
    monitor: HTTPEndpointHealthMonitor

    @property
    def endpoint_uri(self) -> Optional[str]:
        uri = self.status.uri
        if not uri:
            return None
        return f"{uri}{self.config.zen_service_uri_path}"

ZenServiceEndpointConfig (LocalDaemonServiceEndpointConfig) pydantic-model

Zen Service endpoint configuration.

Attributes:

Name Type Description
zen_service_uri_path str

URI path for the zenml service

Source code in zenml/zen_service/zen_service.py
class ZenServiceEndpointConfig(LocalDaemonServiceEndpointConfig):
    """Zen Service endpoint configuration.

    Attributes:
        zen_service_uri_path: URI path for the zenml service
    """

    zen_service_uri_path: str

zen_service_api

add_user_to_team(name, user) async

Adds a user to a team.

Source code in zenml/zen_service/zen_service_api.py
@authed.post(TEAMS + "/{name}/users", responses={404: error_response})
async def add_user_to_team(name: str, user: User) -> None:
    """Adds a user to a team."""
    try:
        zen_store.add_user_to_team(team_name=name, user_name=user.name)
    except KeyError as error:
        raise not_found(error) from error

assign_role(data) async

Assigns a role.

Source code in zenml/zen_service/zen_service_api.py
@authed.post(
    ROLE_ASSIGNMENTS,
    responses={404: error_response},
)
async def assign_role(data: Dict[str, Any]) -> None:
    """Assigns a role."""
    role_name = data["role_name"]
    entity_name = data["entity_name"]
    project_name = data.get("project_name")
    is_user = data.get("is_user", True)

    try:
        zen_store.assign_role(
            role_name=role_name,
            entity_name=entity_name,
            project_name=project_name,
            is_user=is_user,
        )
    except KeyError as error:
        raise not_found(error) from error

authorize(credentials=Depends(HTTPBasic))

Authorizes any request to the ZenService.

Right now this method only checks if the username provided as part of http basic auth credentials is registered in the ZenStore.

Parameters:

Name Type Description Default
credentials HTTPBasicCredentials

HTTP basic auth credentials passed to the request.

Depends(HTTPBasic)
Source code in zenml/zen_service/zen_service_api.py
def authorize(credentials: HTTPBasicCredentials = Depends(security)) -> None:
    """Authorizes any request to the ZenService.

    Right now this method only checks if the username provided as part of http
    basic auth credentials is registered in the ZenStore.

    Args:
        credentials: HTTP basic auth credentials passed to the request.
    """
    try:
        zen_store.get_user(credentials.username)
    except KeyError:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username.",
        )

conflict(error)

Convert an Exception to a HTTP 409 response.

Source code in zenml/zen_service/zen_service_api.py
def conflict(error: Exception) -> HTTPException:
    """Convert an Exception to a HTTP 409 response."""
    return HTTPException(status_code=409, detail=error_detail(error))

create_project(project) async

Creates a project.

Source code in zenml/zen_service/zen_service_api.py
@authed.post(
    PROJECTS,
    response_model=Project,
    responses={409: error_response},
)
async def create_project(project: Project) -> Project:
    """Creates a project."""
    try:
        return zen_store.create_project(
            project_name=project.name, description=project.description
        )
    except EntityExistsError as error:
        raise conflict(error) from error

create_role(role) async

Creates a role.

Source code in zenml/zen_service/zen_service_api.py
@authed.post(
    ROLES,
    response_model=Role,
    responses={409: error_response},
)
async def create_role(role: Role) -> Role:
    """Creates a role."""
    try:
        return zen_store.create_role(role.name)
    except EntityExistsError as error:
        raise conflict(error) from error

create_team(team) async

Creates a team.

Source code in zenml/zen_service/zen_service_api.py
@authed.post(
    TEAMS,
    response_model=Team,
    responses={409: error_response},
)
async def create_team(team: Team) -> Team:
    """Creates a team."""
    try:
        return zen_store.create_team(team.name)
    except EntityExistsError as error:
        raise conflict(error) from error

create_user(user) async

Creates a user.

Source code in zenml/zen_service/zen_service_api.py
@authed.post(
    USERS,
    response_model=User,
    responses={409: error_response},
)
async def create_user(user: User) -> User:
    """Creates a user."""
    try:
        return zen_store.create_user(user.name)
    except EntityExistsError as error:
        raise conflict(error) from error

delete_project(name) async

Deletes a project.

Source code in zenml/zen_service/zen_service_api.py
@authed.delete(PROJECTS + "/{name}", responses={404: error_response})
async def delete_project(name: str) -> None:
    """Deletes a project."""
    try:
        zen_store.delete_project(project_name=name)
    except KeyError as error:
        raise not_found(error) from error

delete_role(name) async

Deletes a role.

Source code in zenml/zen_service/zen_service_api.py
@authed.delete(ROLES + "/{name}", responses={404: error_response})
async def delete_role(name: str) -> None:
    """Deletes a role."""
    try:
        zen_store.delete_role(role_name=name)
    except KeyError as error:
        raise not_found(error) from error

delete_team(name) async

Deletes a team.

Source code in zenml/zen_service/zen_service_api.py
@authed.delete(TEAMS + "/{name}", responses={404: error_response})
async def delete_team(name: str) -> None:
    """Deletes a team."""
    try:
        zen_store.delete_team(team_name=name)
    except KeyError as error:
        raise not_found(error) from error

delete_user(name) async

Deletes a user.

Source code in zenml/zen_service/zen_service_api.py
@authed.delete(USERS + "/{name}", responses={404: error_response})
async def delete_user(name: str) -> None:
    """Deletes a user."""
    try:
        zen_store.delete_user(user_name=name)
    except KeyError as error:
        raise not_found(error) from error

deregister_stack(name) async

Deregisters a stack.

Source code in zenml/zen_service/zen_service_api.py
@authed.delete(STACKS + "/{name}", responses={404: error_response})
async def deregister_stack(name: str) -> None:
    """Deregisters a stack."""
    try:
        zen_store.deregister_stack(name)
    except KeyError as error:
        raise not_found(error) from error

deregister_stack_component(component_type, name) async

Deregisters a stack component.

Source code in zenml/zen_service/zen_service_api.py
@authed.delete(
    STACK_COMPONENTS + "/{component_type}/{name}",
    responses={404: error_response, 409: error_response},
)
async def deregister_stack_component(
    component_type: StackComponentType, name: str
) -> None:
    """Deregisters a stack component."""
    try:
        return zen_store.deregister_stack_component(component_type, name=name)
    except KeyError as error:
        raise not_found(error) from error
    except ValueError as error:
        raise conflict(error) from error

error_detail(error)

Convert an Exception to API representation.

Source code in zenml/zen_service/zen_service_api.py
def error_detail(error: Exception) -> List[str]:
    """Convert an Exception to API representation."""
    return [type(error).__name__] + [str(a) for a in error.args]

get_project(name) async

Gets a specific project.

Source code in zenml/zen_service/zen_service_api.py
@authed.get(PROJECTS + "/{name}", responses={404: error_response})
async def get_project(name: str) -> Project:
    """Gets a specific project."""
    try:
        return zen_store.get_project(project_name=name)
    except KeyError as error:
        raise not_found(error) from error

get_role(name) async

Gets a specific role.

Source code in zenml/zen_service/zen_service_api.py
@authed.get(ROLES + "/{name}", responses={404: error_response})
async def get_role(name: str) -> Role:
    """Gets a specific role."""
    try:
        return zen_store.get_role(role_name=name)
    except KeyError as error:
        raise not_found(error) from error

get_stack(name) async

Returns the requested stack.

Source code in zenml/zen_service/zen_service_api.py
@authed.get(
    STACKS + "/{name}",
    response_model=StackWrapper,
    responses={404: error_response},
)
async def get_stack(name: str) -> StackWrapper:
    """Returns the requested stack."""
    try:
        return zen_store.get_stack(name)
    except KeyError as error:
        raise not_found(error) from error

get_stack_component(component_type, name) async

Returns the requested stack component.

Source code in zenml/zen_service/zen_service_api.py
@authed.get(
    STACK_COMPONENTS + "/{component_type}/{name}",
    response_model=StackComponentWrapper,
    responses={404: error_response},
)
async def get_stack_component(
    component_type: StackComponentType, name: str
) -> StackComponentWrapper:
    """Returns the requested stack component."""
    try:
        return zen_store.get_stack_component(component_type, name=name)
    except KeyError as error:
        raise not_found(error) from error

get_stack_components(component_type) async

Returns all stack components for the requested type.

Source code in zenml/zen_service/zen_service_api.py
@authed.get(
    STACK_COMPONENTS + "/{component_type}",
    response_model=List[StackComponentWrapper],
)
async def get_stack_components(
    component_type: StackComponentType,
) -> List[StackComponentWrapper]:
    """Returns all stack components for the requested type."""
    return zen_store.get_stack_components(component_type)

get_stack_configuration(name) async

Returns the configuration for the requested stack.

Source code in zenml/zen_service/zen_service_api.py
@authed.get(
    STACK_CONFIGURATIONS + "/{name}",
    response_model=Dict[StackComponentType, str],
    responses={404: error_response},
)
async def get_stack_configuration(name: str) -> Dict[StackComponentType, str]:
    """Returns the configuration for the requested stack."""
    try:
        return zen_store.get_stack_configuration(name)
    except KeyError as error:
        raise not_found(error) from error

get_team(name) async

Gets a specific team.

Source code in zenml/zen_service/zen_service_api.py
@authed.get(TEAMS + "/{name}", responses={404: error_response})
async def get_team(name: str) -> Team:
    """Gets a specific team."""
    try:
        return zen_store.get_team(team_name=name)
    except KeyError as error:
        raise not_found(error) from error

get_user(name) async

Gets a specific user.

Source code in zenml/zen_service/zen_service_api.py
@authed.get(USERS + "/{name}", responses={404: error_response})
async def get_user(name: str) -> User:
    """Gets a specific user."""
    try:
        return zen_store.get_user(user_name=name)
    except KeyError as error:
        raise not_found(error) from error

is_empty() async

Returns whether stacks are registered or not.

Source code in zenml/zen_service/zen_service_api.py
@authed.get(IS_EMPTY, response_model=bool)
async def is_empty() -> bool:
    """Returns whether stacks are registered or not."""
    return zen_store.is_empty

not_found(error)

Convert an Exception to a HTTP 404 response.

Source code in zenml/zen_service/zen_service_api.py
def not_found(error: Exception) -> HTTPException:
    """Convert an Exception to a HTTP 404 response."""
    return HTTPException(status_code=404, detail=error_detail(error))

projects() async

Returns all projects.

Source code in zenml/zen_service/zen_service_api.py
@authed.get(PROJECTS, response_model=List[Project])
async def projects() -> List[Project]:
    """Returns all projects."""
    return zen_store.projects

register_stack(stack) async

Registers a stack.

Source code in zenml/zen_service/zen_service_api.py
@authed.post(
    STACKS,
    response_model=Dict[str, str],
    responses={409: error_response},
)
async def register_stack(stack: StackWrapper) -> Dict[str, str]:
    """Registers a stack."""
    try:
        return zen_store.register_stack(stack)
    except (StackExistsError, StackComponentExistsError) as error:
        raise conflict(error) from error

register_stack_component(component) async

Registers a stack component.

Source code in zenml/zen_service/zen_service_api.py
@authed.post(STACK_COMPONENTS, responses={409: error_response})
async def register_stack_component(
    component: StackComponentWrapper,
) -> None:
    """Registers a stack component."""
    try:
        zen_store.register_stack_component(component)
    except StackComponentExistsError as error:
        raise conflict(error) from error

remove_user_from_team(team_name, user_name) async

Removes a user from a team.

Source code in zenml/zen_service/zen_service_api.py
@authed.delete(
    TEAMS + "/{team_name}/users/{user_name}", responses={404: error_response}
)
async def remove_user_from_team(team_name: str, user_name: str) -> None:
    """Removes a user from a team."""
    try:
        zen_store.remove_user_from_team(
            team_name=team_name, user_name=user_name
        )
    except KeyError as error:
        raise not_found(error) from error

revoke_role(data) async

Revokes a role.

Source code in zenml/zen_service/zen_service_api.py
@authed.delete(ROLE_ASSIGNMENTS, responses={404: error_response})
async def revoke_role(data: Dict[str, Any]) -> None:
    """Revokes a role."""
    role_name = data["role_name"]
    entity_name = data["entity_name"]
    project_name = data.get("project_name")
    is_user = data.get("is_user", True)

    try:
        zen_store.revoke_role(
            role_name=role_name,
            entity_name=entity_name,
            project_name=project_name,
            is_user=is_user,
        )
    except KeyError as error:
        raise not_found(error) from error

role_assignments() async

Returns all role assignments.

Source code in zenml/zen_service/zen_service_api.py
@authed.get(ROLE_ASSIGNMENTS, response_model=List[RoleAssignment])
async def role_assignments() -> List[RoleAssignment]:
    """Returns all role assignments."""
    return zen_store.role_assignments

role_assignments_for_team(name, project_name=None) async

Gets all role assignments for a team.

Source code in zenml/zen_service/zen_service_api.py
@authed.get(
    TEAMS + "/{name}/role_assignments",
    response_model=List[RoleAssignment],
    responses={404: error_response},
)
async def role_assignments_for_team(
    name: str, project_name: Optional[str] = None
) -> List[RoleAssignment]:
    """Gets all role assignments for a team."""
    try:
        return zen_store.get_role_assignments_for_team(
            team_name=name, project_name=project_name
        )
    except KeyError as error:
        raise not_found(error) from error

role_assignments_for_user(name, project_name=None) async

Returns all role assignments for a user.

Source code in zenml/zen_service/zen_service_api.py
@authed.get(
    USERS + "/{name}/role_assignments",
    response_model=List[RoleAssignment],
    responses={404: error_response},
)
async def role_assignments_for_user(
    name: str, project_name: Optional[str] = None
) -> List[RoleAssignment]:
    """Returns all role assignments for a user."""
    try:
        return zen_store.get_role_assignments_for_user(
            user_name=name, project_name=project_name, include_team_roles=False
        )
    except KeyError as error:
        raise not_found(error) from error

roles() async

Returns all roles.

Source code in zenml/zen_service/zen_service_api.py
@authed.get(ROLES, response_model=List[Role])
async def roles() -> List[Role]:
    """Returns all roles."""
    return zen_store.roles

service_info() async

Returns the profile configuration for this service.

Source code in zenml/zen_service/zen_service_api.py
@authed.get("/", response_model=ProfileConfiguration)
async def service_info() -> ProfileConfiguration:
    """Returns the profile configuration for this service."""
    return profile

stack_configurations() async

Returns configurations for all stacks.

Source code in zenml/zen_service/zen_service_api.py
@authed.get(
    STACK_CONFIGURATIONS,
    response_model=Dict[str, Dict[StackComponentType, str]],
)
async def stack_configurations() -> Dict[str, Dict[StackComponentType, str]]:
    """Returns configurations for all stacks."""
    return zen_store.stack_configurations

stacks() async

Returns all stacks.

Source code in zenml/zen_service/zen_service_api.py
@authed.get(STACKS, response_model=List[StackWrapper])
async def stacks() -> List[StackWrapper]:
    """Returns all stacks."""
    return zen_store.stacks

teams() async

Returns all teams.

Source code in zenml/zen_service/zen_service_api.py
@authed.get(TEAMS, response_model=List[Team])
async def teams() -> List[Team]:
    """Returns all teams."""
    return zen_store.teams

teams_for_user(name) async

Returns all teams for a user.

Source code in zenml/zen_service/zen_service_api.py
@authed.get(
    USERS + "/{name}/teams",
    response_model=List[Team],
    responses={404: error_response},
)
async def teams_for_user(name: str) -> List[Team]:
    """Returns all teams for a user."""
    try:
        return zen_store.get_teams_for_user(user_name=name)
    except KeyError as error:
        raise not_found(error) from error

update_stack(stack, name) async

Updates a stack.

Source code in zenml/zen_service/zen_service_api.py
@authed.put(
    STACKS + "/{name}",
    response_model=Dict[str, str],
    responses={404: error_response},
)
async def update_stack(stack: StackWrapper, name: str) -> Dict[str, str]:
    """Updates a stack."""
    try:
        return zen_store.update_stack(name, stack)
    except DoesNotExistException as error:
        raise not_found(error) from error

update_stack_component(name, component_type, component) async

Updates a stack component.

Source code in zenml/zen_service/zen_service_api.py
@authed.put(
    STACK_COMPONENTS + "/{component_type}/{name}",
    response_model=Dict[str, str],
    responses={404: error_response},
)
async def update_stack_component(
    name: str,
    component_type: StackComponentType,
    component: StackComponentWrapper,
) -> Dict[str, str]:
    """Updates a stack component."""
    try:
        return zen_store.update_stack_component(name, component_type, component)
    except KeyError as error:
        raise not_found(error) from error

users() async

Returns all users.

Source code in zenml/zen_service/zen_service_api.py
@authed.get(USERS, response_model=List[User])
async def users() -> List[User]:
    """Returns all users."""
    return zen_store.users

users_for_team(name) async

Returns all users for a team.

Source code in zenml/zen_service/zen_service_api.py
@authed.get(
    TEAMS + "/{name}/users",
    response_model=List[User],
    responses={404: error_response},
)
async def users_for_team(name: str) -> List[User]:
    """Returns all users for a team."""
    try:
        return zen_store.get_users_for_team(team_name=name)
    except KeyError as error:
        raise not_found(error) from error