Skip to content

Zen Server

zenml.zen_server special

ZenML Server Implementation.

The ZenML Server is a centralized service meant for use in a collaborative setting in which stacks, stack components, flavors, pipelines and pipeline runs can be shared over the network with other users.

You can use the zenml server up command to spin up ZenML server instances that are either running locally as daemon processes or docker containers, or to deploy a ZenML server remotely on a managed cloud platform. The other CLI commands in the same zenml server group can be used to manage the server instances deployed from your local machine.

To connect the local ZenML client to one of the managed ZenML servers, call zenml server connect with the name of the server you want to connect to.

auth

Authentication module for ZenML server.

AuthContext (BaseModel) pydantic-model

The authentication context.

Source code in zenml/zen_server/auth.py
class AuthContext(BaseModel):
    """The authentication context.

    Returned by the authentication providers in this module to describe the
    user on whose behalf a request is processed.
    """

    # The user model resolved from the supplied credentials.
    user: UserResponseModel

AuthScheme (StrEnum)

The authentication scheme.

Source code in zenml/zen_server/auth.py
class AuthScheme(StrEnum):
    """The authentication scheme.

    Each value selects one of the authentication providers returned by
    `authentication_provider`.
    """

    # Requests are not authenticated; the default user is used instead.
    NO_AUTH = "NO_AUTH"
    # Requests are authenticated with HTTP basic username/password credentials.
    HTTP_BASIC = "HTTP_BASIC"
    # Requests are authenticated with OAuth2 password bearer JWT tokens.
    OAUTH2_PASSWORD_BEARER = "OAUTH2_PASSWORD_BEARER"

authenticate_credentials(user_name_or_id=None, password=None, access_token=None, activation_token=None)

Verify if user authentication credentials are valid.

This function can be used to validate all supplied user credentials to cover a range of possibilities:

  • username+password
  • access token (with embedded user id)
  • username+activation token

Parameters:

Name Type Description Default
user_name_or_id Union[str, uuid.UUID]

The username or user ID.

None
password Optional[str]

The password.

None
access_token Optional[str]

The access token.

None
activation_token Optional[str]

The activation token.

None

Returns:

Type Description
Optional[zenml.zen_server.auth.AuthContext]

The authenticated account details, if the account is valid, otherwise None.

Source code in zenml/zen_server/auth.py
def authenticate_credentials(
    user_name_or_id: Optional[Union[str, UUID]] = None,
    password: Optional[str] = None,
    access_token: Optional[str] = None,
    activation_token: Optional[str] = None,
) -> Optional[AuthContext]:
    """Check a set of user credentials and build the authentication context.

    Exactly one kind of secret is evaluated, in this order of precedence:

     * password (together with the username or user ID)
     * access token (the user is derived from the token itself)
     * activation token (together with the username or user ID)

    If none of the secrets is supplied, the context resolved from
    `user_name_or_id` alone (if any) is returned as is.

    Args:
        user_name_or_id: The username or user ID.
        password: The password.
        access_token: The access token.
        activation_token: The activation token.

    Returns:
        The authentication context of the account if the credentials are
        valid, otherwise None.
    """
    auth_user: Optional[UserAuthModel] = None
    context: Optional[AuthContext] = None

    if user_name_or_id:
        try:
            auth_user = zen_store().get_auth_user(user_name_or_id)
            context = AuthContext(
                user=zen_store().get_user(user_name_or_id=user_name_or_id)
            )
        except KeyError:
            # An unknown user must not short-circuit the function: the
            # password/token verification below still has to run so that the
            # response does not reveal whether the account exists
            # (https://cwe.mitre.org/data/definitions/204.html)
            pass

    if password is not None:
        password_ok = UserAuthModel.verify_password(password, auth_user)
        return context if password_ok else None

    if access_token is not None:
        # The token identifies the user on its own; any user resolved from
        # `user_name_or_id` above is replaced by the token's user.
        auth_user = UserAuthModel.verify_access_token(access_token)
        if not auth_user:
            return None
        return AuthContext(
            user=zen_store().get_user(user_name_or_id=auth_user.id)
        )

    if activation_token is not None:
        token_ok = UserAuthModel.verify_activation_token(
            activation_token, auth_user
        )
        if not token_ok:
            return None

    return context

authentication_provider()

Returns the authentication provider.

Returns:

Type Description
Callable[..., zenml.zen_server.auth.AuthContext]

The authentication provider.

Exceptions:

Type Description
ValueError

If the authentication scheme is not supported.

Source code in zenml/zen_server/auth.py
def authentication_provider() -> Callable[..., AuthContext]:
    """Select the authentication callable for the configured scheme.

    Returns:
        The function that authenticates incoming requests according to the
        scheme reported by `authentication_scheme`.

    Raises:
        ValueError: If the authentication scheme is not supported.
    """
    scheme = authentication_scheme()

    if scheme == AuthScheme.NO_AUTH:
        return no_authentication
    if scheme == AuthScheme.HTTP_BASIC:
        return http_authentication
    if scheme == AuthScheme.OAUTH2_PASSWORD_BEARER:
        return oauth2_password_bearer_authentication

    raise ValueError(f"Unknown authentication scheme: {scheme}")

authentication_scheme()

Returns the authentication type.

Returns:

Type Description
AuthScheme

The authentication type.

Source code in zenml/zen_server/auth.py
def authentication_scheme() -> AuthScheme:
    """Read the configured authentication scheme from the environment.

    The value is taken from the environment variable named by
    `ENV_ZENML_AUTH_TYPE`; OAuth2 password bearer is used when it is unset.

    Returns:
        The authentication type.
    """
    configured_value = os.environ.get(
        ENV_ZENML_AUTH_TYPE, AuthScheme.OAUTH2_PASSWORD_BEARER
    )
    return AuthScheme(configured_value)

authorize(security_scopes, token=Depends(OAuth2PasswordBearer))

Authenticates any request to the ZenML server with OAuth2 password bearer JWT tokens.

Parameters:

Name Type Description Default
security_scopes SecurityScopes

Security scope for this token

required
token str

The JWT bearer token to be authenticated.

Depends(OAuth2PasswordBearer)

Returns:

Type Description
AuthContext

The authentication context reflecting the authenticated user.

Exceptions:

Type Description
HTTPException

If the JWT token could not be authorized.

Source code in zenml/zen_server/auth.py
def oauth2_password_bearer_authentication(
    security_scopes: SecurityScopes,
    token: str = Depends(
        OAuth2PasswordBearer(
            tokenUrl=ROOT_URL_PATH + API + VERSION_1 + LOGIN,
            scopes={
                "read": "Read permissions on all entities",
                "write": "Write permissions on all entities",
                "me": "Editing permissions to own user",
            },
        )
    ),
) -> AuthContext:
    """Authenticates any request to the ZenML server with OAuth2 password bearer JWT tokens.

    The checks run in this order: the token is decoded, the account it
    belongs to is authenticated, and only then are the required scopes
    compared against the token permissions. Authentication failures are
    therefore always reported as 401 before any 403 authorization failure.

    Args:
        security_scopes: Security scope for this token
        token: The JWT bearer token to be authenticated.

    Returns:
        The authentication context reflecting the authenticated user.

    Raises:
        HTTPException: If the JWT token could not be authorized.
    """
    if security_scopes.scopes:
        authenticate_value = f'Bearer scope="{security_scopes.scope_str}"'
    else:
        authenticate_value = "Bearer"

    # Validate the token itself first so that malformed or expired tokens are
    # rejected before any user lookup is attempted.
    try:
        access_token = JWTToken.decode(
            token_type=JWTTokenType.ACCESS_TOKEN, token=token
        )
    except AuthorizationException as e:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid authentication credentials",
            headers={"WWW-Authenticate": "Bearer"},
        ) from e

    auth_context = authenticate_credentials(access_token=token)
    if auth_context is None:
        # We have to return an additional WWW-Authenticate header here with the
        # value Bearer to be compliant with the OAuth2 spec.
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid authentication credentials",
            headers={"WWW-Authenticate": "Bearer"},
        )

    # The caller is authenticated; now make sure the token grants every scope
    # required by the endpoint.
    for scope in security_scopes.scopes:
        if scope not in access_token.permissions:
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail="Not enough permissions",
                headers={"WWW-Authenticate": authenticate_value},
            )

    return auth_context

http_authentication(security_scopes, credentials=Depends(HTTPBasic))

Authenticates any request to the ZenML Server with basic HTTP authentication.

Parameters:

Name Type Description Default
security_scopes SecurityScopes

Security scope will be ignored for http_auth

required
credentials HTTPBasicCredentials

HTTP basic auth credentials passed to the request.

Depends(HTTPBasic)

Returns:

Type Description
AuthContext

The authentication context reflecting the authenticated user.

Exceptions:

Type Description
HTTPException

If the user credentials could not be authenticated.

Source code in zenml/zen_server/auth.py
def http_authentication(
    security_scopes: SecurityScopes,
    credentials: HTTPBasicCredentials = Depends(HTTPBasic()),
) -> AuthContext:
    """Authenticates any request to the ZenML Server with basic HTTP authentication.

    Args:
        security_scopes: Security scope will be ignored for http_auth
        credentials: HTTP basic auth credentials passed to the request.

    Returns:
        The authentication context reflecting the authenticated user.

    Raises:
        HTTPException: If the user credentials could not be authenticated.
    """
    auth_context = authenticate_credentials(
        user_name_or_id=credentials.username, password=credentials.password
    )
    if auth_context is None:
        # A 401 response has to carry a WWW-Authenticate challenge telling the
        # client which authentication scheme is expected.
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid authentication credentials",
            headers={"WWW-Authenticate": "Basic"},
        )

    return auth_context

no_authentication(security_scopes)

Doesn't authenticate requests to the ZenML server.

Parameters:

Name Type Description Default
security_scopes SecurityScopes

Security scopes are ignored because no authentication is performed.

required

Returns:

Type Description
AuthContext

The authentication context reflecting the default user.

Exceptions:

Type Description
HTTPException

If the default user is not available.

Source code in zenml/zen_server/auth.py
def no_authentication(security_scopes: SecurityScopes) -> AuthContext:
    """Handle a request without authenticating it.

    Every request is served on behalf of the default user.

    Args:
        security_scopes: Security scopes; not used, because no authentication
            is performed.

    Returns:
        The authentication context reflecting the default user.

    Raises:
        HTTPException: If the default user is not available.
    """
    default_user_context = authenticate_credentials(
        user_name_or_id=DEFAULT_USERNAME
    )
    if default_user_context is not None:
        return default_user_context

    # The default user could not be resolved.
    raise HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Invalid authentication credentials",
    )

oauth2_password_bearer_authentication(security_scopes, token=Depends(OAuth2PasswordBearer))

Authenticates any request to the ZenML server with OAuth2 password bearer JWT tokens.

Parameters:

Name Type Description Default
security_scopes SecurityScopes

Security scope for this token

required
token str

The JWT bearer token to be authenticated.

Depends(OAuth2PasswordBearer)

Returns:

Type Description
AuthContext

The authentication context reflecting the authenticated user.

Exceptions:

Type Description
HTTPException

If the JWT token could not be authorized.

Source code in zenml/zen_server/auth.py
def oauth2_password_bearer_authentication(
    security_scopes: SecurityScopes,
    token: str = Depends(
        OAuth2PasswordBearer(
            tokenUrl=ROOT_URL_PATH + API + VERSION_1 + LOGIN,
            scopes={
                "read": "Read permissions on all entities",
                "write": "Write permissions on all entities",
                "me": "Editing permissions to own user",
            },
        )
    ),
) -> AuthContext:
    """Authenticates any request to the ZenML server with OAuth2 password bearer JWT tokens.

    The checks run in this order: the token is decoded, the account it
    belongs to is authenticated, and only then are the required scopes
    compared against the token permissions. Authentication failures are
    therefore always reported as 401 before any 403 authorization failure.

    Args:
        security_scopes: Security scope for this token
        token: The JWT bearer token to be authenticated.

    Returns:
        The authentication context reflecting the authenticated user.

    Raises:
        HTTPException: If the JWT token could not be authorized.
    """
    if security_scopes.scopes:
        authenticate_value = f'Bearer scope="{security_scopes.scope_str}"'
    else:
        authenticate_value = "Bearer"

    # Validate the token itself first so that malformed or expired tokens are
    # rejected before any user lookup is attempted.
    try:
        access_token = JWTToken.decode(
            token_type=JWTTokenType.ACCESS_TOKEN, token=token
        )
    except AuthorizationException as e:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid authentication credentials",
            headers={"WWW-Authenticate": "Bearer"},
        ) from e

    auth_context = authenticate_credentials(access_token=token)
    if auth_context is None:
        # We have to return an additional WWW-Authenticate header here with the
        # value Bearer to be compliant with the OAuth2 spec.
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid authentication credentials",
            headers={"WWW-Authenticate": "Bearer"},
        )

    # The caller is authenticated; now make sure the token grants every scope
    # required by the endpoint.
    for scope in security_scopes.scopes:
        if scope not in access_token.permissions:
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail="Not enough permissions",
                headers={"WWW-Authenticate": authenticate_value},
            )

    return auth_context

deploy special

ZenML server deployments.

base_provider

Base ZenML server provider class.

BaseServerProvider (ABC)

Base ZenML server provider class.

All ZenML server providers must extend and implement this base class.

Source code in zenml/zen_server/deploy/base_provider.py
class BaseServerProvider(ABC):
    """Base ZenML server provider class.

    All ZenML server providers must extend and implement this base class.

    The public methods implement the generic deployment workflow (deploy,
    update, remove, get, list, logs) on top of the abstract service management
    hooks that concrete providers have to supply.

    Attributes:
        TYPE: The server provider type implemented by the concrete class.
        CONFIG_TYPE: The deployment configuration class that generic
            configurations are converted to before they are handed to the
            provider specific hooks.
    """

    TYPE: ClassVar[ServerProviderType]
    CONFIG_TYPE: ClassVar[Type[ServerDeploymentConfig]] = ServerDeploymentConfig

    @classmethod
    def register_as_provider(cls) -> None:
        """Register the class as a server provider."""
        from zenml.zen_server.deploy.deployer import ServerDeployer

        ServerDeployer.register_provider(cls)

    @classmethod
    def _convert_config(
        cls, config: ServerDeploymentConfig
    ) -> ServerDeploymentConfig:
        """Convert a generic server deployment config into a provider specific config.

        Args:
            config: The generic server deployment config.

        Returns:
            The provider specific server deployment config.

        Raises:
            ServerDeploymentConfigurationError: If the configuration is not
                valid.
        """
        if isinstance(config, cls.CONFIG_TYPE):
            return config
        try:
            return cls.CONFIG_TYPE(**config.dict())
        except ValidationError as e:
            # keep the validation error attached as the explicit cause
            raise ServerDeploymentConfigurationError(
                f"Invalid configuration for provider {cls.TYPE.value}: {e}"
            ) from e

    def deploy_server(
        self,
        config: ServerDeploymentConfig,
        timeout: Optional[int] = None,
    ) -> ServerDeployment:
        """Deploy a new ZenML server.

        Args:
            config: The generic server deployment configuration.
            timeout: The timeout in seconds to wait until the deployment is
                successful. If not supplied, the default timeout value specified
                by the provider is used.

        Returns:
            The newly created server deployment.

        Raises:
            ServerDeploymentExistsError: If a deployment with the same name
                already exists.
        """
        try:
            self._get_service(config.name)
        except KeyError:
            # no service with this name yet: the deployment can proceed
            pass
        else:
            raise ServerDeploymentExistsError(
                f"ZenML server deployment with name '{config.name}' already "
                f"exists"
            )

        # convert the generic deployment config to a provider specific
        # deployment config
        config = self._convert_config(config)
        service = self._create_service(config, timeout)
        return self._get_deployment(service)

    def update_server(
        self,
        config: ServerDeploymentConfig,
        timeout: Optional[int] = None,
    ) -> ServerDeployment:
        """Update an existing ZenML server deployment.

        If the new configuration is equal to the current one, the service is
        only (re)started instead of being updated.

        Args:
            config: The new generic server deployment configuration.
            timeout: The timeout in seconds to wait until the update is
                successful. If not supplied, the default timeout value specified
                by the provider is used.

        Returns:
            The updated server deployment.

        Raises:
            ServerDeploymentNotFoundError: If a deployment with the given name
                doesn't exist.
        """
        try:
            service = self._get_service(config.name)
        except KeyError as e:
            raise ServerDeploymentNotFoundError(
                f"ZenML server deployment with name '{config.name}' was not "
                f"found"
            ) from e

        # convert the generic deployment config to a provider specific
        # deployment config
        config = self._convert_config(config)
        old_config = self._get_deployment_config(service)

        if old_config == config:
            logger.info(
                f"The {config.name} ZenML server is already configured with "
                f"the same parameters."
            )
            service = self._start_service(service, timeout)
        else:
            logger.info(f"Updating the {config.name} ZenML server.")
            service = self._update_service(service, config, timeout)

        return self._get_deployment(service)

    def remove_server(
        self,
        config: ServerDeploymentConfig,
        timeout: Optional[int] = None,
    ) -> None:
        """Tears down and removes all resources and files associated with a ZenML server deployment.

        Args:
            config: The generic server deployment configuration.
            timeout: The timeout in seconds to wait until the server is
                removed. If not supplied, the default timeout value specified
                by the provider is used.

        Raises:
            ServerDeploymentNotFoundError: If a deployment with the given name
                doesn't exist.
        """
        try:
            service = self._get_service(config.name)
        except KeyError as e:
            raise ServerDeploymentNotFoundError(
                f"ZenML server deployment with name '{config.name}' was not "
                f"found"
            ) from e

        logger.info(f"Removing the {config.name} ZenML server.")
        self._delete_service(service, timeout)

    def get_server(
        self,
        config: ServerDeploymentConfig,
    ) -> ServerDeployment:
        """Retrieve information about a ZenML server deployment.

        Args:
            config: The generic server deployment configuration.

        Returns:
            The server deployment.

        Raises:
            ServerDeploymentNotFoundError: If a deployment with the given name
                doesn't exist.
        """
        try:
            service = self._get_service(config.name)
        except KeyError as e:
            raise ServerDeploymentNotFoundError(
                f"ZenML server deployment with name '{config.name}' was not "
                f"found"
            ) from e

        return self._get_deployment(service)

    def list_servers(self) -> List[ServerDeployment]:
        """List all server deployments managed by this provider.

        Returns:
            The list of server deployments.
        """
        return [
            self._get_deployment(service) for service in self._list_services()
        ]

    def get_server_logs(
        self,
        config: ServerDeploymentConfig,
        follow: bool = False,
        tail: Optional[int] = None,
    ) -> Generator[str, bool, None]:
        """Retrieve the logs of a ZenML server.

        Args:
            config: The generic server deployment configuration.
            follow: if True, the logs will be streamed as they are written
            tail: only retrieve the last NUM lines of log output.

        Returns:
            A generator that can be accessed to get the service logs.

        Raises:
            ServerDeploymentNotFoundError: If a deployment with the given name
                doesn't exist.
        """
        try:
            service = self._get_service(config.name)
        except KeyError as e:
            raise ServerDeploymentNotFoundError(
                f"ZenML server deployment with name '{config.name}' was not "
                f"found"
            ) from e

        return service.get_logs(follow=follow, tail=tail)

    def _get_deployment_status(
        self, service: BaseService
    ) -> ServerDeploymentStatus:
        """Get the status of a server deployment from its service.

        The deployment is reported as connected when the service is running
        and the store configured in the global configuration points at the
        service endpoint URL.

        Args:
            service: The server deployment service.

        Returns:
            The status of the server deployment.
        """
        gc = GlobalConfiguration()
        url: Optional[str] = None
        if service.is_running:

            # all services must have an endpoint
            assert service.endpoint is not None

            url = service.endpoint.status.uri
        connected = (
            url is not None and gc.store is not None and gc.store.url == url
        )

        return ServerDeploymentStatus(
            url=url,
            status=service.status.state,
            status_message=service.status.last_error,
            connected=connected,
        )

    def _get_deployment(self, service: BaseService) -> ServerDeployment:
        """Get the server deployment associated with a service.

        Args:
            service: The service.

        Returns:
            The server deployment.
        """
        config = self._get_deployment_config(service)

        return ServerDeployment(
            config=config,
            status=self._get_deployment_status(service),
        )

    @classmethod
    @abstractmethod
    def _get_service_configuration(
        cls,
        server_config: ServerDeploymentConfig,
    ) -> Tuple[
        ServiceConfig,
        ServiceEndpointConfig,
        ServiceEndpointHealthMonitorConfig,
    ]:
        """Construct the service configuration from a server deployment configuration.

        Args:
            server_config: server deployment configuration.

        Returns:
            The service, service endpoint and endpoint monitor configuration.
        """

    @abstractmethod
    def _create_service(
        self,
        config: ServerDeploymentConfig,
        timeout: Optional[int] = None,
    ) -> BaseService:
        """Create, start and return a service instance for a ZenML server deployment.

        Args:
            config: The server deployment configuration.
            timeout: The timeout in seconds to wait until the service is
                running. If not supplied, a default timeout value specified
                by the provider implementation should be used.

        Returns:
            The service instance.
        """

    @abstractmethod
    def _update_service(
        self,
        service: BaseService,
        config: ServerDeploymentConfig,
        timeout: Optional[int] = None,
    ) -> BaseService:
        """Update an existing service instance for a ZenML server deployment.

        Args:
            service: The service instance.
            config: The new server deployment configuration.
            timeout: The timeout in seconds to wait until the updated service is
                running. If not supplied, a default timeout value specified
                by the provider implementation should be used.

        Returns:
            The updated service instance.
        """

    @abstractmethod
    def _start_service(
        self,
        service: BaseService,
        timeout: Optional[int] = None,
    ) -> BaseService:
        """Start a service instance for a ZenML server deployment.

        Args:
            service: The service instance.
            timeout: The timeout in seconds to wait until the service is
                running. If not supplied, a default timeout value specified
                by the provider implementation should be used.

        Returns:
            The updated service instance.
        """

    @abstractmethod
    def _stop_service(
        self,
        service: BaseService,
        timeout: Optional[int] = None,
    ) -> BaseService:
        """Stop a service instance for a ZenML server deployment.

        Args:
            service: The service instance.
            timeout: The timeout in seconds to wait until the service is
                stopped. If not supplied, a default timeout value specified
                by the provider implementation should be used.

        Returns:
            The updated service instance.
        """

    @abstractmethod
    def _delete_service(
        self,
        service: BaseService,
        timeout: Optional[int] = None,
    ) -> None:
        """Remove a service instance for a ZenML server deployment.

        Args:
            service: The service instance.
            timeout: The timeout in seconds to wait until the service is
                removed. If not supplied, a default timeout value specified
                by the provider implementation should be used.
        """

    @abstractmethod
    def _get_service(self, server_name: str) -> BaseService:
        """Get the service instance associated with a ZenML server deployment.

        Args:
            server_name: The server deployment name.

        Returns:
            The service instance.

        Raises:
            KeyError: If the server deployment is not found.
        """

    @abstractmethod
    def _list_services(self) -> List[BaseService]:
        """Get all service instances for all deployed ZenML servers.

        Returns:
            A list of service instances.
        """

    @abstractmethod
    def _get_deployment_config(
        self, service: BaseService
    ) -> ServerDeploymentConfig:
        """Recreate the server deployment config from a service instance.

        Args:
            service: The service instance.

        Returns:
            The server deployment config.
        """
CONFIG_TYPE (BaseModel) pydantic-model

Generic server deployment configuration.

All server deployment configurations should inherit from this class and handle extra attributes as provider specific attributes.

Attributes:

Name Type Description
name str

Name of the server deployment.

provider ServerProviderType

The server provider type.

Source code in zenml/zen_server/deploy/base_provider.py
class ServerDeploymentConfig(BaseModel):
    """Generic server deployment configuration.

    All server deployment configurations should inherit from this class and
    handle extra attributes as provider specific attributes.

    Attributes:
        name: Name of the server deployment.
        provider: The server provider type.
    """

    name: str
    provider: ServerProviderType

    class Config:
        """Pydantic configuration class."""

        # Validate attributes when assigning them. We need to set this in order
        # to have a mix of mutable and immutable attributes
        validate_assignment = True
        # Allow extra attributes to be set in the base class. The concrete
        # classes are responsible for validating the attributes. This lets a
        # generic config carry provider specific values until it is converted
        # into the provider's own config class.
        extra = "allow"
Config

Pydantic configuration class.

Source code in zenml/zen_server/deploy/base_provider.py
class Config:
    """Pydantic configuration class for the generic server deployment config."""

    # Validate attributes when assigning them. We need to set this in order
    # to have a mix of mutable and immutable attributes
    validate_assignment = True
    # Allow extra attributes to be set in the base class. The concrete
    # classes are responsible for validating the attributes.
    extra = "allow"
deploy_server(self, config, timeout=None)

Deploy a new ZenML server.

Parameters:

Name Type Description Default
config ServerDeploymentConfig

The generic server deployment configuration.

required
timeout Optional[int]

The timeout in seconds to wait until the deployment is successful. If not supplied, the default timeout value specified by the provider is used.

None

Returns:

Type Description
ServerDeployment

The newly created server deployment.

Exceptions:

Type Description
ServerDeploymentExistsError

If a deployment with the same name already exists.

Source code in zenml/zen_server/deploy/base_provider.py
def deploy_server(
    self,
    config: ServerDeploymentConfig,
    timeout: Optional[int] = None,
) -> ServerDeployment:
    """Create a brand new ZenML server deployment.

    Args:
        config: The generic server deployment configuration.
        timeout: The timeout in seconds to wait until the deployment is
            successful. If not supplied, the default timeout value specified
            by the provider is used.

    Returns:
        The newly created server deployment.

    Raises:
        ServerDeploymentExistsError: If a deployment with the same name
            already exists.
    """
    # A successful service lookup means the name is already taken.
    name_taken = True
    try:
        self._get_service(config.name)
    except KeyError:
        name_taken = False

    if name_taken:
        raise ServerDeploymentExistsError(
            f"ZenML server deployment with name '{config.name}' already "
            f"exists"
        )

    # the provider specific hooks expect the provider specific config type
    provider_config = self._convert_config(config)
    new_service = self._create_service(provider_config, timeout)
    return self._get_deployment(new_service)
get_server(self, config)

Retrieve information about a ZenML server deployment.

Parameters:

Name Type Description Default
config ServerDeploymentConfig

The generic server deployment configuration.

required

Returns:

Type Description
ServerDeployment

The server deployment.

Exceptions:

Type Description
ServerDeploymentNotFoundError

If a deployment with the given name doesn't exist.

Source code in zenml/zen_server/deploy/base_provider.py
def get_server(
    self,
    config: ServerDeploymentConfig,
) -> ServerDeployment:
    """Retrieve information about a ZenML server deployment.

    Args:
        config: The generic server deployment configuration.

    Returns:
        The server deployment.

    Raises:
        ServerDeploymentNotFoundError: If a deployment with the given name
            doesn't exist.
    """
    try:
        service = self._get_service(config.name)
    except KeyError as e:
        # keep the failed lookup attached as the explicit cause
        raise ServerDeploymentNotFoundError(
            f"ZenML server deployment with name '{config.name}' was not "
            f"found"
        ) from e

    return self._get_deployment(service)
get_server_logs(self, config, follow=False, tail=None)

Retrieve the logs of a ZenML server.

Parameters:

Name Type Description Default
config ServerDeploymentConfig

The generic server deployment configuration.

required
follow bool

if True, the logs will be streamed as they are written

False
tail Optional[int]

only retrieve the last NUM lines of log output.

None

Returns:

Type Description
Generator[str, bool, NoneType]

A generator that can be accessed to get the service logs.

Exceptions:

Type Description
ServerDeploymentNotFoundError

If a deployment with the given name doesn't exist.

Source code in zenml/zen_server/deploy/base_provider.py
def get_server_logs(
    self,
    config: ServerDeploymentConfig,
    follow: bool = False,
    tail: Optional[int] = None,
) -> Generator[str, bool, None]:
    """Retrieve the logs of a ZenML server.

    Args:
        config: The generic server deployment configuration. Only its
            `name` is used to look up the underlying service.
        follow: If True, the logs are streamed as they are written.
        tail: Only retrieve the last `tail` lines of log output.

    Returns:
        A generator that can be accessed to get the service logs.

    Raises:
        ServerDeploymentNotFoundError: If a deployment with the given name
            doesn't exist.
    """
    try:
        service = self._get_service(config.name)
    except KeyError as err:
        # Chain the failed lookup explicitly so the traceback reports it as
        # the direct cause of the "not found" error.
        raise ServerDeploymentNotFoundError(
            f"ZenML server deployment with name '{config.name}' was not "
            f"found"
        ) from err

    # Both options are forwarded unchanged to the service.
    return service.get_logs(follow=follow, tail=tail)
list_servers(self)

List all server deployments managed by this provider.

Returns:

Type Description
List[zenml.zen_server.deploy.deployment.ServerDeployment]

The list of server deployments.

Source code in zenml/zen_server/deploy/base_provider.py
def list_servers(self) -> List[ServerDeployment]:
    """Return a deployment for every service this provider manages.

    Returns:
        One server deployment per service reported by the provider, in the
        order in which the services are listed.
    """
    deployments = []
    for managed_service in self._list_services():
        deployments.append(self._get_deployment(managed_service))
    return deployments
register_as_provider() classmethod

Register the class as a server provider.

Source code in zenml/zen_server/deploy/base_provider.py
@classmethod
def register_as_provider(cls) -> None:
    """Hand this provider class to the server deployer for registration."""
    # NOTE(review): the import is kept local to the method, presumably to
    # avoid a circular import with the deployer module - confirm.
    from zenml.zen_server.deploy.deployer import ServerDeployer

    register = ServerDeployer.register_provider
    register(cls)
remove_server(self, config, timeout=None)

Tears down and removes all resources and files associated with a ZenML server deployment.

Parameters:

Name Type Description Default
config ServerDeploymentConfig

The generic server deployment configuration.

required
timeout Optional[int]

The timeout in seconds to wait until the server is removed. If not supplied, the default timeout value specified by the provider is used.

None

Exceptions:

Type Description
ServerDeploymentNotFoundError

If a deployment with the given name doesn't exist.

Source code in zenml/zen_server/deploy/base_provider.py
def remove_server(
    self,
    config: ServerDeploymentConfig,
    timeout: Optional[int] = None,
) -> None:
    """Tear down a ZenML server deployment and remove its resources.

    All resources and files associated with the deployment are removed.

    Args:
        config: The generic server deployment configuration. Only its
            `name` is used to look up the underlying service.
        timeout: The timeout in seconds to wait until the server is
            removed. If not supplied, the default timeout value specified
            by the provider is used.

    Raises:
        ServerDeploymentNotFoundError: If a deployment with the given name
            doesn't exist.
    """
    try:
        service = self._get_service(config.name)
    except KeyError as err:
        # Chain the failed lookup explicitly so the traceback reports it as
        # the direct cause of the "not found" error.
        raise ServerDeploymentNotFoundError(
            f"ZenML server deployment with name '{config.name}' was not "
            f"found"
        ) from err

    logger.info(f"Removing the {config.name} ZenML server.")
    self._delete_service(service, timeout)
update_server(self, config, timeout=None)

Update an existing ZenML server deployment.

Parameters:

Name Type Description Default
config ServerDeploymentConfig

The new generic server deployment configuration.

required
timeout Optional[int]

The timeout in seconds to wait until the update is successful. If not supplied, the default timeout value specified by the provider is used.

None

Returns:

Type Description
ServerDeployment

The updated server deployment.

Exceptions:

Type Description
ServerDeploymentNotFoundError

If a deployment with the given name doesn't exist.

Source code in zenml/zen_server/deploy/base_provider.py
def update_server(
    self,
    config: ServerDeploymentConfig,
    timeout: Optional[int] = None,
) -> ServerDeployment:
    """Update an existing ZenML server deployment.

    If the converted configuration equals the one the service already has,
    the service is only (re)started; otherwise it is updated with the new
    configuration.

    Args:
        config: The new generic server deployment configuration.
        timeout: The timeout in seconds to wait until the update is
            successful. If not supplied, the default timeout value specified
            by the provider is used.

    Returns:
        The updated server deployment.

    Raises:
        ServerDeploymentNotFoundError: If a deployment with the given name
            doesn't exist.
    """
    try:
        service = self._get_service(config.name)
    except KeyError as err:
        # Chain the failed lookup explicitly so the traceback reports it as
        # the direct cause of the "not found" error.
        raise ServerDeploymentNotFoundError(
            f"ZenML server deployment with name '{config.name}' was not "
            f"found"
        ) from err

    # convert the generic deployment config to a provider specific
    # deployment config
    config = self._convert_config(config)
    old_config = self._get_deployment_config(service)

    if old_config == config:
        logger.info(
            f"The {config.name} ZenML server is already configured with "
            f"the same parameters."
        )
        # Nothing to change: just make sure the service is started.
        service = self._start_service(service, timeout)
    else:
        logger.info(f"Updating the {config.name} ZenML server.")
        service = self._update_service(service, config, timeout)

    return self._get_deployment(service)

deployer

ZenML server deployer singleton implementation.

ServerDeployer

Server deployer singleton.

This class is responsible for managing the various server provider implementations and for directing server deployment lifecycle requests to the responsible provider. It acts as a facade built on top of the various server providers.

Source code in zenml/zen_server/deploy/deployer.py
class ServerDeployer(metaclass=SingletonMetaClass):
    """Server deployer singleton.

    This class is responsible for managing the various server provider
    implementations and for directing server deployment lifecycle requests to
    the responsible provider. It acts as a facade built on top of the various
    server providers.
    """

    # Registry of provider instances keyed by provider type. It is a class
    # attribute that is filled in by `register_provider`.
    _providers: ClassVar[Dict[ServerProviderType, BaseServerProvider]] = {}

    @classmethod
    def register_provider(cls, provider: Type[BaseServerProvider]) -> None:
        """Register a server provider.

        The provider class is instantiated and the instance is stored under
        the provider's `TYPE`.

        Args:
            provider: The server provider to register.

        Raises:
            TypeError: If a provider with the same type is already registered.
        """
        if provider.TYPE in cls._providers:
            raise TypeError(
                f"Server provider '{provider.TYPE}' is already registered."
            )
        logger.debug(f"Registering server provider '{provider.TYPE}'.")
        cls._providers[provider.TYPE] = provider()

    @classmethod
    def get_provider(
        cls, provider_type: ServerProviderType
    ) -> BaseServerProvider:
        """Get the server provider associated with a provider type.

        Args:
            provider_type: The server provider type.

        Returns:
            The server provider associated with the provider type.

        Raises:
            ServerProviderNotFoundError: If no provider is registered for the
                given provider type.
        """
        if provider_type not in cls._providers:
            raise ServerProviderNotFoundError(
                f"Server provider '{provider_type}' is not registered."
            )
        return cls._providers[provider_type]

    def deploy_server(
        self,
        config: ServerDeploymentConfig,
        timeout: Optional[int] = None,
    ) -> ServerDeployment:
        """Deploy a new ZenML server or update an existing deployment.

        Args:
            config: The server deployment configuration.
            timeout: The timeout in seconds to wait until the deployment is
                successful. If not supplied, the default timeout value specified
                by the provider is used.

        Returns:
            The server deployment.
        """
        # We do this here to ensure that the zenml store is always initialized
        # before the server is deployed. This is necessary because the server
        # may require access to the local store configuration or database.
        gc = GlobalConfiguration()
        if gc.store is None:
            _ = gc.zen_store

        # A deployment with the same name already exists: update it instead
        # of deploying a second one.
        try:
            self.get_server(config.name)
        except ServerDeploymentNotFoundError:
            pass
        else:
            return self.update_server(config=config, timeout=timeout)

        provider_name = config.provider.value
        provider = self.get_provider(config.provider)

        logger.info(
            f"Deploying a {provider_name} ZenML server with name "
            f"'{config.name}'."
        )
        return provider.deploy_server(config, timeout=timeout)

    def update_server(
        self,
        config: ServerDeploymentConfig,
        timeout: Optional[int] = None,
    ) -> ServerDeployment:
        """Update an existing ZenML server deployment.

        Args:
            config: The new server deployment configuration.
            timeout: The timeout in seconds to wait until the update is
                successful. The value is passed unchanged to the provider,
                so if it is not supplied the provider's own default applies.

        Returns:
            The updated server deployment.

        Raises:
            ServerDeploymentExistsError: If an existing deployment with the same
                name but a different provider type is found.
        """
        # this will also raise ServerDeploymentNotFoundError if the server
        # does not exist
        existing_server = self.get_server(config.name)

        provider = self.get_provider(config.provider)
        existing_provider = existing_server.config.provider

        if existing_provider != config.provider:
            # The adjacent f-strings are concatenated verbatim, so every
            # fragment must carry its own trailing space.
            raise ServerDeploymentExistsError(
                f"A server deployment with the same name '{config.name}' but "
                f"with a different provider '{existing_provider.value}' "
                f"is already provisioned. Please choose a different name or "
                f"tear down the existing deployment."
            )

        return provider.update_server(config, timeout=timeout)

    def remove_server(
        self,
        server_name: str,
        timeout: Optional[int] = None,
    ) -> None:
        """Tear down a ZenML server deployment and remove its resources.

        All resources and files associated with the deployment are removed.
        If the client is currently connected to the server, it is
        disconnected first.

        Args:
            server_name: The server deployment name.
            timeout: The timeout in seconds to wait until the deployment is
                successfully torn down. If not supplied, a provider specific
                default timeout value is used.
        """
        # this will also raise ServerDeploymentNotFoundError if the server
        # does not exist
        server = self.get_server(server_name)

        provider_name = server.config.provider.value
        provider = self.get_provider(server.config.provider)

        if self.is_connected_to_server(server_name):
            self.disconnect_from_server(server_name)

        logger.info(
            f"Tearing down the '{server_name}' {provider_name} ZenML server."
        )
        provider.remove_server(server.config, timeout=timeout)

    def is_connected_to_server(self, server_name: str) -> bool:
        """Check if the ZenML client is currently connected to a ZenML server.

        The client counts as connected when the globally configured store
        has the same URL as the one reported in the server's status.

        Args:
            server_name: The server deployment name.

        Returns:
            True if the ZenML client is connected to the ZenML server, False
            otherwise.
        """
        # this will also raise ServerDeploymentNotFoundError if the server
        # does not exist
        server = self.get_server(server_name)

        gc = GlobalConfiguration()
        return (
            server.status is not None
            and server.status.url is not None
            and gc.store is not None
            and gc.store.url == server.status.url
        )

    def connect_to_server(
        self,
        server_name: str,
        username: str,
        password: str,
        verify_ssl: Union[bool, str] = True,
    ) -> None:
        """Connect to a ZenML server instance.

        Args:
            server_name: The server deployment name.
            username: The username to use to connect to the server.
            password: The password to use to connect to the server.
            verify_ssl: Either a boolean, in which case it controls whether we
                verify the server's TLS certificate, or a string, in which case
                it must be a path to a CA bundle to use or the CA bundle value
                itself.

        Raises:
            ServerDeploymentError: If the ZenML server is not running or
                is unreachable.
        """
        # this will also raise ServerDeploymentNotFoundError if the server
        # does not exist
        server = self.get_server(server_name)
        provider_name = server.config.provider.value

        gc = GlobalConfiguration()
        if not server.status or not server.status.url:
            raise ServerDeploymentError(
                f"The {provider_name} {server_name} ZenML "
                f"server is not currently running or is unreachable."
            )

        store_config = RestZenStoreConfiguration(
            url=server.status.url,
            username=username,
            password=password,
            verify_ssl=verify_ssl,
        )

        # Nothing to do if the global store already has this exact
        # configuration.
        if gc.store == store_config:
            logger.info(
                f"ZenML is already connected to the '{server_name}' "
                f"{provider_name} ZenML server."
            )
            return

        logger.info(
            f"Connecting ZenML to the '{server_name}' "
            f"{provider_name} ZenML server ({store_config.url})."
        )

        gc.set_store(store_config)

        logger.info(
            f"Connected ZenML to the '{server_name}' "
            f"{provider_name} ZenML server ({store_config.url})."
        )

    def disconnect_from_server(
        self,
        server_name: Optional[str] = None,
    ) -> None:
        """Disconnect from a ZenML server instance.

        Args:
            server_name: The server deployment name. If supplied, the deployer
                will check if the ZenML client is indeed connected to the server
                and disconnect only if that is the case. Otherwise the deployer
                will disconnect from any ZenML server.
        """
        gc = GlobalConfiguration()

        # Only a REST store counts as a server connection.
        if not gc.store or gc.store.type != StoreType.REST:
            logger.info("ZenML is not currently connected to a ZenML server.")
            return

        if server_name:
            # this will also raise ServerDeploymentNotFoundError if the server
            # does not exist
            server = self.get_server(server_name)
            provider_name = server.config.provider.value

            if not self.is_connected_to_server(server_name):
                logger.info(
                    f"ZenML is not currently connected to the '{server_name}' "
                    f"{provider_name} ZenML server."
                )
                return

            logger.info(
                f"Disconnecting ZenML from the '{server_name}' "
                f"{provider_name} ZenML server ({gc.store.url})."
            )
        else:
            logger.info(
                f"Disconnecting ZenML from the {gc.store.url} ZenML server."
            )

        gc.set_default_store()

        logger.info("Disconnected ZenML from the ZenML server.")

    def get_server(
        self,
        server_name: str,
    ) -> ServerDeployment:
        """Get a server deployment.

        Every registered provider is asked for a deployment with the given
        name; the first one found is returned.

        Args:
            server_name: The server deployment name.

        Returns:
            The requested server deployment.

        Raises:
            ServerDeploymentNotFoundError: If no server deployment with the
                given name is found.
        """
        for provider in self._providers.values():
            try:
                return provider.get_server(
                    ServerDeploymentConfig(
                        name=server_name, provider=provider.TYPE
                    )
                )
            except ServerDeploymentNotFoundError:
                # Not managed by this provider: try the next one.
                pass

        raise ServerDeploymentNotFoundError(
            f"Server deployment '{server_name}' not found."
        )

    def list_servers(
        self,
        server_name: Optional[str] = None,
        provider_type: Optional[ServerProviderType] = None,
    ) -> List[ServerDeployment]:
        """List all server deployments.

        Args:
            server_name: The server deployment name to filter by.
            provider_type: The server provider type to filter by.

        Returns:
            The list of server deployments.
        """
        # Restrict the search to one provider when a type filter is given.
        providers: List[BaseServerProvider] = (
            [self.get_provider(provider_type)]
            if provider_type
            else list(self._providers.values())
        )

        servers: List[ServerDeployment] = []
        for provider in providers:
            if server_name:
                try:
                    servers.append(
                        provider.get_server(
                            ServerDeploymentConfig(
                                name=server_name,
                                provider=provider.TYPE,
                            )
                        )
                    )
                except ServerDeploymentNotFoundError:
                    # This provider has no deployment with that name.
                    pass
            else:
                servers.extend(provider.list_servers())

        return servers

    def get_server_logs(
        self,
        server_name: str,
        follow: bool = False,
        tail: Optional[int] = None,
    ) -> Generator[str, bool, None]:
        """Retrieve the logs of a ZenML server.

        Args:
            server_name: The server deployment name.
            follow: If True, the logs are streamed as they are written.
            tail: Only retrieve the last `tail` lines of log output.

        Returns:
            A generator that can be accessed to get the service logs.
        """
        # this will also raise ServerDeploymentNotFoundError if the server
        # does not exist
        server = self.get_server(server_name)

        provider_name = server.config.provider.value
        provider = self.get_provider(server.config.provider)

        logger.info(
            f"Fetching logs from the '{server_name}' {provider_name} ZenML "
            f"server..."
        )
        return provider.get_server_logs(server.config, follow=follow, tail=tail)
connect_to_server(self, server_name, username, password, verify_ssl=True)

Connect to a ZenML server instance.

Parameters:

Name Type Description Default
server_name str

The server deployment name.

required
username str

The username to use to connect to the server.

required
password str

The password to use to connect to the server.

required
verify_ssl Union[bool, str]

Either a boolean, in which case it controls whether we verify the server's TLS certificate, or a string, in which case it must be a path to a CA bundle to use or the CA bundle value itself.

True

Exceptions:

Type Description
ServerDeploymentError

If the ZenML server is not running or is unreachable.

Source code in zenml/zen_server/deploy/deployer.py
def connect_to_server(
    self,
    server_name: str,
    username: str,
    password: str,
    verify_ssl: Union[bool, str] = True,
) -> None:
    """Point the global ZenML configuration at a deployed ZenML server.

    Args:
        server_name: Name of the server deployment to connect to.
        username: Username for the REST store configuration.
        password: Password for the REST store configuration.
        verify_ssl: Either a boolean that controls whether the server's TLS
            certificate is verified, or a string that is a path to a CA
            bundle or the CA bundle value itself.

    Raises:
        ServerDeploymentError: If the server reports no status or no URL,
            i.e. it is not running or is unreachable.
    """
    # get_server raises ServerDeploymentNotFoundError for unknown names
    deployment = self.get_server(server_name)
    provider_name = deployment.config.provider.value

    global_config = GlobalConfiguration()
    if not (deployment.status and deployment.status.url):
        raise ServerDeploymentError(
            f"The {provider_name} {server_name} ZenML "
            f"server is not currently running or is unreachable."
        )

    store_config = RestZenStoreConfiguration(
        url=deployment.status.url,
        username=username,
        password=password,
        verify_ssl=verify_ssl,
    )

    # An identical store configuration means there is nothing to switch.
    already_connected = global_config.store == store_config
    if already_connected:
        logger.info(
            f"ZenML is already connected to the '{server_name}' "
            f"{provider_name} ZenML server."
        )
        return

    logger.info(
        f"Connecting ZenML to the '{server_name}' "
        f"{provider_name} ZenML server ({store_config.url})."
    )
    global_config.set_store(store_config)
    logger.info(
        f"Connected ZenML to the '{server_name}' "
        f"{provider_name} ZenML server ({store_config.url})."
    )
deploy_server(self, config, timeout=None)

Deploy a new ZenML server or update an existing deployment.

Parameters:

Name Type Description Default
config ServerDeploymentConfig

The server deployment configuration.

required
timeout Optional[int]

The timeout in seconds to wait until the deployment is successful. If not supplied, the default timeout value specified by the provider is used.

None

Returns:

Type Description
ServerDeployment

The server deployment.

Source code in zenml/zen_server/deploy/deployer.py
def deploy_server(
    self,
    config: ServerDeploymentConfig,
    timeout: Optional[int] = None,
) -> ServerDeployment:
    """Create a ZenML server, or update it if the name is already in use.

    Args:
        config: The server deployment configuration.
        timeout: The timeout in seconds to wait until the deployment is
            successful. It is forwarded unchanged to the update or deploy
            call.

    Returns:
        The server deployment.
    """
    # Touch the zen store first so that it is initialized before any server
    # is deployed; the server may need the local store configuration or
    # database.
    global_config = GlobalConfiguration()
    if global_config.store is None:
        _ = global_config.zen_store

    # Probe for an existing deployment with the same name.
    try:
        self.get_server(config.name)
    except ServerDeploymentNotFoundError:
        already_deployed = False
    else:
        already_deployed = True

    if already_deployed:
        return self.update_server(config=config, timeout=timeout)

    provider_name = config.provider.value
    provider = self.get_provider(config.provider)
    logger.info(
        f"Deploying a {provider_name} ZenML server with name "
        f"'{config.name}'."
    )
    return provider.deploy_server(config, timeout=timeout)
disconnect_from_server(self, server_name=None)

Disconnect from a ZenML server instance.

Parameters:

Name Type Description Default
server_name Optional[str]

The server deployment name. If supplied, the deployer will check if the ZenML client is indeed connected to the server and disconnect only if that is the case. Otherwise the deployer will disconnect from any ZenML server.

None
Source code in zenml/zen_server/deploy/deployer.py
def disconnect_from_server(
    self,
    server_name: Optional[str] = None,
) -> None:
    """Switch the global configuration back to the default store.

    Args:
        server_name: The server deployment name. When given, the client is
            disconnected only if it is actually connected to that server.
            When omitted, the client is disconnected from whichever ZenML
            server it is connected to.
    """
    gc = GlobalConfiguration()

    # Only a REST store counts as a server connection.
    if not gc.store or gc.store.type != StoreType.REST:
        logger.info("ZenML is not currently connected to a ZenML server.")
        return

    if not server_name:
        logger.info(
            f"Disconnecting ZenML from the {gc.store.url} ZenML server."
        )
    else:
        # get_server raises ServerDeploymentNotFoundError for unknown names
        deployment = self.get_server(server_name)
        provider_name = deployment.config.provider.value

        connected = self.is_connected_to_server(server_name)
        if not connected:
            logger.info(
                f"ZenML is not currently connected to the '{server_name}' "
                f"{provider_name} ZenML server."
            )
            return

        logger.info(
            f"Disconnecting ZenML from the '{server_name}' "
            f"{provider_name} ZenML server ({gc.store.url})."
        )

    gc.set_default_store()
    logger.info("Disconnected ZenML from the ZenML server.")
get_provider(provider_type) classmethod

Get the server provider associated with a provider type.

Parameters:

Name Type Description Default
provider_type ServerProviderType

The server provider type.

required

Returns:

Type Description
BaseServerProvider

The server provider associated with the provider type.

Exceptions:

Type Description
ServerProviderNotFoundError

If no provider is registered for the given provider type.

Source code in zenml/zen_server/deploy/deployer.py
@classmethod
def get_provider(
    cls, provider_type: ServerProviderType
) -> BaseServerProvider:
    """Look up the registered provider instance for a provider type.

    Args:
        provider_type: The server provider type.

    Returns:
        The provider stored in the registry under that type.

    Raises:
        ServerProviderNotFoundError: If nothing is registered for the
            given provider type.
    """
    registry = cls._providers
    if provider_type in registry:
        return registry[provider_type]

    raise ServerProviderNotFoundError(
        f"Server provider '{provider_type}' is not registered."
    )
get_server(self, server_name)

Get a server deployment.

Parameters:

Name Type Description Default
server_name str

The server deployment name.

required

Returns:

Type Description
ServerDeployment

The requested server deployment.

Exceptions:

Type Description
ServerDeploymentNotFoundError

If no server deployment with the given name is found.

Source code in zenml/zen_server/deploy/deployer.py
def get_server(
    self,
    server_name: str,
) -> ServerDeployment:
    """Find a server deployment by name across all registered providers.

    Providers are queried one after another and the first deployment found
    is returned.

    Args:
        server_name: The server deployment name.

    Returns:
        The requested server deployment.

    Raises:
        ServerDeploymentNotFoundError: If no provider has a deployment with
            the given name.
    """
    for candidate in self._providers.values():
        lookup = ServerDeploymentConfig(
            name=server_name, provider=candidate.TYPE
        )
        try:
            return candidate.get_server(lookup)
        except ServerDeploymentNotFoundError:
            # Not managed by this provider: move on to the next one.
            continue

    raise ServerDeploymentNotFoundError(
        f"Server deployment '{server_name}' not found."
    )
get_server_logs(self, server_name, follow=False, tail=None)

Retrieve the logs of a ZenML server.

Parameters:

Name Type Description Default
server_name str

The server deployment name.

required
follow bool

If True, the logs are streamed as they are written.

False
tail Optional[int]

Only retrieve the last `tail` lines of log output.

None

Returns:

Type Description
Generator[str, bool, NoneType]

A generator that can be accessed to get the service logs.

Source code in zenml/zen_server/deploy/deployer.py
def get_server_logs(
    self,
    server_name: str,
    follow: bool = False,
    tail: Optional[int] = None,
) -> Generator[str, bool, None]:
    """Fetch the logs of a named ZenML server from its provider.

    Args:
        server_name: The server deployment name.
        follow: If True, the logs are streamed as they are written.
        tail: Only retrieve the last `tail` lines of log output.

    Returns:
        A generator that can be accessed to get the service logs.
    """
    # get_server raises ServerDeploymentNotFoundError for unknown names
    deployment = self.get_server(server_name)

    provider_type = deployment.config.provider
    provider_name = provider_type.value
    provider = self.get_provider(provider_type)

    logger.info(
        f"Fetching logs from the '{server_name}' {provider_name} ZenML "
        f"server..."
    )
    log_stream = provider.get_server_logs(
        deployment.config, follow=follow, tail=tail
    )
    return log_stream
is_connected_to_server(self, server_name)

Check if the ZenML client is currently connected to a ZenML server.

Parameters:

Name Type Description Default
server_name str

The server deployment name.

required

Returns:

Type Description
bool

True if the ZenML client is connected to the ZenML server, False otherwise.

Source code in zenml/zen_server/deploy/deployer.py
def is_connected_to_server(self, server_name: str) -> bool:
    """Tell whether the global store points at the named ZenML server.

    The client counts as connected when the server reports a URL and the
    globally configured store has that same URL.

    Args:
        server_name: The server deployment name.

    Returns:
        True if the ZenML client is connected to the ZenML server, False
        otherwise.
    """
    # get_server raises ServerDeploymentNotFoundError for unknown names
    deployment = self.get_server(server_name)
    global_config = GlobalConfiguration()

    # Without a reported URL there is nothing to compare against.
    if deployment.status is None or deployment.status.url is None:
        return False

    if global_config.store is None:
        return False

    return global_config.store.url == deployment.status.url
list_servers(self, server_name=None, provider_type=None)

List all server deployments.

Parameters:

Name Type Description Default
server_name Optional[str]

The server deployment name to filter by.

None
provider_type Optional[zenml.enums.ServerProviderType]

The server provider type to filter by.

None

Returns:

Type Description
List[zenml.zen_server.deploy.deployment.ServerDeployment]

The list of server deployments.

Source code in zenml/zen_server/deploy/deployer.py
def list_servers(
    self,
    server_name: Optional[str] = None,
    provider_type: Optional[ServerProviderType] = None,
) -> List[ServerDeployment]:
    """Collect server deployments, optionally filtered by name and provider.

    Args:
        server_name: The server deployment name to filter by.
        provider_type: The server provider type to filter by.

    Returns:
        The list of server deployments.
    """
    # Restrict the search to one provider when a type filter is given.
    if provider_type:
        selected = [self.get_provider(provider_type)]
    else:
        selected = list(self._providers.values())

    found = []
    for provider in selected:
        if not server_name:
            found.extend(provider.list_servers())
            continue

        try:
            match = provider.get_server(
                ServerDeploymentConfig(
                    name=server_name,
                    provider=provider.TYPE,
                )
            )
        except ServerDeploymentNotFoundError:
            # This provider has no deployment with that name.
            continue
        found.append(match)

    return found
register_provider(provider) classmethod

Register a server provider.

Parameters:

Name Type Description Default
provider Type[zenml.zen_server.deploy.base_provider.BaseServerProvider]

The server provider to register.

required

Exceptions:

Type Description
TypeError

If a provider with the same type is already registered.

Source code in zenml/zen_server/deploy/deployer.py
@classmethod
def register_provider(cls, provider: Type[BaseServerProvider]) -> None:
    """Instantiate a provider class and store it under its provider type.

    Args:
        provider: The server provider class to register.

    Raises:
        TypeError: If a provider with the same type is already registered.
    """
    registry = cls._providers
    if provider.TYPE in registry:
        raise TypeError(
            f"Server provider '{provider.TYPE}' is already registered."
        )

    logger.debug(f"Registering server provider '{provider.TYPE}'.")
    instance = provider()
    registry[provider.TYPE] = instance
remove_server(self, server_name, timeout=None)

Tears down and removes all resources and files associated with a ZenML server deployment.

Parameters:

Name Type Description Default
server_name str

The server deployment name.

required
timeout Optional[int]

The timeout in seconds to wait until the deployment is successfully torn down. If not supplied, a provider specific default timeout value is used.

None
Source code in zenml/zen_server/deploy/deployer.py
def remove_server(
    self,
    server_name: str,
    timeout: Optional[int] = None,
) -> None:
    """Tear down a ZenML server deployment and remove all its resources and files.

    Args:
        server_name: The server deployment name.
        timeout: The timeout in seconds to wait until the deployment is
            successfully torn down. If not supplied, a provider specific
            default timeout value is used.
    """
    # Looking the deployment up doubles as an existence check: a
    # ServerDeploymentNotFoundError is raised if the server does not exist.
    deployment = self.get_server(server_name)
    deployment_config = deployment.config

    provider_name = deployment_config.provider.value
    owning_provider = self.get_provider(deployment_config.provider)

    # Disconnect the client first if it is connected to this server.
    if self.is_connected_to_server(server_name):
        self.disconnect_from_server(server_name)

    logger.info(
        f"Tearing down the '{server_name}' {provider_name} ZenML server."
    )
    owning_provider.remove_server(deployment_config, timeout=timeout)
update_server(self, config, timeout=None)

Update an existing ZenML server deployment.

Parameters:

Name Type Description Default
config ServerDeploymentConfig

The new server deployment configuration.

required
timeout Optional[int]

The timeout in seconds to wait until the deployment is successful. If not supplied, a default timeout value of 30 seconds is used.

None

Returns:

Type Description
ServerDeployment

The updated server deployment.

Exceptions:

Type Description
ServerDeploymentExistsError

If an existing deployment with the same name but a different provider type is found.

Source code in zenml/zen_server/deploy/deployer.py
def update_server(
    self,
    config: ServerDeploymentConfig,
    timeout: Optional[int] = None,
) -> ServerDeployment:
    """Update an existing ZenML server deployment.

    Args:
        config: The new server deployment configuration.
        timeout: The timeout in seconds to wait until the deployment is
            successful. If not supplied, a default timeout value of 30
            seconds is used.

    Returns:
        The updated server deployment.

    Raises:
        ServerDeploymentExistsError: If an existing deployment with the same
            name but a different provider type is found.
    """
    # this will also raise ServerDeploymentNotFoundError if the server
    # does not exist
    existing_server = self.get_server(config.name)

    provider = self.get_provider(config.provider)
    existing_provider = existing_server.config.provider

    # An update must not move a deployment from one provider to another.
    if existing_provider != config.provider:
        raise ServerDeploymentExistsError(
            f"A server deployment with the same name '{config.name}' but "
            f"with a different provider '{existing_provider.value}' "
            f"is already provisioned. Please choose a different name or "
            f"tear down the existing deployment."
        )

    return provider.update_server(config, timeout=timeout)

deployment

Zen Server deployment definitions.

ServerDeployment (BaseModel) pydantic-model

Server deployment.

Attributes:

Name Type Description
config ServerDeploymentConfig

The server deployment configuration.

status Optional[zenml.zen_server.deploy.deployment.ServerDeploymentStatus]

The server deployment status.

Source code in zenml/zen_server/deploy/deployment.py
class ServerDeployment(BaseModel):
    """Server deployment.

    Pairs the configuration of a ZenML server deployment with its status.

    Attributes:
        config: The server deployment configuration.
        status: The server deployment status. Typed as optional, so a
            deployment may carry no status information.
    """

    # Configuration describing the deployment.
    config: ServerDeploymentConfig
    # Status information for the deployment (optional).
    status: Optional[ServerDeploymentStatus]
ServerDeploymentConfig (BaseModel) pydantic-model

Generic server deployment configuration.

All server deployment configurations should inherit from this class and handle extra attributes as provider specific attributes.

Attributes:

Name Type Description
name str

Name of the server deployment.

provider ServerProviderType

The server provider type.

Source code in zenml/zen_server/deploy/deployment.py
class ServerDeploymentConfig(BaseModel):
    """Generic server deployment configuration.

    All server deployment configurations should inherit from this class and
    handle extra attributes as provider specific attributes.

    Attributes:
        name: Name of the server deployment.
        provider: The server provider type.
    """

    # Name identifying the server deployment.
    name: str
    # Type of the provider responsible for this deployment.
    provider: ServerProviderType

    class Config:
        """Pydantic configuration class.

        Turns on validation for attribute assignment and allows attributes
        that are not declared as fields on this base class.
        """

        # Validate attributes when assigning them. We need to set this in order
        # to have a mix of mutable and immutable attributes
        validate_assignment = True
        # Allow extra attributes to be set in the base class. The concrete
        # classes are responsible for validating the attributes.
        extra = "allow"
Config

Pydantic configuration class.

Source code in zenml/zen_server/deploy/deployment.py
class Config:
    """Pydantic configuration class.

    Turns on validation for attribute assignment and allows attributes that
    are not declared as fields on the base class.
    """

    # Validate attributes when assigning them. We need to set this in order
    # to have a mix of mutable and immutable attributes
    validate_assignment = True
    # Allow extra attributes to be set in the base class. The concrete
    # classes are responsible for validating the attributes.
    extra = "allow"
ServerDeploymentStatus (BaseModel) pydantic-model

Server deployment status.

Ideally this should convey the following information:

  • whether the server's deployment is managed by this client (i.e. if the server was deployed with zenml up)
  • for a managed deployment, the status of the deployment/tear-down, e.g. not deployed, deploying, running, deleting, deployment timeout/error, tear-down timeout/error etc.
  • for an unmanaged deployment, the operational status (i.e. whether the server is reachable)
  • the URL of the server

Attributes:

Name Type Description
status ServiceState

The status of the server deployment.

status_message Optional[str]

A message describing the last status.

connected bool

Whether the client is currently connected to this server.

url Optional[str]

The URL of the server.

Source code in zenml/zen_server/deploy/deployment.py
class ServerDeploymentStatus(BaseModel):
    """Server deployment status.

    Ideally this should convey the following information:

    * whether the server's deployment is managed by this client (i.e. if
    the server was deployed with `zenml up`)
    * for a managed deployment, the status of the deployment/tear-down, e.g.
    not deployed, deploying, running, deleting, deployment timeout/error,
    tear-down timeout/error etc.
    * for an unmanaged deployment, the operational status (i.e. whether the
    server is reachable)
    * the URL of the server

    Attributes:
        status: The status of the server deployment.
        status_message: A message describing the last status.
        connected: Whether the client is currently connected to this server.
        url: The URL of the server.
        ca_crt: Optional string, unset by default. NOTE(review): presumably
            the CA certificate used to verify connections to the server;
            confirm against the providers that populate it.
    """

    status: ServiceState
    status_message: Optional[str] = None
    connected: bool
    url: Optional[str] = None
    ca_crt: Optional[str] = None

docker special

ZenML Server Docker Deployment.

docker_provider

Zen Server docker deployer implementation.

DockerServerProvider (BaseServerProvider)

Docker ZenML server provider.

Source code in zenml/zen_server/deploy/docker/docker_provider.py
class DockerServerProvider(BaseServerProvider):
    """Docker ZenML server provider.

    Manages the docker ZenML server deployment through the `DockerZenServer`
    service. At most one docker server service is handled at a time: the
    service is looked up without a name and is configured as a singleton.
    """

    TYPE: ClassVar[ServerProviderType] = ServerProviderType.DOCKER
    CONFIG_TYPE: ClassVar[
        Type[ServerDeploymentConfig]
    ] = DockerServerDeploymentConfig

    @classmethod
    def _get_service_configuration(
        cls,
        server_config: ServerDeploymentConfig,
    ) -> Tuple[
        ServiceConfig,
        ServiceEndpointConfig,
        ServiceEndpointHealthMonitorConfig,
    ]:
        """Construct the service configuration from a server deployment configuration.

        Args:
            server_config: server deployment configuration.

        Returns:
            The service, service endpoint and endpoint monitor configuration.
        """
        assert isinstance(server_config, DockerServerDeploymentConfig)

        return (
            DockerZenServerConfig(
                root_runtime_path=DockerZenServer.config_path(),
                singleton=True,
                image=server_config.image,
                name=server_config.name,
                server=server_config,
            ),
            ContainerServiceEndpointConfig(
                protocol=ServiceEndpointProtocol.HTTP,
                port=server_config.port,
                allocate_port=False,
            ),
            HTTPEndpointHealthMonitorConfig(
                healthcheck_uri_path=ZEN_SERVER_HEALTHCHECK_URL_PATH,
                use_head_request=True,
            ),
        )

    def _create_service(
        self,
        config: ServerDeploymentConfig,
        timeout: Optional[int] = None,
    ) -> BaseService:
        """Create, start and return the docker ZenML server deployment service.

        Args:
            config: The server deployment configuration.
            timeout: The timeout in seconds to wait until the service is
                running.

        Returns:
            The service instance.

        Raises:
            RuntimeError: If a docker service is already running.
        """
        assert isinstance(config, DockerServerDeploymentConfig)

        if timeout is None:
            timeout = DOCKER_ZENML_SERVER_DEFAULT_TIMEOUT

        # Only one docker server can be deployed: refuse to create a second
        # one while a service configuration is still present.
        existing_service = DockerZenServer.get_service()
        if existing_service:
            raise RuntimeError(
                f"A docker ZenML server with name '{existing_service.config.name}' "
                f"is already running. Please stop it first before starting a "
                f"new one."
            )

        (
            service_config,
            endpoint_cfg,
            monitor_cfg,
        ) = self._get_service_configuration(config)
        endpoint = ContainerServiceEndpoint(
            config=endpoint_cfg,
            monitor=HTTPEndpointHealthMonitor(
                config=monitor_cfg,
            ),
        )
        service = DockerZenServer(config=service_config, endpoint=endpoint)

        service.start(timeout=timeout)
        return service

    def _update_service(
        self,
        service: BaseService,
        config: ServerDeploymentConfig,
        timeout: Optional[int] = None,
    ) -> BaseService:
        """Update the docker ZenML server deployment service.

        Args:
            service: The service instance.
            config: The new server deployment configuration.
            timeout: The timeout in seconds to wait until the updated service is
                running.

        Returns:
            The updated service instance.
        """
        if timeout is None:
            timeout = DOCKER_ZENML_SERVER_DEFAULT_TIMEOUT

        (
            new_config,
            new_endpoint_cfg,
            new_monitor_cfg,
        ) = self._get_service_configuration(config)

        assert service.endpoint
        assert service.endpoint.monitor

        # The service is stopped before its configuration is swapped and
        # started again afterwards with the new settings.
        service.stop(timeout=timeout)
        (
            service.config,
            service.endpoint.config,
            service.endpoint.monitor.config,
        ) = (
            new_config,
            new_endpoint_cfg,
            new_monitor_cfg,
        )
        service.start(timeout=timeout)

        return service

    def _start_service(
        self,
        service: BaseService,
        timeout: Optional[int] = None,
    ) -> BaseService:
        """Start the docker ZenML server deployment service.

        Args:
            service: The service instance.
            timeout: The timeout in seconds to wait until the service is
                running.

        Returns:
            The updated service instance.
        """
        if timeout is None:
            timeout = DOCKER_ZENML_SERVER_DEFAULT_TIMEOUT

        service.start(timeout=timeout)
        return service

    def _stop_service(
        self,
        service: BaseService,
        timeout: Optional[int] = None,
    ) -> BaseService:
        """Stop the docker ZenML server deployment service.

        Args:
            service: The service instance.
            timeout: The timeout in seconds to wait until the service is
                stopped.

        Returns:
            The updated service instance.
        """
        if timeout is None:
            timeout = DOCKER_ZENML_SERVER_DEFAULT_TIMEOUT

        service.stop(timeout=timeout)
        return service

    def _delete_service(
        self,
        service: BaseService,
        timeout: Optional[int] = None,
    ) -> None:
        """Remove the docker ZenML server deployment service.

        Args:
            service: The service instance.
            timeout: The timeout in seconds to wait until the service is
                removed.
        """
        assert isinstance(service, DockerZenServer)

        if timeout is None:
            timeout = DOCKER_ZENML_SERVER_DEFAULT_TIMEOUT

        service.stop(timeout=timeout)
        # Remove the whole docker server runtime directory from disk.
        shutil.rmtree(DockerZenServer.config_path())

    def _get_service(self, server_name: str) -> BaseService:
        """Get the docker ZenML server deployment service.

        Args:
            server_name: The server deployment name.

        Returns:
            The service instance.

        Raises:
            KeyError: If the server deployment is not found.
        """
        service = DockerZenServer.get_service()
        if service is None:
            raise KeyError("The docker ZenML server is not deployed.")

        if service.config.name != server_name:
            raise KeyError(
                "The docker ZenML server is deployed but with a different name."
            )

        return service

    def _list_services(self) -> List[BaseService]:
        """Get all service instances for all deployed ZenML servers.

        Returns:
            A list of service instances. It holds at most one entry.
        """
        service = DockerZenServer.get_service()
        if service:
            return [service]
        return []

    def _get_deployment_config(
        self, service: BaseService
    ) -> ServerDeploymentConfig:
        """Recreate the server deployment configuration from a service instance.

        Args:
            service: The service instance.

        Returns:
            The server deployment configuration.
        """
        server = cast(DockerZenServer, service)
        return server.config.server
CONFIG_TYPE (ServerDeploymentConfig) pydantic-model

Docker server deployment configuration.

Attributes:

Name Type Description
port int

The TCP port number where the server is accepting connections.

image str

The Docker image to use for the server.

Source code in zenml/zen_server/deploy/docker/docker_provider.py
class DockerServerDeploymentConfig(ServerDeploymentConfig):
    """Docker server deployment configuration.

    Attributes:
        port: The TCP port number where the server is accepting connections.
        image: The Docker image to use for the server.
        store: Optional store configuration for the server; not set by
            default.
    """

    port: int = 8238
    image: str = DOCKER_ZENML_SERVER_DEFAULT_IMAGE
    store: Optional[StoreConfiguration] = None

    class Config:
        """Pydantic configuration."""

        # Reject attributes that are not declared on the model (pydantic
        # `extra` setting).
        extra = "forbid"
Config

Pydantic configuration.

Source code in zenml/zen_server/deploy/docker/docker_provider.py
class Config:
    """Pydantic configuration."""

    # Reject attributes that are not declared on the model (pydantic `extra`
    # setting).
    extra = "forbid"
docker_zen_server

Service implementation for the ZenML docker server deployment.

DockerServerDeploymentConfig (ServerDeploymentConfig) pydantic-model

Docker server deployment configuration.

Attributes:

Name Type Description
port int

The TCP port number where the server is accepting connections.

image str

The Docker image to use for the server.

Source code in zenml/zen_server/deploy/docker/docker_zen_server.py
class DockerServerDeploymentConfig(ServerDeploymentConfig):
    """Docker server deployment configuration.

    Attributes:
        port: The TCP port number where the server is accepting connections.
        image: The Docker image to use for the server.
        store: Optional store configuration for the server; not set by
            default.
    """

    port: int = 8238
    image: str = DOCKER_ZENML_SERVER_DEFAULT_IMAGE
    store: Optional[StoreConfiguration] = None

    class Config:
        """Pydantic configuration."""

        # Reject attributes that are not declared on the model (pydantic
        # `extra` setting).
        extra = "forbid"
Config

Pydantic configuration.

Source code in zenml/zen_server/deploy/docker/docker_zen_server.py
class Config:
    """Pydantic configuration."""

    # Reject attributes that are not declared on the model (pydantic `extra`
    # setting).
    extra = "forbid"
DockerZenServer (ContainerService) pydantic-model

Service that can be used to start a docker ZenServer.

Attributes:

Name Type Description
config DockerZenServerConfig

service configuration

endpoint ContainerServiceEndpoint

service endpoint

Source code in zenml/zen_server/deploy/docker/docker_zen_server.py
class DockerZenServer(ContainerService):
    """Container service used to run a ZenML server in docker.

    Attributes:
        config: service configuration
        endpoint: service endpoint
    """

    SERVICE_TYPE = ServiceType(
        name="docker_zenml_server",
        type="zen_server",
        flavor="docker",
        description="Docker ZenML server deployment",
    )

    config: DockerZenServerConfig
    endpoint: ContainerServiceEndpoint

    @classmethod
    def config_path(cls) -> str:
        """Return the directory holding the docker ZenML server files.

        Returns:
            Path to the docker ZenML server runtime directory.
        """
        root_directory = get_global_config_directory()
        return os.path.join(root_directory, "zen_server", "docker")

    @property
    def _global_config_path(self) -> str:
        """Return the global configuration directory used by this server.

        Returns:
            Path to the global configuration directory used by this server.
        """
        runtime_directory = self.config_path()
        return os.path.join(
            runtime_directory, SERVICE_CONTAINER_GLOBAL_CONFIG_DIR
        )

    def _copy_global_configuration(self) -> None:
        """Save a copy of the global configuration for the docker server.

        If a store configuration is explicitly set in the server
        configuration, it is passed along with the copy. Otherwise the copy
        is requested with an empty store.
        """
        global_config = GlobalConfiguration()
        custom_store = self.config.server.store

        # The copy is written to the server configuration path. Without an
        # explicitly supplied store configuration, the copy is created with
        # `empty_store` enabled.
        global_config.copy_configuration(
            config_path=self._global_config_path,
            store_config=custom_store,
            empty_store=custom_store is None,
        )

    @classmethod
    def get_service(cls) -> Optional["DockerZenServer"]:
        """Load the docker ZenML server service from its `service.json` file.

        Returns:
            The docker ZenML server service or None, if the docker server
            deployment is not found.
        """
        from zenml.services import ServiceRegistry

        service_file = os.path.join(cls.config_path(), "service.json")
        try:
            with open(service_file, "r") as handle:
                loaded = ServiceRegistry().load_service_from_json(
                    handle.read()
                )
        except FileNotFoundError:
            # No service file means that no docker server is deployed.
            return None
        return cast(DockerZenServer, loaded)

    def _get_container_cmd(self) -> Tuple[List[str], Dict[str, str]]:
        """Build the container launch command and its environment.

        Extends the inherited command so that the ZenML global config path
        inside the container points to the global config copy instead of the
        one mounted from the local host.

        Returns:
            Command needed to launch the docker container and the environment
            variables to set, in the formats accepted by subprocess.Popen.
        """
        GlobalConfiguration()

        command, environment = super()._get_container_cmd()

        container_config_path = os.path.join(
            SERVICE_CONTAINER_PATH,
            SERVICE_CONTAINER_GLOBAL_CONFIG_DIR,
        )
        # The local stores path points to where the client's local stores
        # path is mounted in the container, so that the server's store
        # configuration is initialized with the same path as the client.
        container_stores_path = os.path.join(
            SERVICE_CONTAINER_GLOBAL_CONFIG_PATH,
            LOCAL_STORES_DIRECTORY_NAME,
        )

        environment.update(
            {
                ENV_ZENML_CONFIG_PATH: container_config_path,
                ENV_ZENML_SERVER_DEPLOYMENT_TYPE: ServerDeploymentType.DOCKER,
                ENV_ZENML_LOCAL_STORES_PATH: container_stores_path,
                ENV_ZENML_DISABLE_DATABASE_MIGRATION: "True",
            }
        )

        return command, environment

    def provision(self) -> None:
        """Copy the global configuration, then provision the service."""
        self._copy_global_configuration()
        super().provision()

    def run(self) -> None:
        """Run the ZenML Server as a blocking uvicorn process.

        Raises:
            ValueError: if started with a global configuration that connects to
                another ZenML server.
        """
        import uvicorn  # type: ignore[import]

        active_store = GlobalConfiguration().store
        if active_store and active_store.type == StoreType.REST:
            raise ValueError(
                "The ZenML server cannot be started with REST store type."
            )

        logger.info(
            "Starting ZenML Server as blocking "
            "process... press CTRL+C once to stop it."
        )

        self.endpoint.prepare_for_start()

        server_options = dict(
            host="0.0.0.0",
            port=self.endpoint.config.port,
            log_level="info",
        )
        try:
            uvicorn.run(ZEN_SERVER_ENTRYPOINT, **server_options)
        except KeyboardInterrupt:
            logger.info("ZenML Server stopped. Resuming normal execution.")
config_path() classmethod

Path to the directory where the docker ZenML server files are located.

Returns:

Type Description
str

Path to the docker ZenML server runtime directory.

Source code in zenml/zen_server/deploy/docker/docker_zen_server.py
@classmethod
def config_path(cls) -> str:
    """Return the directory holding the docker ZenML server files.

    Returns:
        Path to the docker ZenML server runtime directory.
    """
    root_directory = get_global_config_directory()
    return os.path.join(root_directory, "zen_server", "docker")
get_service() classmethod

Load and return the docker ZenML server service, if present.

Returns:

Type Description
Optional[DockerZenServer]

The docker ZenML server service or None, if the docker server deployment is not found.

Source code in zenml/zen_server/deploy/docker/docker_zen_server.py
@classmethod
def get_service(cls) -> Optional["DockerZenServer"]:
    """Load the docker ZenML server service from its `service.json` file.

    Returns:
        The docker ZenML server service or None, if the docker server
        deployment is not found.
    """
    from zenml.services import ServiceRegistry

    service_file = os.path.join(cls.config_path(), "service.json")
    try:
        with open(service_file, "r") as handle:
            loaded = ServiceRegistry().load_service_from_json(handle.read())
    except FileNotFoundError:
        # No service file means that no docker server is deployed.
        return None
    return cast(DockerZenServer, loaded)
provision(self)

Provision the service.

Source code in zenml/zen_server/deploy/docker/docker_zen_server.py
def provision(self) -> None:
    """Provision the service.

    Copies the global configuration for the docker ZenML server first and
    then runs the provisioning implemented by the parent class.
    """
    self._copy_global_configuration()
    super().provision()
run(self)

Run the ZenML Server.

Exceptions:

Type Description
ValueError

if started with a global configuration that connects to another ZenML server.

Source code in zenml/zen_server/deploy/docker/docker_zen_server.py
def run(self) -> None:
    """Run the ZenML Server as a blocking uvicorn process.

    Raises:
        ValueError: if started with a global configuration that connects to
            another ZenML server.
    """
    import uvicorn  # type: ignore[import]

    active_store = GlobalConfiguration().store
    if active_store and active_store.type == StoreType.REST:
        raise ValueError(
            "The ZenML server cannot be started with REST store type."
        )

    logger.info(
        "Starting ZenML Server as blocking "
        "process... press CTRL+C once to stop it."
    )

    self.endpoint.prepare_for_start()

    server_options = dict(
        host="0.0.0.0",
        port=self.endpoint.config.port,
        log_level="info",
    )
    try:
        uvicorn.run(ZEN_SERVER_ENTRYPOINT, **server_options)
    except KeyboardInterrupt:
        logger.info("ZenML Server stopped. Resuming normal execution.")
DockerZenServerConfig (ContainerServiceConfig) pydantic-model

Docker Zen server configuration.

Attributes:

Name Type Description
server DockerServerDeploymentConfig

The deployment configuration.

Source code in zenml/zen_server/deploy/docker/docker_zen_server.py
class DockerZenServerConfig(ContainerServiceConfig):
    """Docker Zen server configuration.

    Extends the container service configuration with the server deployment
    configuration the service was created from.

    Attributes:
        server: The deployment configuration.
    """

    # Docker server deployment configuration (port, image, store).
    server: DockerServerDeploymentConfig

exceptions

ZenML server deployment exceptions.

ServerDeploymentConfigurationError (ServerDeploymentError)

Raised when there is a ZenML server deployment configuration error.

Source code in zenml/zen_server/deploy/exceptions.py
class ServerDeploymentConfigurationError(ServerDeploymentError):
    """Raised when there is a ZenML server deployment configuration error."""
ServerDeploymentError (ZenMLBaseException)

Base exception class for all ZenML server deployment related errors.

Source code in zenml/zen_server/deploy/exceptions.py
class ServerDeploymentError(ZenMLBaseException):
    """Base exception class for all ZenML server deployment related errors.

    Catch this type to handle any of the more specific server deployment
    exceptions derived from it.
    """
ServerDeploymentExistsError (ServerDeploymentError)

Raised when trying to deploy a new ZenML server with the same name.

Source code in zenml/zen_server/deploy/exceptions.py
class ServerDeploymentExistsError(ServerDeploymentError):
    """Raised when trying to deploy a new ZenML server with the same name.

    The deployer also raises it when an update targets an existing
    deployment of the same name that uses a different provider.
    """
ServerDeploymentNotFoundError (ServerDeploymentError)

Raised when trying to fetch a ZenML server deployment that doesn't exist.

Source code in zenml/zen_server/deploy/exceptions.py
class ServerDeploymentNotFoundError(ServerDeploymentError):
    """Raised when trying to fetch a ZenML server deployment that doesn't exist.

    The deployer relies on this error to skip providers that do not have a
    deployment with the requested name.
    """
ServerProviderNotFoundError (ServerDeploymentError)

Raised when using a ZenML server provider that doesn't exist.

Source code in zenml/zen_server/deploy/exceptions.py
class ServerProviderNotFoundError(ServerDeploymentError):
    """Raised when using a ZenML server provider that doesn't exist.

    Part of the server deployment exception hierarchy, so it is also caught
    by handlers for `ServerDeploymentError`.
    """

local special

ZenML Server Local Deployment.

local_provider

Zen Server local provider implementation.

LocalServerProvider (BaseServerProvider)

Local ZenML server provider.

Source code in zenml/zen_server/deploy/local/local_provider.py
class LocalServerProvider(BaseServerProvider):
    """Local ZenML server provider."""

    TYPE: ClassVar[ServerProviderType] = ServerProviderType.LOCAL
    CONFIG_TYPE: ClassVar[
        Type[ServerDeploymentConfig]
    ] = LocalServerDeploymentConfig

    @staticmethod
    def check_local_server_dependencies() -> None:
        """Check if local server dependencies are installed.

        The check simply tries to import `fastapi` and `uvicorn`.

        Raises:
            RuntimeError: If the dependencies are not installed. The original
                `ImportError` is attached as the cause.
        """
        try:
            # Make sure the ZenML Server dependencies are installed
            import fastapi  # noqa
            import uvicorn  # type: ignore[import] # noqa
        except ImportError as e:
            # Unable to import the ZenML Server dependencies. Chain the
            # original error so the missing module stays visible in the
            # traceback.
            raise RuntimeError(
                "The local ZenML server provider is unavailable because the "
                "ZenML server requirements seem to be unavailable on your machine. "
                "This is probably because ZenML was installed without the optional "
                "ZenML Server dependencies. To install the missing dependencies "
                f'run `pip install "zenml[server]=={__version__}"`.'
            ) from e

    @classmethod
    def _get_service_configuration(
        cls,
        server_config: ServerDeploymentConfig,
    ) -> Tuple[
        ServiceConfig,
        ServiceEndpointConfig,
        ServiceEndpointHealthMonitorConfig,
    ]:
        """Translate a server deployment configuration into service configs.

        Args:
            server_config: server deployment configuration. It must be a
                `LocalServerDeploymentConfig` instance.

        Returns:
            The service, service endpoint and endpoint monitor configuration.
        """
        assert isinstance(server_config, LocalServerDeploymentConfig)

        # Service-level settings; the deployment config itself is embedded so
        # it can be recovered from the service later on.
        service_cfg = LocalZenServerConfig(
            root_runtime_path=LocalZenServer.config_path(),
            singleton=True,
            name=server_config.name,
            blocking=server_config.blocking,
            server=server_config,
        )
        # The endpoint is bound to the address and port from the deployment
        # configuration.
        endpoint_cfg = LocalDaemonServiceEndpointConfig(
            protocol=ServiceEndpointProtocol.HTTP,
            ip_address=str(server_config.ip_address),
            port=server_config.port,
            allocate_port=False,
        )
        monitor_cfg = HTTPEndpointHealthMonitorConfig(
            healthcheck_uri_path=ZEN_SERVER_HEALTHCHECK_URL_PATH,
            use_head_request=True,
        )
        return service_cfg, endpoint_cfg, monitor_cfg

    def _create_service(
        self,
        config: ServerDeploymentConfig,
        timeout: Optional[int] = None,
    ) -> BaseService:
        """Create, start and return the local ZenML server deployment service.

        Args:
            config: The server deployment configuration.
            timeout: The timeout in seconds to wait until the service is
                running. Falls back to `LOCAL_ZENML_SERVER_DEFAULT_TIMEOUT`
                when not supplied.

        Returns:
            The service instance.

        Raises:
            RuntimeError: If a local service is already running.
        """
        assert isinstance(config, LocalServerDeploymentConfig)

        wait_seconds = (
            LOCAL_ZENML_SERVER_DEFAULT_TIMEOUT if timeout is None else timeout
        )

        self.check_local_server_dependencies()

        # Only one local server may exist at a time.
        running = LocalZenServer.get_service()
        if running:
            raise RuntimeError(
                f"A local ZenML server with name '{running.config.name}' "
                f"is already running. Please stop it first before starting a "
                f"new one."
            )

        service_cfg, endpoint_cfg, monitor_cfg = self._get_service_configuration(
            config
        )
        health_monitor = HTTPEndpointHealthMonitor(config=monitor_cfg)
        server_endpoint = LocalDaemonServiceEndpoint(
            config=endpoint_cfg,
            monitor=health_monitor,
        )
        new_service = LocalZenServer(
            config=service_cfg, endpoint=server_endpoint
        )
        new_service.start(timeout=wait_seconds)
        return new_service

    def _update_service(
        self,
        service: BaseService,
        config: ServerDeploymentConfig,
        timeout: Optional[int] = None,
    ) -> BaseService:
        """Update the local ZenML server deployment service.

        The service is stopped, its service, endpoint and monitor
        configurations are replaced, and it is then started again.

        Args:
            service: The service instance.
            config: The new server deployment configuration.
            timeout: The timeout in seconds to wait until the updated service is
                running. Falls back to `LOCAL_ZENML_SERVER_DEFAULT_TIMEOUT`
                when not supplied.

        Returns:
            The updated service instance.
        """
        wait_seconds = (
            LOCAL_ZENML_SERVER_DEFAULT_TIMEOUT if timeout is None else timeout
        )

        service_cfg, endpoint_cfg, monitor_cfg = self._get_service_configuration(
            config
        )

        assert service.endpoint
        assert service.endpoint.monitor

        # Swap the configuration only while the service is down.
        service.stop(timeout=wait_seconds)
        service.config = service_cfg
        service.endpoint.config = endpoint_cfg
        service.endpoint.monitor.config = monitor_cfg
        service.start(timeout=wait_seconds)

        return service

    def _start_service(
        self,
        service: BaseService,
        timeout: Optional[int] = None,
    ) -> BaseService:
        """Start the local ZenML server deployment service.

        Args:
            service: The service instance.
            timeout: The timeout in seconds to wait until the service is
                running. Falls back to `LOCAL_ZENML_SERVER_DEFAULT_TIMEOUT`
                when not supplied.

        Returns:
            The updated service instance.
        """
        wait_seconds = (
            LOCAL_ZENML_SERVER_DEFAULT_TIMEOUT if timeout is None else timeout
        )
        service.start(timeout=wait_seconds)
        return service

    def _stop_service(
        self,
        service: BaseService,
        timeout: Optional[int] = None,
    ) -> BaseService:
        """Stop the local ZenML server deployment service.

        Args:
            service: The service instance.
            timeout: The timeout in seconds to wait until the service is
                stopped. Falls back to `LOCAL_ZENML_SERVER_DEFAULT_TIMEOUT`
                when not supplied.

        Returns:
            The updated service instance.
        """
        wait_seconds = (
            LOCAL_ZENML_SERVER_DEFAULT_TIMEOUT if timeout is None else timeout
        )
        service.stop(timeout=wait_seconds)
        return service

    def _delete_service(
        self,
        service: BaseService,
        timeout: Optional[int] = None,
    ) -> None:
        """Remove the local ZenML server deployment service.

        The service is stopped and the local server directory is then deleted
        from disk.

        Args:
            service: The service instance.
            timeout: The timeout in seconds to wait until the service is
                removed. Falls back to `LOCAL_ZENML_SERVER_DEFAULT_TIMEOUT`
                when not supplied.
        """
        assert isinstance(service, LocalZenServer)

        wait_seconds = (
            LOCAL_ZENML_SERVER_DEFAULT_TIMEOUT if timeout is None else timeout
        )
        service.stop(wait_seconds)

        server_dir = LocalZenServer.config_path()
        shutil.rmtree(server_dir)

    def _get_service(self, server_name: str) -> BaseService:
        """Get the local ZenML server deployment service.

        Args:
            server_name: The server deployment name.

        Returns:
            The service instance.

        Raises:
            KeyError: If the server deployment is not found, or if the
                deployed local server has a different name.
        """
        local_server = LocalZenServer.get_service()

        if local_server is None:
            raise KeyError("The local ZenML server is not deployed.")

        name_matches = local_server.config.name == server_name
        if not name_matches:
            raise KeyError(
                "The local ZenML server is deployed but with a different name."
            )

        return local_server

    def _list_services(self) -> List[BaseService]:
        """Get all service instances for all deployed ZenML servers.

        Returns:
            A list of service instances. It holds at most one entry: the
            local server, if one is found.
        """
        local_server = LocalZenServer.get_service()
        return [local_server] if local_server else []

    def _get_deployment_config(
        self, service: BaseService
    ) -> ServerDeploymentConfig:
        """Recreate the server deployment configuration from a service instance.

        Args:
            service: The service instance. It is treated as a
                `LocalZenServer`.

        Returns:
            The server deployment configuration.
        """
        # The deployment configuration is stored on the service configuration.
        return cast(LocalZenServer, service).config.server
CONFIG_TYPE (ServerDeploymentConfig) pydantic-model

Local server deployment configuration.

Attributes:

Name Type Description
port int

The TCP port number where the server is accepting connections.

ip_address

The IP address where the server is reachable.

blocking bool

Run the server in blocking mode instead of using a daemon process.

Source code in zenml/zen_server/deploy/local/local_provider.py
class LocalServerDeploymentConfig(ServerDeploymentConfig):
    """Local server deployment configuration.

    Attributes:
        port: The TCP port number where the server is accepting connections.
        ip_address: The IP address where the server is reachable.
        blocking: Run the server in blocking mode instead of using a daemon
            process.
        store: Optional store configuration for the server. Defaults to
            `None`.
    """

    port: int = 8237
    ip_address: Union[
        ipaddress.IPv4Address, ipaddress.IPv6Address
    ] = ipaddress.IPv4Address(DEFAULT_LOCAL_SERVICE_IP_ADDRESS)
    blocking: bool = False
    store: Optional[StoreConfiguration] = None

    class Config:
        """Pydantic configuration."""

        # Reject attributes that are not declared on the model.
        extra = "forbid"
Config

Pydantic configuration.

Source code in zenml/zen_server/deploy/local/local_provider.py
class Config:
    """Pydantic configuration."""

    # Reject attributes that are not declared on the model.
    extra = "forbid"
check_local_server_dependencies() staticmethod

Check if local server dependencies are installed.

Exceptions:

Type Description
RuntimeError

If the dependencies are not installed.

Source code in zenml/zen_server/deploy/local/local_provider.py
@staticmethod
def check_local_server_dependencies() -> None:
    """Check if local server dependencies are installed.

    The check simply tries to import `fastapi` and `uvicorn`.

    Raises:
        RuntimeError: If the dependencies are not installed. The original
            `ImportError` is attached as the cause.
    """
    try:
        # Make sure the ZenML Server dependencies are installed
        import fastapi  # noqa
        import uvicorn  # type: ignore[import] # noqa
    except ImportError as e:
        # Unable to import the ZenML Server dependencies. Chain the original
        # error so the missing module stays visible in the traceback.
        raise RuntimeError(
            "The local ZenML server provider is unavailable because the "
            "ZenML server requirements seem to be unavailable on your machine. "
            "This is probably because ZenML was installed without the optional "
            "ZenML Server dependencies. To install the missing dependencies "
            f'run `pip install "zenml[server]=={__version__}"`.'
        ) from e
local_zen_server

Local ZenML server deployment service implementation.

LocalServerDeploymentConfig (ServerDeploymentConfig) pydantic-model

Local server deployment configuration.

Attributes:

Name Type Description
port int

The TCP port number where the server is accepting connections.

ip_address

The IP address where the server is reachable.

blocking bool

Run the server in blocking mode instead of using a daemon process.

Source code in zenml/zen_server/deploy/local/local_zen_server.py
class LocalServerDeploymentConfig(ServerDeploymentConfig):
    """Local server deployment configuration.

    Attributes:
        port: The TCP port number where the server is accepting connections.
        ip_address: The IP address where the server is reachable.
        blocking: Run the server in blocking mode instead of using a daemon
            process.
        store: Optional store configuration for the server. Defaults to
            `None`.
    """

    port: int = 8237
    ip_address: Union[
        ipaddress.IPv4Address, ipaddress.IPv6Address
    ] = ipaddress.IPv4Address(DEFAULT_LOCAL_SERVICE_IP_ADDRESS)
    blocking: bool = False
    store: Optional[StoreConfiguration] = None

    class Config:
        """Pydantic configuration."""

        # Reject attributes that are not declared on the model.
        extra = "forbid"
Config

Pydantic configuration.

Source code in zenml/zen_server/deploy/local/local_zen_server.py
class Config:
    """Pydantic configuration."""

    # Reject attributes that are not declared on the model.
    extra = "forbid"
LocalZenServer (LocalDaemonService) pydantic-model

Service daemon that can be used to start a local ZenML Server.

Attributes:

Name Type Description
config LocalZenServerConfig

service configuration

endpoint LocalDaemonServiceEndpoint

optional service endpoint

Source code in zenml/zen_server/deploy/local/local_zen_server.py
class LocalZenServer(LocalDaemonService):
    """Service daemon that can be used to start a local ZenML Server.

    Attributes:
        config: service configuration
        endpoint: optional service endpoint
    """

    SERVICE_TYPE = ServiceType(
        name="local_zenml_server",
        type="zen_server",
        flavor="local",
        description="Local ZenML server deployment",
    )

    config: LocalZenServerConfig
    endpoint: LocalDaemonServiceEndpoint

    @classmethod
    def config_path(cls) -> str:
        """Path to the directory where the local ZenML server files are located.

        Returns:
            Path to the local ZenML server runtime directory, i.e. the
            `zen_server/local` sub-directory of the global config directory.
        """
        path_parts = (get_global_config_directory(), "zen_server", "local")
        return os.path.join(*path_parts)

    @property
    def _global_config_path(self) -> str:
        """Path to the global configuration directory used by this server.

        Returns:
            Path to the `.zenconfig` directory inside the server's runtime
            directory.
        """
        server_dir = self.config_path()
        return os.path.join(server_dir, ".zenconfig")

    def _copy_global_configuration(self) -> None:
        """Copy the global configuration to the local ZenML server location.

        The local ZenML server global configuration is a copy of the local
        global configuration. If a store configuration is explicitly set in
        the server configuration, it will be used. Otherwise, the store
        configuration is set to point to the local store.
        """
        custom_store = self.config.server.store

        # Save a copy of the global configuration under the server's own
        # configuration path. An explicitly supplied store configuration is
        # forwarded as is; without one, the copy is requested with an empty
        # store.
        GlobalConfiguration().copy_configuration(
            config_path=self._global_config_path,
            store_config=custom_store,
            empty_store=custom_store is None,
        )

    @classmethod
    def get_service(cls) -> Optional["LocalZenServer"]:
        """Load and return the local ZenML server service, if present.

        The service is restored from the `service.json` file found in the
        local server directory.

        Returns:
            The local ZenML server service or None, if the local server
            deployment is not found.
        """
        from zenml.services import ServiceRegistry

        service_file = os.path.join(cls.config_path(), "service.json")
        try:
            with open(service_file, "r") as handle:
                loaded = ServiceRegistry().load_service_from_json(
                    handle.read()
                )
        except FileNotFoundError:
            # A missing service file is reported as "no service".
            return None
        return cast(LocalZenServer, loaded)

    def _get_daemon_cmd(self) -> Tuple[List[str], Dict[str, str]]:
        """Get the command to start the daemon.

        Overrides the base class implementation to add the environment variable
        that forces the ZenML server to use the copied global config.

        Returns:
            The command to start the daemon and the environment variables to
            set for the command.
        """
        cmd, env = super()._get_daemon_cmd()
        env.update(
            {
                ENV_ZENML_CONFIG_PATH: self._global_config_path,
                ENV_ZENML_SERVER_DEPLOYMENT_TYPE: ServerDeploymentType.LOCAL,
                # Use the same local stores path as the client, so that the
                # server's store configuration is initialized with the same
                # path as the client.
                ENV_ZENML_LOCAL_STORES_PATH: (
                    GlobalConfiguration().local_stores_path
                ),
                ENV_ZENML_DISABLE_DATABASE_MIGRATION: "True",
            }
        )
        return cmd, env

    def provision(self) -> None:
        """Provision the service.

        The global configuration is copied to the server location first, then
        provisioning is delegated to the parent class.
        """
        self._copy_global_configuration()
        super().provision()

    def start(self, timeout: int = 0) -> None:
        """Start the service and optionally wait for it to become active.

        In non-blocking mode the call is delegated to the parent class. In
        blocking mode the server runs in the current process: the global
        configuration is copied, the relevant environment variables are
        pointed at the copy for the duration of the run, and they are put back
        to their previous state afterwards.

        Args:
            timeout: amount of time to wait for the service to become active.
                If set to 0, the method will return immediately after checking
                the service status.
        """
        if not self.config.blocking:
            super().start(timeout)
            return

        self._copy_global_configuration()
        # Read the stores path before the singletons are reset below.
        local_stores_path = GlobalConfiguration().local_stores_path
        GlobalConfiguration._reset_instance()
        Client._reset_instance()

        overrides = {
            ENV_ZENML_CONFIG_PATH: self._global_config_path,
            ENV_ZENML_LOCAL_STORES_PATH: local_stores_path,
        }
        # Remember the previous value of every variable that gets overridden
        # (None means "was not set"), so that all of them can be restored.
        previous = {name: os.environ.get(name) for name in overrides}
        try:
            os.environ.update(overrides)
            self.run()
        finally:
            for name, value in previous.items():
                if value is None:
                    os.environ.pop(name, None)
                else:
                    os.environ[name] = value
            GlobalConfiguration._reset_instance()
            Client._reset_instance()

    def run(self) -> None:
        """Run the ZenML Server.

        The server is served with uvicorn on the endpoint's configured address
        and port. A keyboard interrupt is caught and logged.

        Raises:
            ValueError: if started with a global configuration that connects to
                another ZenML server.
        """
        import uvicorn  # type: ignore[import]

        configured_store = GlobalConfiguration().store
        if configured_store and configured_store.type == StoreType.REST:
            raise ValueError(
                "The ZenML server cannot be started with REST store type."
            )

        logger.info(
            "Starting ZenML Server as blocking "
            "process... press CTRL+C once to stop it."
        )
        self.endpoint.prepare_for_start()

        try:
            endpoint_cfg = self.endpoint.config
            uvicorn.run(
                ZEN_SERVER_ENTRYPOINT,
                host=endpoint_cfg.ip_address,
                port=endpoint_cfg.port,
                log_level="info",
            )
        except KeyboardInterrupt:
            logger.info("ZenML Server stopped. Resuming normal execution.")
config_path() classmethod

Path to the directory where the local ZenML server files are located.

Returns:

Type Description
str

Path to the local ZenML server runtime directory.

Source code in zenml/zen_server/deploy/local/local_zen_server.py
@classmethod
def config_path(cls) -> str:
    """Path to the directory where the local ZenML server files are located.

    Returns:
        Path to the local ZenML server runtime directory, i.e. the
        `zen_server/local` sub-directory of the global config directory.
    """
    path_parts = (get_global_config_directory(), "zen_server", "local")
    return os.path.join(*path_parts)
get_service() classmethod

Load and return the local ZenML server service, if present.

Returns:

Type Description
Optional[LocalZenServer]

The local ZenML server service or None, if the local server deployment is not found.

Source code in zenml/zen_server/deploy/local/local_zen_server.py
@classmethod
def get_service(cls) -> Optional["LocalZenServer"]:
    """Load and return the local ZenML server service, if present.

    The service is restored from the `service.json` file found in the local
    server directory.

    Returns:
        The local ZenML server service or None, if the local server
        deployment is not found.
    """
    from zenml.services import ServiceRegistry

    service_file = os.path.join(cls.config_path(), "service.json")
    try:
        with open(service_file, "r") as handle:
            loaded = ServiceRegistry().load_service_from_json(handle.read())
    except FileNotFoundError:
        # A missing service file is reported as "no service".
        return None
    return cast(LocalZenServer, loaded)
provision(self)

Provision the service.

Source code in zenml/zen_server/deploy/local/local_zen_server.py
def provision(self) -> None:
    """Provision the service.

    The global configuration is copied to the server location first, then
    provisioning is delegated to the parent class.
    """
    self._copy_global_configuration()
    super().provision()
run(self)

Run the ZenML Server.

Exceptions:

Type Description
ValueError

if started with a global configuration that connects to another ZenML server.

Source code in zenml/zen_server/deploy/local/local_zen_server.py
def run(self) -> None:
    """Run the ZenML Server.

    The server is served with uvicorn on the endpoint's configured address
    and port. A keyboard interrupt is caught and logged.

    Raises:
        ValueError: if started with a global configuration that connects to
            another ZenML server.
    """
    import uvicorn  # type: ignore[import]

    configured_store = GlobalConfiguration().store
    if configured_store and configured_store.type == StoreType.REST:
        raise ValueError(
            "The ZenML server cannot be started with REST store type."
        )

    logger.info(
        "Starting ZenML Server as blocking "
        "process... press CTRL+C once to stop it."
    )
    self.endpoint.prepare_for_start()

    try:
        endpoint_cfg = self.endpoint.config
        uvicorn.run(
            ZEN_SERVER_ENTRYPOINT,
            host=endpoint_cfg.ip_address,
            port=endpoint_cfg.port,
            log_level="info",
        )
    except KeyboardInterrupt:
        logger.info("ZenML Server stopped. Resuming normal execution.")
start(self, timeout=0)

Start the service and optionally wait for it to become active.

Parameters:

Name Type Description Default
timeout int

amount of time to wait for the service to become active. If set to 0, the method will return immediately after checking the service status.

0
Source code in zenml/zen_server/deploy/local/local_zen_server.py
def start(self, timeout: int = 0) -> None:
    """Start the service and optionally wait for it to become active.

    In non-blocking mode the call is delegated to the parent class. In
    blocking mode the server runs in the current process: the global
    configuration is copied, the relevant environment variables are pointed
    at the copy for the duration of the run, and they are put back to their
    previous state afterwards.

    Args:
        timeout: amount of time to wait for the service to become active.
            If set to 0, the method will return immediately after checking
            the service status.
    """
    if not self.config.blocking:
        super().start(timeout)
        return

    self._copy_global_configuration()
    # Read the stores path before the singletons are reset below.
    local_stores_path = GlobalConfiguration().local_stores_path
    GlobalConfiguration._reset_instance()
    Client._reset_instance()

    overrides = {
        ENV_ZENML_CONFIG_PATH: self._global_config_path,
        ENV_ZENML_LOCAL_STORES_PATH: local_stores_path,
    }
    # Remember the previous value of every variable that gets overridden
    # (None means "was not set"), so that all of them can be restored.
    previous = {name: os.environ.get(name) for name in overrides}
    try:
        os.environ.update(overrides)
        self.run()
    finally:
        for name, value in previous.items():
            if value is None:
                os.environ.pop(name, None)
            else:
                os.environ[name] = value
        GlobalConfiguration._reset_instance()
        Client._reset_instance()
LocalZenServerConfig (LocalDaemonServiceConfig) pydantic-model

Local Zen server configuration.

Attributes:

Name Type Description
server LocalServerDeploymentConfig

The deployment configuration.

Source code in zenml/zen_server/deploy/local/local_zen_server.py
class LocalZenServerConfig(LocalDaemonServiceConfig):
    """Local Zen server configuration.

    Attributes:
        server: The deployment configuration. This field has no default, so
            it must be supplied.
    """

    server: LocalServerDeploymentConfig

terraform special

ZenML Server Terraform Deployment.

providers special

ZenML Server Terraform Providers.

aws_provider

Zen Server AWS Terraform deployer implementation.

AWSServerDeploymentConfig (TerraformServerDeploymentConfig) pydantic-model

AWS server deployment configuration.

Attributes:

Name Type Description
region str

The AWS region to deploy to.

rds_name str

The name of the RDS instance to create

db_name str

Name of RDS database to create.

db_type str

Type of RDS database to create.

db_version str

Version of RDS database to create.

db_instance_class str

Instance class of RDS database to create.

db_allocated_storage int

Allocated storage of RDS database to create.

Source code in zenml/zen_server/deploy/terraform/providers/aws_provider.py
class AWSServerDeploymentConfig(TerraformServerDeploymentConfig):
    """AWS server deployment configuration.

    All fields declared here have default values.

    Attributes:
        region: The AWS region to deploy to.
        rds_name: The name of the RDS instance to create.
        db_name: Name of RDS database to create.
        db_type: Type of RDS database to create.
        db_version: Version of RDS database to create.
        db_instance_class: Instance class of RDS database to create.
        db_allocated_storage: Allocated storage of RDS database to create.
    """

    region: str = "eu-west-1"
    rds_name: str = "zenmlserver"
    db_name: str = "zenmlserver"
    db_type: str = "mysql"
    db_version: str = "5.7.38"
    db_instance_class: str = "db.t3.micro"
    # NOTE(review): the unit is not stated here (presumably GiB) - confirm.
    db_allocated_storage: int = 5
AWSServerProvider (TerraformServerProvider)

AWS ZenML server provider.

Source code in zenml/zen_server/deploy/terraform/providers/aws_provider.py
class AWSServerProvider(TerraformServerProvider):
    """AWS ZenML server provider.

    Only class-level constants are overridden here; the behavior comes from
    `TerraformServerProvider`.
    """

    # Provider type tag for this provider.
    TYPE: ClassVar[ServerProviderType] = ServerProviderType.AWS
    # Deployment configuration class associated with this provider.
    CONFIG_TYPE: ClassVar[
        Type[TerraformServerDeploymentConfig]
    ] = AWSServerDeploymentConfig
CONFIG_TYPE (TerraformServerDeploymentConfig) pydantic-model

AWS server deployment configuration.

Attributes:

Name Type Description
region str

The AWS region to deploy to.

rds_name str

The name of the RDS instance to create

db_name str

Name of RDS database to create.

db_type str

Type of RDS database to create.

db_version str

Version of RDS database to create.

db_instance_class str

Instance class of RDS database to create.

db_allocated_storage int

Allocated storage of RDS database to create.

Source code in zenml/zen_server/deploy/terraform/providers/aws_provider.py
class AWSServerDeploymentConfig(TerraformServerDeploymentConfig):
    """AWS server deployment configuration.

    All fields declared here have default values.

    Attributes:
        region: The AWS region to deploy to.
        rds_name: The name of the RDS instance to create.
        db_name: Name of RDS database to create.
        db_type: Type of RDS database to create.
        db_version: Version of RDS database to create.
        db_instance_class: Instance class of RDS database to create.
        db_allocated_storage: Allocated storage of RDS database to create.
    """

    region: str = "eu-west-1"
    rds_name: str = "zenmlserver"
    db_name: str = "zenmlserver"
    db_type: str = "mysql"
    db_version: str = "5.7.38"
    db_instance_class: str = "db.t3.micro"
    # NOTE(review): the unit is not stated here (presumably GiB) - confirm.
    db_allocated_storage: int = 5
azure_provider

Zen Server Azure Terraform deployer implementation.

AzureServerDeploymentConfig (TerraformServerDeploymentConfig) pydantic-model

Azure server deployment configuration.

Attributes:

Name Type Description
resource_group str

The Azure resource_group to deploy to.

db_instance_name str

The name of the Flexible MySQL instance to create

db_name str

Name of MySQL database to create.

db_version str

Version of MySQL database to create.

db_sku_name str

The sku_name for the database resource.

db_disk_size int

Allocated storage of MySQL database to create.

Source code in zenml/zen_server/deploy/terraform/providers/azure_provider.py
class AzureServerDeploymentConfig(TerraformServerDeploymentConfig):
    """Azure server deployment configuration.

    All fields declared here have default values.

    Attributes:
        resource_group: The Azure resource_group to deploy to.
        db_instance_name: The name of the Flexible MySQL instance to create.
        db_name: Name of MySQL database to create.
        db_version: Version of MySQL database to create.
        db_sku_name: The sku_name for the database resource.
        db_disk_size: Allocated storage of MySQL database to create.
    """

    resource_group: str = "zenml"
    db_instance_name: str = "zenmlserver"
    db_name: str = "zenmlserver"
    db_version: str = "5.7"
    db_sku_name: str = "B_Standard_B1s"
    # NOTE(review): the unit is not stated here (presumably GiB) - confirm.
    db_disk_size: int = 20
AzureServerProvider (TerraformServerProvider)

Azure ZenML server provider.

Source code in zenml/zen_server/deploy/terraform/providers/azure_provider.py
class AzureServerProvider(TerraformServerProvider):
    """Azure ZenML server provider.

    Only class-level constants are overridden here; the behavior comes from
    `TerraformServerProvider`.
    """

    # Provider type tag for this provider.
    TYPE: ClassVar[ServerProviderType] = ServerProviderType.AZURE
    # Deployment configuration class associated with this provider.
    CONFIG_TYPE: ClassVar[
        Type[TerraformServerDeploymentConfig]
    ] = AzureServerDeploymentConfig
CONFIG_TYPE (TerraformServerDeploymentConfig) pydantic-model

Azure server deployment configuration.

Attributes:

Name Type Description
resource_group str

The Azure resource_group to deploy to.

db_instance_name str

The name of the Flexible MySQL instance to create

db_name str

Name of MySQL database to create.

db_version str

Version of MySQL database to create.

db_sku_name str

The sku_name for the database resource.

db_disk_size int

Allocated storage of MySQL database to create.

Source code in zenml/zen_server/deploy/terraform/providers/azure_provider.py
class AzureServerDeploymentConfig(TerraformServerDeploymentConfig):
    """Azure server deployment configuration.

    All fields declared here have default values.

    Attributes:
        resource_group: The Azure resource_group to deploy to.
        db_instance_name: The name of the Flexible MySQL instance to create.
        db_name: Name of MySQL database to create.
        db_version: Version of MySQL database to create.
        db_sku_name: The sku_name for the database resource.
        db_disk_size: Allocated storage of MySQL database to create.
    """

    resource_group: str = "zenml"
    db_instance_name: str = "zenmlserver"
    db_name: str = "zenmlserver"
    db_version: str = "5.7"
    db_sku_name: str = "B_Standard_B1s"
    # NOTE(review): the unit is not stated here (presumably GiB) - confirm.
    db_disk_size: int = 20
gcp_provider

Zen Server GCP Terraform deployer implementation.

GCPServerDeploymentConfig (TerraformServerDeploymentConfig) pydantic-model

GCP server deployment configuration.

Attributes:

Name Type Description
project_id str

The project in GCP to deploy the server to.

region str

The GCP region to deploy to.

cloudsql_name str

The name of the CloudSQL instance to create

db_name str

Name of CloudSQL database to create.

db_instance_tier str

Instance class of CloudSQL database to create.

db_disk_size int

Allocated storage of CloudSQL database to create.

Source code in zenml/zen_server/deploy/terraform/providers/gcp_provider.py
class GCPServerDeploymentConfig(TerraformServerDeploymentConfig):
    """GCP server deployment configuration.

    `project_id` is the only field declared here without a default value, so
    it must always be supplied.

    Attributes:
        project_id: The project in GCP to deploy the server to.
        region: The GCP region to deploy to.
        cloudsql_name: The name of the CloudSQL instance to create.
        db_name: Name of CloudSQL database to create.
        db_instance_tier: Instance class of CloudSQL database to create.
        db_disk_size: Allocated storage of CloudSQL database to create.
    """

    project_id: str
    region: str = "europe-west3"
    cloudsql_name: str = "zenmlserver"
    db_name: str = "zenmlserver"
    db_instance_tier: str = "db-n1-standard-1"
    # NOTE(review): the unit is not stated here (presumably GiB) - confirm.
    db_disk_size: int = 10
GCPServerProvider (TerraformServerProvider)

GCP ZenML server provider.

Source code in zenml/zen_server/deploy/terraform/providers/gcp_provider.py
class GCPServerProvider(TerraformServerProvider):
    """GCP ZenML server provider.

    Only class-level constants are overridden here; the behavior comes from
    `TerraformServerProvider`.
    """

    # Provider type tag for this provider.
    TYPE: ClassVar[ServerProviderType] = ServerProviderType.GCP
    # Deployment configuration class associated with this provider.
    CONFIG_TYPE: ClassVar[
        Type[TerraformServerDeploymentConfig]
    ] = GCPServerDeploymentConfig
CONFIG_TYPE (TerraformServerDeploymentConfig) pydantic-model

GCP server deployment configuration.

Attributes:

Name Type Description
project_id str

The project in GCP to deploy the server to.

region str

The GCP region to deploy to.

cloudsql_name str

The name of the CloudSQL instance to create

db_name str

Name of CloudSQL database to create.

db_instance_tier str

Instance class of CloudSQL database to create.

db_disk_size int

Allocated storage of CloudSQL database to create.

Source code in zenml/zen_server/deploy/terraform/providers/gcp_provider.py
class GCPServerDeploymentConfig(TerraformServerDeploymentConfig):
    """GCP server deployment configuration.

    Extends the Terraform server deployment configuration with the settings
    that are specific to a GCP deployment. Only `project_id` has no default
    value and therefore must always be supplied.

    Attributes:
        project_id: The project in GCP to deploy the server to.
        region: The GCP region to deploy to.
        cloudsql_name: The name of the CloudSQL instance to create.
        db_name: Name of CloudSQL database to create.
        db_instance_tier: Instance class of CloudSQL database to create.
        db_disk_size: Allocated storage of CloudSQL database to create.
    """

    # Required: no default is provided for the GCP project.
    project_id: str
    region: str = "europe-west3"
    cloudsql_name: str = "zenmlserver"
    db_name: str = "zenmlserver"
    db_instance_tier: str = "db-n1-standard-1"
    # NOTE(review): the unit is not stated here (presumably GB) - confirm
    # against the terraform recipe.
    db_disk_size: int = 10
terraform_provider

Zen Server terraform deployer implementation.

TerraformServerProvider (BaseServerProvider)

Terraform ZenML server provider.

Source code in zenml/zen_server/deploy/terraform/providers/terraform_provider.py
class TerraformServerProvider(BaseServerProvider):
    """Terraform ZenML server provider.

    Manages the terraform ZenML server deployment through the
    `TerraformZenServer` service. At most one terraform server service is
    handled at a time: creating a new one fails while another one exists.
    """

    CONFIG_TYPE: ClassVar[
        Type[ServerDeploymentConfig]
    ] = TerraformServerDeploymentConfig

    @staticmethod
    def _get_server_recipe_root_path() -> str:
        """Get the server recipe root path.

        The Terraform recipe files for all terraform server providers are
        located in a folder relative to the `zenml.zen_server.deploy.terraform`
        Python module.

        Returns:
            The server recipe root path.
        """
        import zenml.zen_server.deploy.terraform as terraform_module

        root_path = os.path.join(
            os.path.dirname(terraform_module.__file__),
            TERRAFORM_ZENML_SERVER_RECIPE_SUBPATH,
        )
        return root_path

    @classmethod
    def _get_service_configuration(
        cls,
        server_config: ServerDeploymentConfig,
    ) -> Tuple[
        ServiceConfig,
        ServiceEndpointConfig,
        ServiceEndpointHealthMonitorConfig,
    ]:
        """Construct the service configuration from a server deployment configuration.

        Args:
            server_config: server deployment configuration.

        Returns:
            A tuple with the service configuration, the service endpoint
            configuration and the endpoint health monitor configuration.
        """
        assert isinstance(server_config, TerraformServerDeploymentConfig)

        return (
            TerraformZenServerConfig(
                name=server_config.name,
                root_runtime_path=TERRAFORM_ZENML_SERVER_CONFIG_PATH,
                singleton=True,
                # each provider has its own recipe sub-folder
                directory_path=os.path.join(
                    cls._get_server_recipe_root_path(),
                    server_config.provider,
                ),
                log_level=server_config.log_level,
                variables_file_path=TERRAFORM_VALUES_FILE_PATH,
                server=server_config,
            ),
            ServiceEndpointConfig(
                protocol=ServiceEndpointProtocol.HTTP,
                allocate_port=False,
            ),
            HTTPEndpointHealthMonitorConfig(
                healthcheck_uri_path=ZEN_SERVER_HEALTHCHECK_URL_PATH,
                use_head_request=True,
            ),
        )

    def _create_service(
        self,
        config: ServerDeploymentConfig,
        timeout: Optional[int] = None,
    ) -> BaseService:
        """Create, start and return the terraform ZenML server deployment service.

        Args:
            config: The server deployment configuration.
            timeout: The timeout in seconds to wait until the service is
                running. Defaults to the terraform server default timeout.

        Returns:
            The service instance.

        Raises:
            RuntimeError: If a terraform service is already running.
        """
        assert isinstance(config, TerraformServerDeploymentConfig)

        if timeout is None:
            timeout = TERRAFORM_ZENML_SERVER_DEFAULT_TIMEOUT

        existing_service = TerraformZenServer.get_service()
        if existing_service:
            raise RuntimeError(
                f"A terraform ZenML server with name '{existing_service.config.name}' "
                f"is already running. Please stop it first before starting a "
                f"new one."
            )

        # only the service configuration is needed to build the service; the
        # endpoint and health monitor configurations are not used here
        service_config, _, _ = self._get_service_configuration(config)

        service = TerraformZenServer(config=service_config)

        service.start(timeout=timeout)
        return service

    def _update_service(
        self,
        service: BaseService,
        config: ServerDeploymentConfig,
        timeout: Optional[int] = None,
    ) -> BaseService:
        """Update the terraform ZenML server deployment service.

        Args:
            service: The service instance.
            config: The new server deployment configuration.
            timeout: The timeout in seconds to wait until the updated service is
                running. Defaults to the terraform server default timeout.

        Returns:
            The updated service instance.
        """
        if timeout is None:
            timeout = TERRAFORM_ZENML_SERVER_DEFAULT_TIMEOUT

        # only the service configuration is needed to update the service
        new_config, _, _ = self._get_service_configuration(config)

        assert isinstance(new_config, TerraformZenServerConfig)
        assert isinstance(service, TerraformZenServer)

        # preserve the server ID across updates
        new_config.server.server_id = service.config.server.server_id
        service.config = new_config
        service.start(timeout=timeout)

        return service

    def _start_service(
        self,
        service: BaseService,
        timeout: Optional[int] = None,
    ) -> BaseService:
        """Start the terraform ZenML server deployment service.

        Args:
            service: The service instance.
            timeout: The timeout in seconds to wait until the service is
                running. Defaults to the terraform server default timeout.

        Returns:
            The updated service instance.
        """
        if timeout is None:
            timeout = TERRAFORM_ZENML_SERVER_DEFAULT_TIMEOUT

        service.start(timeout=timeout)
        return service

    def _stop_service(
        self,
        service: BaseService,
        timeout: Optional[int] = None,
    ) -> BaseService:
        """Stop the terraform ZenML server deployment service.

        Args:
            service: The service instance.
            timeout: The timeout in seconds to wait until the service is
                stopped. Defaults to the terraform server default timeout.

        Returns:
            The updated service instance.
        """
        if timeout is None:
            timeout = TERRAFORM_ZENML_SERVER_DEFAULT_TIMEOUT

        service.stop(timeout=timeout)
        return service

    def _delete_service(
        self,
        service: BaseService,
        timeout: Optional[int] = None,
    ) -> None:
        """Remove the terraform ZenML server deployment service.

        The removal is carried out by stopping the service.

        Args:
            service: The service instance.
            timeout: The timeout in seconds to wait until the service is
                removed. Defaults to the terraform server default timeout.
        """
        assert isinstance(service, TerraformZenServer)

        if timeout is None:
            timeout = TERRAFORM_ZENML_SERVER_DEFAULT_TIMEOUT

        service.stop(timeout=timeout)

    def _get_service(self, server_name: str) -> BaseService:
        """Get the terraform ZenML server deployment service.

        Args:
            server_name: The server deployment name.

        Returns:
            The service instance.

        Raises:
            KeyError: If the server deployment is not found or is deployed
                under a different name.
        """
        service = TerraformZenServer.get_service()
        if service is None:
            raise KeyError("The terraform ZenML server is not deployed.")

        if service.config.server.name != server_name:
            raise KeyError(
                "The terraform ZenML server is deployed but with a different name."
            )
        return service

    def _list_services(self) -> List[BaseService]:
        """Get all service instances for all deployed ZenML servers.

        Returns:
            A list of service instances: either the single terraform server
            service or an empty list if there is none.
        """
        service = TerraformZenServer.get_service()
        if service:
            return [service]
        return []

    def _get_deployment_config(
        self, service: BaseService
    ) -> ServerDeploymentConfig:
        """Recreate the server deployment configuration from a service instance.

        Args:
            service: The service instance.

        Returns:
            The server deployment configuration.
        """
        server = cast(TerraformZenServer, service)
        return server.config.server

    def _get_deployment_status(
        self, service: BaseService
    ) -> ServerDeploymentStatus:
        """Get the status of a server deployment from its service.

        The URL and CA certificate are only reported while the service is
        running.

        Args:
            service: The server deployment service.

        Returns:
            The status of the server deployment.
        """
        gc = GlobalConfiguration()
        url: Optional[str] = None
        service = cast(TerraformZenServer, service)
        ca_crt = None
        if service.is_running:
            url = service.get_server_url()
            ca_crt = service.get_certificate()
        # the client counts as connected when its configured store points to
        # the URL of this deployment
        connected = (
            url is not None and gc.store is not None and gc.store.url == url
        )

        return ServerDeploymentStatus(
            url=url,
            status=service.status.state,
            status_message=service.status.last_error,
            connected=connected,
            ca_crt=ca_crt,
        )
CONFIG_TYPE (ServerDeploymentConfig) pydantic-model

Terraform server deployment configuration.

Attributes:

Name Type Description
log_level str

The log level to set the terraform client to. Choose one of TRACE, DEBUG, INFO, WARN or ERROR (case insensitive).

username str

The username for the default ZenML server account.

password str

The password for the default ZenML server account.

helm_chart str

The path to the ZenML server helm chart to use for deployment.

zenmlserver_image_tag str

The tag to use for the zenml server docker image.

namespace str

The Kubernetes namespace to deploy the ZenML server to.

kubectl_config_path str

The path to the kubectl config file to use for deployment.

ingress_tls bool

Whether to use TLS for the ingress.

ingress_tls_generate_certs bool

Whether to generate self-signed TLS certificates for the ingress.

ingress_tls_secret_name str

The name of the Kubernetes secret to use for the ingress.

ingress_path str

The path to use for the ingress.

create_ingress_controller bool

Whether to deploy an nginx ingress controller as part of the deployment.

ingress_controller_hostname str

The ingress controller hostname to use for the ingress self-signed certificate and to compute the ZenML server URL.

deploy_db bool

Whether to create a SQL database service as part of the recipe.

database_username str

The username for the database.

database_password str

The password for the database.

database_url str

The URL of the RDS instance to use for the ZenML server.

database_ssl_ca str

The path to the SSL CA certificate to use for the database connection.

database_ssl_cert str

The path to the client SSL certificate to use for the database connection.

database_ssl_key str

The path to the client SSL key to use for the database connection.

database_ssl_verify_server_cert bool

Whether to verify the database server SSL certificate.

Source code in zenml/zen_server/deploy/terraform/providers/terraform_provider.py
class TerraformServerDeploymentConfig(ServerDeploymentConfig):
    """Terraform server deployment configuration.

    Only `username` and `password` have no default value and must always be
    supplied. Extra, undeclared fields are accepted (see `Config`).

    Attributes:
        log_level: The log level to set the terraform client to. Choose one of
            TRACE, DEBUG, INFO, WARN or ERROR (case insensitive).
        server_id: The unique ID of the server deployment. A random UUID is
            generated when none is supplied.
        username: The username for the default ZenML server account.
        password: The password for the default ZenML server account.
        helm_chart: The path to the ZenML server helm chart to use for
            deployment.
        zenmlserver_image_tag: The tag to use for the zenml server docker
            image.
        zenmlinit_image_tag: Presumably the tag to use for the zenml init
            docker image (TODO confirm against the terraform recipe).
        namespace: The Kubernetes namespace to deploy the ZenML server to.
        kubectl_config_path: The path to the kubectl config file to use for
            deployment.
        ingress_tls: Whether to use TLS for the ingress.
        ingress_tls_generate_certs: Whether to generate self-signed TLS
            certificates for the ingress.
        ingress_tls_secret_name: The name of the Kubernetes secret to use for
            the ingress.
        ingress_path: The path to use for the ingress.
        create_ingress_controller: Whether to deploy an nginx ingress
            controller as part of the deployment.
        ingress_controller_hostname: The ingress controller hostname to use for
            the ingress self-signed certificate and to compute the ZenML server
            URL.
        deploy_db: Whether to create a SQL database service as part of the recipe.
        database_username: The username for the database.
        database_password: The password for the database.
        database_url: The URL of the RDS instance to use for the ZenML server.
        database_ssl_ca: The path to the SSL CA certificate to use for the
            database connection.
        database_ssl_cert: The path to the client SSL certificate to use for the
            database connection.
        database_ssl_key: The path to the client SSL key to use for the
            database connection.
        database_ssl_verify_server_cert: Whether to verify the database server
            SSL certificate.
    """

    log_level: str = "ERROR"

    server_id: UUID = Field(default_factory=uuid4)
    username: str
    password: str
    # the default is computed once, when the class body is executed
    helm_chart: str = get_helm_chart_path()
    zenmlserver_image_tag: str = "latest"
    zenmlinit_image_tag: str = "latest"
    namespace: str = "zenmlserver"
    # defaults to `.kube/config` inside the current user's home directory
    kubectl_config_path: str = os.path.join(str(Path.home()), ".kube", "config")
    ingress_tls: bool = True
    ingress_tls_generate_certs: bool = True
    ingress_tls_secret_name: str = "zenml-tls-certs"
    ingress_path: str = ""
    create_ingress_controller: bool = True
    ingress_controller_hostname: str = ""
    deploy_db: bool = True
    database_username: str = "user"
    database_password: str = ""
    database_url: str = ""
    database_ssl_ca: str = ""
    database_ssl_cert: str = ""
    database_ssl_key: str = ""
    database_ssl_verify_server_cert: bool = True

    class Config:
        """Pydantic configuration."""

        # accept fields that are not explicitly declared on the model
        extra = "allow"
Config

Pydantic configuration.

Source code in zenml/zen_server/deploy/terraform/providers/terraform_provider.py
class Config:
    """Pydantic configuration."""

    # accept fields that are not explicitly declared on the model
    extra = "allow"
terraform_zen_server

Service implementation for the ZenML terraform server deployment.

TerraformServerDeploymentConfig (ServerDeploymentConfig) pydantic-model

Terraform server deployment configuration.

Attributes:

Name Type Description
log_level str

The log level to set the terraform client to. Choose one of TRACE, DEBUG, INFO, WARN or ERROR (case insensitive).

username str

The username for the default ZenML server account.

password str

The password for the default ZenML server account.

helm_chart str

The path to the ZenML server helm chart to use for deployment.

zenmlserver_image_tag str

The tag to use for the zenml server docker image.

namespace str

The Kubernetes namespace to deploy the ZenML server to.

kubectl_config_path str

The path to the kubectl config file to use for deployment.

ingress_tls bool

Whether to use TLS for the ingress.

ingress_tls_generate_certs bool

Whether to generate self-signed TLS certificates for the ingress.

ingress_tls_secret_name str

The name of the Kubernetes secret to use for the ingress.

ingress_path str

The path to use for the ingress.

create_ingress_controller bool

Whether to deploy an nginx ingress controller as part of the deployment.

ingress_controller_hostname str

The ingress controller hostname to use for the ingress self-signed certificate and to compute the ZenML server URL.

deploy_db bool

Whether to create a SQL database service as part of the recipe.

database_username str

The username for the database.

database_password str

The password for the database.

database_url str

The URL of the RDS instance to use for the ZenML server.

database_ssl_ca str

The path to the SSL CA certificate to use for the database connection.

database_ssl_cert str

The path to the client SSL certificate to use for the database connection.

database_ssl_key str

The path to the client SSL key to use for the database connection.

database_ssl_verify_server_cert bool

Whether to verify the database server SSL certificate.

Source code in zenml/zen_server/deploy/terraform/terraform_zen_server.py
class TerraformServerDeploymentConfig(ServerDeploymentConfig):
    """Terraform server deployment configuration.

    Only `username` and `password` have no default value and must always be
    supplied. Extra, undeclared fields are accepted (see `Config`).

    Attributes:
        log_level: The log level to set the terraform client to. Choose one of
            TRACE, DEBUG, INFO, WARN or ERROR (case insensitive).
        server_id: The unique ID of the server deployment. A random UUID is
            generated when none is supplied.
        username: The username for the default ZenML server account.
        password: The password for the default ZenML server account.
        helm_chart: The path to the ZenML server helm chart to use for
            deployment.
        zenmlserver_image_tag: The tag to use for the zenml server docker
            image.
        zenmlinit_image_tag: Presumably the tag to use for the zenml init
            docker image (TODO confirm against the terraform recipe).
        namespace: The Kubernetes namespace to deploy the ZenML server to.
        kubectl_config_path: The path to the kubectl config file to use for
            deployment.
        ingress_tls: Whether to use TLS for the ingress.
        ingress_tls_generate_certs: Whether to generate self-signed TLS
            certificates for the ingress.
        ingress_tls_secret_name: The name of the Kubernetes secret to use for
            the ingress.
        ingress_path: The path to use for the ingress.
        create_ingress_controller: Whether to deploy an nginx ingress
            controller as part of the deployment.
        ingress_controller_hostname: The ingress controller hostname to use for
            the ingress self-signed certificate and to compute the ZenML server
            URL.
        deploy_db: Whether to create a SQL database service as part of the recipe.
        database_username: The username for the database.
        database_password: The password for the database.
        database_url: The URL of the RDS instance to use for the ZenML server.
        database_ssl_ca: The path to the SSL CA certificate to use for the
            database connection.
        database_ssl_cert: The path to the client SSL certificate to use for the
            database connection.
        database_ssl_key: The path to the client SSL key to use for the
            database connection.
        database_ssl_verify_server_cert: Whether to verify the database server
            SSL certificate.
    """

    log_level: str = "ERROR"

    server_id: UUID = Field(default_factory=uuid4)
    username: str
    password: str
    # the default is computed once, when the class body is executed
    helm_chart: str = get_helm_chart_path()
    zenmlserver_image_tag: str = "latest"
    zenmlinit_image_tag: str = "latest"
    namespace: str = "zenmlserver"
    # defaults to `.kube/config` inside the current user's home directory
    kubectl_config_path: str = os.path.join(str(Path.home()), ".kube", "config")
    ingress_tls: bool = True
    ingress_tls_generate_certs: bool = True
    ingress_tls_secret_name: str = "zenml-tls-certs"
    ingress_path: str = ""
    create_ingress_controller: bool = True
    ingress_controller_hostname: str = ""
    deploy_db: bool = True
    database_username: str = "user"
    database_password: str = ""
    database_url: str = ""
    database_ssl_ca: str = ""
    database_ssl_cert: str = ""
    database_ssl_key: str = ""
    database_ssl_verify_server_cert: bool = True

    class Config:
        """Pydantic configuration."""

        # accept fields that are not explicitly declared on the model
        extra = "allow"
Config

Pydantic configuration.

Source code in zenml/zen_server/deploy/terraform/terraform_zen_server.py
class Config:
    """Pydantic configuration."""

    # accept fields that are not explicitly declared on the model
    extra = "allow"
TerraformZenServer (TerraformService) pydantic-model

Service that can be used to start a terraform ZenServer.

Attributes:

Name Type Description
config TerraformZenServerConfig

service configuration

endpoint Optional[zenml.services.service_endpoint.BaseServiceEndpoint]

service endpoint

Source code in zenml/zen_server/deploy/terraform/terraform_zen_server.py
class TerraformZenServer(TerraformService):
    """Service that can be used to start a terraform ZenServer.

    Attributes:
        config: service configuration
        endpoint: service endpoint
    """

    SERVICE_TYPE = ServiceType(
        name="terraform_zenml_server",
        type="zen_server",
        flavor="terraform",
        description="Terraform ZenML server deployment",
    )

    config: TerraformZenServerConfig

    @classmethod
    def get_service(cls) -> Optional["TerraformZenServer"]:
        """Load and return the terraform ZenML server service, if present.

        The service is restored from the JSON service configuration file of
        the terraform ZenML server.

        Returns:
            The terraform ZenML server service or None, if the terraform server
            deployment is not found.
        """
        from zenml.services import ServiceRegistry

        try:
            with open(TERRAFORM_ZENML_SERVER_CONFIG_FILENAME, "r") as f:
                return cast(
                    TerraformZenServer,
                    ServiceRegistry().load_service_from_json(f.read()),
                )
        except FileNotFoundError:
            # a missing configuration file means nothing is deployed
            return None

    def get_vars(self) -> Dict[str, Any]:
        """Get variables as a dictionary.

        As a side effect, the variables are also written as JSON to the
        variables file inside the service runtime path.

        Returns:
            A dictionary of variables to use for the Terraform deployment.
        """
        # keys that are not modeled as terraform deployment vars
        excluded_keys = ("log_level", "provider")
        # get the contents of the server deployment config as dict; UUID
        # values are converted to strings so that they can be dumped as JSON
        tf_vars = {
            k: str(v) if isinstance(v, UUID) else v
            for k, v in self.config.server.dict().items()
            if k not in excluded_keys
        }
        assert self.status.runtime_path

        variables_path = os.path.join(
            self.status.runtime_path, self.config.variables_file_path
        )
        with open(variables_path, "w") as fp:
            json.dump(tf_vars, fp, indent=4)

        return tf_vars

    def provision(self) -> None:
        """Provision the service and log the URL of the deployed server."""
        super().provision()
        logger.info(
            f"Your ZenML server is now deployed with URL:\n"
            f"{self.get_server_url()}"
        )

    def get_server_url(self) -> str:
        """Returns the deployed ZenML server's URL.

        The value is read from the terraform outputs of the deployment.

        Returns:
            The URL of the deployed ZenML server.
        """
        return str(
            self.terraform_client.output(
                TERRAFORM_DEPLOYED_ZENSERVER_OUTPUT_URL, full_value=True
            )
        )

    def get_certificate(self) -> Optional[str]:
        """Returns the CA certificate configured for the ZenML server.

        The value is read from the terraform outputs of the deployment.

        Returns:
            The CA certificate configured for the ZenML server.
        """
        # cast to the declared return type: the value is passed through as is
        return cast(
            Optional[str],
            self.terraform_client.output(
                TERRAFORM_DEPLOYED_ZENSERVER_OUTPUT_CA_CRT, full_value=True
            ),
        )
get_certificate(self)

Returns the CA certificate configured for the ZenML server.

Returns:

Type Description
Optional[str]

The CA certificate configured for the ZenML server.

Source code in zenml/zen_server/deploy/terraform/terraform_zen_server.py
def get_certificate(self) -> Optional[str]:
    """Fetch the CA certificate of the deployed ZenML server.

    The value is read from the terraform outputs of the deployment.

    Returns:
        The CA certificate configured for the ZenML server.
    """
    ca_certificate = self.terraform_client.output(
        TERRAFORM_DEPLOYED_ZENSERVER_OUTPUT_CA_CRT, full_value=True
    )
    return cast(str, ca_certificate)
get_server_url(self)

Returns the deployed ZenML server's URL.

Returns:

Type Description
str

The URL of the deployed ZenML server.

Source code in zenml/zen_server/deploy/terraform/terraform_zen_server.py
def get_server_url(self) -> str:
    """Fetch the URL of the deployed ZenML server.

    The value is read from the terraform outputs of the deployment and
    converted to a string.

    Returns:
        The URL of the deployed ZenML server.
    """
    url_output = self.terraform_client.output(
        TERRAFORM_DEPLOYED_ZENSERVER_OUTPUT_URL, full_value=True
    )
    return str(url_output)
get_service() classmethod

Load and return the terraform ZenML server service, if present.

Returns:

Type Description
Optional[TerraformZenServer]

The terraform ZenML server service or None, if the terraform server deployment is not found.

Source code in zenml/zen_server/deploy/terraform/terraform_zen_server.py
@classmethod
def get_service(cls) -> Optional["TerraformZenServer"]:
    """Restore the terraform ZenML server service from its config file.

    Returns:
        The terraform ZenML server service, or None when the service
        configuration file does not exist, i.e. the terraform server
        deployment is not found.
    """
    from zenml.services import ServiceRegistry

    try:
        with open(TERRAFORM_ZENML_SERVER_CONFIG_FILENAME, "r") as config_file:
            loaded_service = ServiceRegistry().load_service_from_json(
                config_file.read()
            )
            return cast(TerraformZenServer, loaded_service)
    except FileNotFoundError:
        return None
get_vars(self)

Get variables as a dictionary.

Returns:

Type Description
Dict[str, Any]

A dictionary of variables to use for the Terraform deployment.

Source code in zenml/zen_server/deploy/terraform/terraform_zen_server.py
def get_vars(self) -> Dict[str, Any]:
    """Get variables as a dictionary.

    As a side effect, the variables are also written as JSON to the
    variables file inside the service runtime path.

    Returns:
        A dictionary of variables to use for the Terraform deployment.
    """
    # keys that are not modeled as terraform deployment vars
    excluded_keys = ("log_level", "provider")
    # get the contents of the server deployment config as dict; UUID values
    # are converted to strings so that they can be dumped as JSON
    tf_vars = {
        k: str(v) if isinstance(v, UUID) else v
        for k, v in self.config.server.dict().items()
        if k not in excluded_keys
    }
    assert self.status.runtime_path

    variables_path = os.path.join(
        self.status.runtime_path, self.config.variables_file_path
    )
    with open(variables_path, "w") as fp:
        json.dump(tf_vars, fp, indent=4)

    return tf_vars
provision(self)

Provision the service.

Source code in zenml/zen_server/deploy/terraform/terraform_zen_server.py
def provision(self) -> None:
    """Provision the service, then log the URL of the deployed server."""
    super().provision()
    server_url = self.get_server_url()
    logger.info(f"Your ZenML server is now deployed with URL:\n{server_url}")
TerraformZenServerConfig (TerraformServiceConfig) pydantic-model

Terraform Zen server configuration.

Attributes:

Name Type Description
server TerraformServerDeploymentConfig

The deployment configuration.

Source code in zenml/zen_server/deploy/terraform/terraform_zen_server.py
class TerraformZenServerConfig(TerraformServiceConfig):
    """Terraform Zen server configuration.

    Attributes:
        server: The deployment configuration.
        copy_terraform_files: Enabled by default. Presumably tells the
            terraform service to copy the recipe files into its runtime
            path (TODO confirm against `TerraformServiceConfig`).
    """

    server: TerraformServerDeploymentConfig
    copy_terraform_files: bool = True
get_helm_chart_path()

Get the ZenML server helm chart path.

The ZenML server helm chart files are located in a folder relative to the zenml.zen_server.deploy Python module.

Returns:

Type Description
str

The helm chart path.

Source code in zenml/zen_server/deploy/terraform/terraform_zen_server.py
def get_helm_chart_path() -> str:
    """Return the location of the ZenML server helm chart.

    The chart lives in a sub-folder of the directory that contains the
    `zenml.zen_server.deploy` Python module.

    Returns:
        The helm chart path.
    """
    import zenml.zen_server.deploy as deploy_module

    deploy_dir = os.path.dirname(deploy_module.__file__)
    return os.path.join(deploy_dir, ZENML_HELM_CHART_SUBPATH)

routers special

Endpoint definitions.

artifacts_endpoints

Endpoint definitions for steps (and artifacts) of pipeline runs.

create_artifact(artifact, _=Security(oauth2_password_bearer_authentication))

Create a new artifact.

Parameters:

Name Type Description Default
artifact ArtifactRequestModel

The artifact to create.

required

Returns:

Type Description
ArtifactResponseModel

The created artifact.

Source code in zenml/zen_server/routers/artifacts_endpoints.py
@router.post(
    "",
    response_model=ArtifactResponseModel,
    responses={401: error_response, 409: error_response, 422: error_response},
)
@handle_exceptions
def create_artifact(
    artifact: ArtifactRequestModel,
    _: AuthContext = Security(authorize, scopes=[PermissionType.WRITE]),
) -> ArtifactResponseModel:
    """Register a new artifact in the store.

    Args:
        artifact: The artifact to create.

    Returns:
        The created artifact.
    """
    store = zen_store()
    created_artifact = store.create_artifact(artifact)
    return created_artifact
list_artifacts(artifact_uri=None, parent_step_id=None, _=Security(oauth2_password_bearer_authentication))

Get artifacts according to query filters.

Parameters:

Name Type Description Default
artifact_uri Optional[str]

If specified, only artifacts with the given URI will be returned.

None
parent_step_id Optional[uuid.UUID]

If specified, only artifacts for the given step run will be returned.

None

Returns:

Type Description
List[zenml.models.artifact_models.ArtifactResponseModel]

The artifacts according to query filters.

Source code in zenml/zen_server/routers/artifacts_endpoints.py
@router.get(
    "",
    response_model=List[ArtifactResponseModel],
    responses={401: error_response, 404: error_response, 422: error_response},
)
@handle_exceptions
def list_artifacts(
    artifact_uri: Optional[str] = None,
    parent_step_id: Optional[UUID] = None,
    _: AuthContext = Security(authorize, scopes=[PermissionType.READ]),
) -> List[ArtifactResponseModel]:
    """List the artifacts that match the given query filters.

    Args:
        artifact_uri: If specified, only artifacts with the given URI will
            be returned.
        parent_step_id: If specified, only artifacts for the given step run
            will be returned.

    Returns:
        The artifacts according to query filters.
    """
    store = zen_store()
    matching_artifacts = store.list_artifacts(
        artifact_uri=artifact_uri,
        parent_step_id=parent_step_id,
    )
    return matching_artifacts

auth_endpoints

Endpoint definitions for authentication (login).

PasswordRequestForm

OAuth2 password grant type request form.

This form is similar to fastapi.security.OAuth2PasswordRequestForm, with the single difference being that it also allows an empty password.

Source code in zenml/zen_server/routers/auth_endpoints.py
class PasswordRequestForm:
    """OAuth2 password grant type request form.

    This form is similar to `fastapi.security.OAuth2PasswordRequestForm`, with
    the single difference being that it also allows an empty password.
    """

    def __init__(
        self,
        grant_type: str = Form(None, regex="password"),
        username: str = Form(...),
        password: Optional[str] = Form(""),
        scope: str = Form(""),
        client_id: Optional[str] = Form(None),
        client_secret: Optional[str] = Form(None),
    ):
        """Initializes the form.

        Args:
            grant_type: The grant type.
            username: The username.
            password: The password.
            scope: The scope.
            client_id: The client ID.
            client_secret: The client secret.
        """
        self.grant_type = grant_type
        self.username = username
        self.password = password
        # both the raw scope string and its whitespace-separated list form
        # are exposed
        self.scope = scope
        self.scopes = scope.split()
        self.client_id = client_id
        self.client_secret = client_secret
__init__(self, grant_type=Form(None), username=Form(Ellipsis), password=Form(), scope=Form(), client_id=Form(None), client_secret=Form(None)) special

Initializes the form.

Parameters:

Name Type Description Default
grant_type str

The grant type.

Form(None)
username str

The username.

Form(Ellipsis)
password Optional[str]

The password.

Form()
scope str

The scope.

Form()
client_id Optional[str]

The client ID.

Form(None)
client_secret Optional[str]

The client secret.

Form(None)
Source code in zenml/zen_server/routers/auth_endpoints.py
def __init__(
    self,
    grant_type: str = Form(None, regex="password"),
    username: str = Form(...),
    password: Optional[str] = Form(""),
    scope: str = Form(""),
    client_id: Optional[str] = Form(None),
    client_secret: Optional[str] = Form(None),
):
    """Initializes the form.

    Args:
        grant_type: The grant type.
        username: The username.
        password: The password. Defaults to an empty string.
        scope: The requested scopes as a single whitespace-separated
            string.
        client_id: The client ID.
        client_secret: The client secret.
    """
    self.grant_type = grant_type
    self.username = username
    self.password = password
    # Expose both the raw scope string and its whitespace-split list form.
    self.scope = scope
    self.scopes = scope.split()
    self.client_id = client_id
    self.client_secret = client_secret
token(auth_form_data=Depends(NoneType))

Returns an access token for the given user.

Parameters:

Name Type Description Default
auth_form_data PasswordRequestForm

The authentication form data.

Depends(NoneType)

Returns:

Type Description
Dict[str, str]

An access token.

Exceptions:

Type Description
HTTPException

401 if not authorized to login.

Source code in zenml/zen_server/routers/auth_endpoints.py
@router.post(
    LOGIN,
    responses={401: error_response},
)
def token(
    auth_form_data: PasswordRequestForm = Depends(),
) -> Dict[str, str]:
    """Exchanges a username and password for an access token.

    Args:
        auth_form_data: The authentication form data.

    Returns:
        A dictionary holding the access token and its token type.

    Raises:
        HTTPException: 401 if not authorized to login.
    """
    auth_context = authenticate_credentials(
        user_name_or_id=auth_form_data.username,
        password=auth_form_data.password,
    )
    if not auth_context:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )

    # Role assignments are looked up without restricting them to a project.
    assignments = zen_store().list_role_assignments(
        user_name_or_id=auth_context.user.id, project_name_or_id=None
    )

    # Collect the permissions of every assigned role before merging them
    # into one de-duplicated set.
    per_role_permissions = [
        zen_store().get_role(assignment.role.id).permissions
        for assignment in assignments
    ]
    granted = set()
    for role_permissions in per_role_permissions:
        granted.update(role_permissions)

    permission_values = [permission.value for permission in granted]
    access_token = auth_context.user.generate_access_token(
        permissions=permission_values
    )

    # The token endpoint must answer with a JSON object made of:
    #
    #   * token_type - the token type (must be "bearer" in our case)
    #   * access_token - string containing the access token
    return {"access_token": access_token, "token_type": "bearer"}

flavors_endpoints

Endpoint definitions for flavors.

delete_flavor(flavor_id, _=Security(oauth2_password_bearer_authentication))

Deletes a flavor.

Parameters:

Name Type Description Default
flavor_id UUID

ID of the flavor.

required
Source code in zenml/zen_server/routers/flavors_endpoints.py
@router.delete(
    "/{flavor_id}",
    responses={401: error_response, 404: error_response, 422: error_response},
)
@handle_exceptions
def delete_flavor(
    flavor_id: UUID,
    _: AuthContext = Security(authorize, scopes=[PermissionType.WRITE]),
) -> None:
    """Removes a flavor from the store.

    Args:
        flavor_id: ID of the flavor to delete.
    """
    store = zen_store()
    store.delete_flavor(flavor_id)
get_flavor(flavor_id, _=Security(oauth2_password_bearer_authentication))

Returns the requested flavor.

Parameters:

Name Type Description Default
flavor_id UUID

ID of the flavor.

required

Returns:

Type Description
FlavorResponseModel

The requested flavor.

Source code in zenml/zen_server/routers/flavors_endpoints.py
@router.get(
    "/{flavor_id}",
    response_model=FlavorResponseModel,
    responses={401: error_response, 404: error_response, 422: error_response},
)
@handle_exceptions
def get_flavor(
    flavor_id: UUID,
    _: AuthContext = Security(authorize, scopes=[PermissionType.READ]),
) -> FlavorResponseModel:
    """Fetches a single flavor by its ID.

    Args:
        flavor_id: ID of the flavor.

    Returns:
        The requested flavor.
    """
    return zen_store().get_flavor(flavor_id)
list_flavors(project_name_or_id=None, component_type=None, user_name_or_id=None, name=None, is_shared=None, _=Security(oauth2_password_bearer_authentication))

Returns all flavors.

Parameters:

Name Type Description Default
project_name_or_id Union[str, uuid.UUID]

Name or ID of the project.

None
component_type Optional[zenml.enums.StackComponentType]

Optionally filter by component type.

None
user_name_or_id Union[str, uuid.UUID]

Optionally filter by name or ID of the user.

None
name Optional[str]

Optionally filter by flavor name.

None
is_shared Optional[bool]

Optionally filter by shared status of the flavor.

None

Returns:

Type Description
List[zenml.models.flavor_models.FlavorResponseModel]

All flavors.

Source code in zenml/zen_server/routers/flavors_endpoints.py
@router.get(
    "",
    response_model=List[FlavorResponseModel],
    responses={401: error_response, 404: error_response, 422: error_response},
)
@handle_exceptions
def list_flavors(
    project_name_or_id: Optional[Union[str, UUID]] = None,
    component_type: Optional[StackComponentType] = None,
    user_name_or_id: Optional[Union[str, UUID]] = None,
    name: Optional[str] = None,
    is_shared: Optional[bool] = None,
    _: AuthContext = Security(authorize, scopes=[PermissionType.READ]),
) -> List[FlavorResponseModel]:
    """Lists flavors, optionally narrowed down by the given filters.

    Args:
        project_name_or_id: Name or ID of the project.
        component_type: Optionally filter by component type.
        user_name_or_id: Optionally filter by name or ID of the user.
        name: Optionally filter by flavor name.
        is_shared: Optionally filter by shared status of the flavor.

    Returns:
        All flavors matching the filters.
    """
    # Every query parameter is forwarded to the store unchanged.
    filters = {
        "project_name_or_id": project_name_or_id,
        "component_type": component_type,
        "user_name_or_id": user_name_or_id,
        "is_shared": is_shared,
        "name": name,
    }
    return zen_store().list_flavors(**filters)

metadata_config_endpoints

Endpoint definitions for metadata config.

get_metadata_config(_=Security(oauth2_password_bearer_authentication))

Gets the metadata config.

Returns:

Type Description
str

The metadata config.

Source code in zenml/zen_server/routers/metadata_config_endpoints.py
@router.get(
    "",
    response_model=str,
    responses={401: error_response, 404: error_response, 422: error_response},
)
@handle_exceptions
def get_metadata_config(
    _: AuthContext = Security(authorize, scopes=[PermissionType.WRITE])
) -> str:
    """Fetches the metadata config and serializes it to JSON.

    Returns:
        The metadata config as a JSON string.
    """
    # Imported inside the function, so protobuf is only loaded when this
    # endpoint is actually called.
    from google.protobuf.json_format import MessageToJson

    metadata_config = zen_store().get_metadata_config(expand_certs=True)
    serialized = MessageToJson(metadata_config)
    return serialized

metadata_sync_endpoints

Endpoint definitions for metadata sync.

sync_runs(_=Security(oauth2_password_bearer_authentication))

Sync pipeline runs.

Source code in zenml/zen_server/routers/metadata_sync_endpoints.py
@router.get(
    "",
    response_model=None,
    responses={401: error_response, 404: error_response, 422: error_response},
)
@handle_exceptions
def sync_runs(
    _: AuthContext = Security(authorize, scopes=[PermissionType.READ])
) -> None:
    """Triggers a sync of the pipeline runs in the store.

    Any exception raised while syncing is logged and not propagated to the
    caller.
    """
    try:
        store = zen_store()
        store._sync_runs()
    except Exception:
        logger.exception("Failed to sync pipeline runs.")

pipelines_endpoints

Endpoint definitions for pipelines.

delete_pipeline(pipeline_id, _=Security(oauth2_password_bearer_authentication))

Deletes a specific pipeline.

Parameters:

Name Type Description Default
pipeline_id UUID

ID of the pipeline to delete.

required
Source code in zenml/zen_server/routers/pipelines_endpoints.py
@router.delete(
    "/{pipeline_id}",
    responses={401: error_response, 404: error_response, 422: error_response},
)
@handle_exceptions
def delete_pipeline(
    pipeline_id: UUID,
    _: AuthContext = Security(authorize, scopes=[PermissionType.WRITE]),
) -> None:
    """Removes a specific pipeline from the store.

    Args:
        pipeline_id: ID of the pipeline to delete.
    """
    store = zen_store()
    store.delete_pipeline(pipeline_id=pipeline_id)
get_pipeline(pipeline_id, _=Security(oauth2_password_bearer_authentication))

Gets a specific pipeline using its unique id.

Parameters:

Name Type Description Default
pipeline_id UUID

ID of the pipeline to get.

required

Returns:

Type Description
PipelineResponseModel

A specific pipeline object.

Source code in zenml/zen_server/routers/pipelines_endpoints.py
@router.get(
    "/{pipeline_id}",
    response_model=PipelineResponseModel,
    responses={401: error_response, 404: error_response, 422: error_response},
)
@handle_exceptions
def get_pipeline(
    pipeline_id: UUID,
    _: AuthContext = Security(authorize, scopes=[PermissionType.READ]),
) -> PipelineResponseModel:
    """Fetches a single pipeline by its unique ID.

    Args:
        pipeline_id: ID of the pipeline to get.

    Returns:
        The pipeline with the given ID.
    """
    pipeline = zen_store().get_pipeline(pipeline_id=pipeline_id)
    return pipeline
get_pipeline_spec(pipeline_id, _=Security(oauth2_password_bearer_authentication))

Gets the spec of a specific pipeline using its unique id.

Parameters:

Name Type Description Default
pipeline_id UUID

ID of the pipeline to get.

required

Returns:

Type Description
PipelineSpec

The spec of the pipeline.

Source code in zenml/zen_server/routers/pipelines_endpoints.py
@router.get(
    "/{pipeline_id}" + PIPELINE_SPEC,
    response_model=PipelineSpec,
    responses={401: error_response, 404: error_response, 422: error_response},
)
@handle_exceptions
def get_pipeline_spec(
    pipeline_id: UUID,
    _: AuthContext = Security(authorize, scopes=[PermissionType.READ]),
) -> PipelineSpec:
    """Fetches the spec of a single pipeline by the pipeline's unique ID.

    Args:
        pipeline_id: ID of the pipeline to get.

    Returns:
        The spec of the pipeline.
    """
    # Load the whole pipeline and hand back only its `spec` attribute.
    pipeline = zen_store().get_pipeline(pipeline_id)
    return pipeline.spec
list_pipeline_runs(pipeline_id, project_name_or_id=None, stack_id=None, run_name=None, user_name_or_id=None, component_id=None, _=Security(oauth2_password_bearer_authentication))

Get pipeline runs according to query filters.

Parameters:

Name Type Description Default
pipeline_id UUID

ID of the pipeline for which to list runs.

required
project_name_or_id Union[str, uuid.UUID]

Name or ID of the project for which to filter runs.

None
stack_id Optional[uuid.UUID]

ID of the stack for which to filter runs.

None
run_name Optional[str]

Filter by run name if provided

None
user_name_or_id Union[str, uuid.UUID]

If provided, only return runs for this user.

None
component_id Optional[uuid.UUID]

Filter by ID of a component that was used in the run.

None

Returns:

Type Description
List[zenml.models.pipeline_run_models.PipelineRunResponseModel]

The pipeline runs according to query filters.

Source code in zenml/zen_server/routers/pipelines_endpoints.py
@router.get(
    "/{pipeline_id}" + RUNS,
    # The endpoint returns a list of runs, so the response model must be a
    # list type as well; a single-model response model does not match the
    # returned value.
    response_model=List[PipelineRunResponseModel],
    responses={401: error_response, 404: error_response, 422: error_response},
)
@handle_exceptions
def list_pipeline_runs(
    pipeline_id: UUID,
    project_name_or_id: Optional[Union[str, UUID]] = None,
    stack_id: Optional[UUID] = None,
    run_name: Optional[str] = None,
    user_name_or_id: Optional[Union[str, UUID]] = None,
    component_id: Optional[UUID] = None,
    _: AuthContext = Security(authorize, scopes=[PermissionType.READ]),
) -> List[PipelineRunResponseModel]:
    """Get pipeline runs according to query filters.

    Args:
        pipeline_id: ID of the pipeline for which to list runs.
        project_name_or_id: Name or ID of the project for which to filter runs.
        stack_id: ID of the stack for which to filter runs.
        run_name: Filter by run name if provided.
        user_name_or_id: If provided, only return runs for this user.
        component_id: Filter by ID of a component that was used in the run.

    Returns:
        The pipeline runs according to query filters.
    """
    return zen_store().list_runs(
        project_name_or_id=project_name_or_id,
        name=run_name,
        stack_id=stack_id,
        component_id=component_id,
        user_name_or_id=user_name_or_id,
        pipeline_id=pipeline_id,
    )
list_pipelines(project_name_or_id=None, user_name_or_id=None, name=None, _=Security(oauth2_password_bearer_authentication))

Gets a list of pipelines.

Parameters:

Name Type Description Default
project_name_or_id Union[str, uuid.UUID]

Name or ID of the project to get pipelines for.

None
user_name_or_id Union[str, uuid.UUID]

Optionally filter by name or ID of the user.

None
name Optional[str]

Optionally filter by pipeline name

None

Returns:

Type Description
List[zenml.models.pipeline_models.PipelineResponseModel]

List of pipeline objects.

Source code in zenml/zen_server/routers/pipelines_endpoints.py
@router.get(
    "",
    response_model=List[PipelineResponseModel],
    responses={401: error_response, 404: error_response, 422: error_response},
)
@handle_exceptions
def list_pipelines(
    project_name_or_id: Optional[Union[str, UUID]] = None,
    user_name_or_id: Optional[Union[str, UUID]] = None,
    name: Optional[str] = None,
    _: AuthContext = Security(authorize, scopes=[PermissionType.READ]),
) -> List[PipelineResponseModel]:
    """Lists pipelines, optionally narrowed down by the given filters.

    Args:
        project_name_or_id: Name or ID of the project to get pipelines for.
        user_name_or_id: Optionally filter by name or ID of the user.
        name: Optionally filter by pipeline name.

    Returns:
        List of pipeline objects.
    """
    # Every query parameter is forwarded to the store unchanged.
    filters = {
        "project_name_or_id": project_name_or_id,
        "user_name_or_id": user_name_or_id,
        "name": name,
    }
    return zen_store().list_pipelines(**filters)
update_pipeline(pipeline_id, pipeline_update, _=Security(oauth2_password_bearer_authentication))

Updates the attribute on a specific pipeline using its unique id.

Parameters:

Name Type Description Default
pipeline_id UUID

ID of the pipeline to update.

required
pipeline_update PipelineUpdateModel

the model containing the attributes to update.

required

Returns:

Type Description
PipelineResponseModel

The updated pipeline object.

Source code in zenml/zen_server/routers/pipelines_endpoints.py
@router.put(
    "/{pipeline_id}",
    response_model=PipelineResponseModel,
    responses={401: error_response, 404: error_response, 422: error_response},
)
@handle_exceptions
def update_pipeline(
    pipeline_id: UUID,
    pipeline_update: PipelineUpdateModel,
    _: AuthContext = Security(authorize, scopes=[PermissionType.WRITE]),
) -> PipelineResponseModel:
    """Applies an update to a specific pipeline identified by its unique ID.

    Args:
        pipeline_id: ID of the pipeline to update.
        pipeline_update: The model containing the attributes to update.

    Returns:
        The updated pipeline object.
    """
    store = zen_store()
    updated_pipeline = store.update_pipeline(
        pipeline_id=pipeline_id, pipeline_update=pipeline_update
    )
    return updated_pipeline

projects_endpoints

Endpoint definitions for projects.

create_flavor(project_name_or_id, flavor, auth_context=Security(oauth2_password_bearer_authentication))

Creates a stack component flavor.

Parameters:

Name Type Description Default
project_name_or_id Union[str, uuid.UUID]

Name or ID of the project.

required
flavor FlavorRequestModel

Stack component flavor to register.

required
auth_context AuthContext

Authentication context.

Security(oauth2_password_bearer_authentication)

Returns:

Type Description
FlavorResponseModel

The created stack component flavor.

Exceptions:

Type Description
IllegalOperationError

If the project or user specified in the stack component flavor does not match the current project or authenticated user.

Source code in zenml/zen_server/routers/projects_endpoints.py
@router.post(
    "/{project_name_or_id}" + FLAVORS,
    response_model=FlavorResponseModel,
    responses={401: error_response, 409: error_response, 422: error_response},
)
@handle_exceptions
def create_flavor(
    project_name_or_id: Union[str, UUID],
    flavor: FlavorRequestModel,
    auth_context: AuthContext = Security(
        authorize, scopes=[PermissionType.WRITE]
    ),
) -> FlavorResponseModel:
    """Registers a new stack component flavor inside a project.

    Args:
        project_name_or_id: Name or ID of the project.
        flavor: Stack component flavor to register.
        auth_context: Authentication context.

    Returns:
        The created stack component flavor.

    Raises:
        IllegalOperationError: If the project or user specified in the stack
            component flavor does not match the current project or authenticated
            user.
    """
    # Resolve the project addressed by the URL before validating the request.
    target_project = zen_store().get_project(project_name_or_id)

    # The request body must point at the same project as the URL.
    if flavor.project != target_project.id:
        scope_error = (
            "Creating flavors outside of the project scope "
            f"of this endpoint `{project_name_or_id}` is "
            "not supported."
        )
        raise IllegalOperationError(scope_error)

    # Users may only create flavors in their own name.
    if flavor.user != auth_context.user.id:
        owner_error = (
            "Creating flavors for a user other than yourself "
            "is not supported."
        )
        raise IllegalOperationError(owner_error)

    return zen_store().create_flavor(
        flavor=flavor,
    )
create_pipeline(project_name_or_id, pipeline, auth_context=Security(oauth2_password_bearer_authentication))

Creates a pipeline.

Parameters:

Name Type Description Default
project_name_or_id Union[str, uuid.UUID]

Name or ID of the project.

required
pipeline PipelineRequestModel

Pipeline to create.

required
auth_context AuthContext

Authentication context.

Security(oauth2_password_bearer_authentication)

Returns:

Type Description
PipelineResponseModel

The created pipeline.

Exceptions:

Type Description
IllegalOperationError

If the project or user specified in the pipeline does not match the current project or authenticated user.

Source code in zenml/zen_server/routers/projects_endpoints.py
@router.post(
    "/{project_name_or_id}" + PIPELINES,
    response_model=PipelineResponseModel,
    responses={401: error_response, 409: error_response, 422: error_response},
)
@handle_exceptions
def create_pipeline(
    project_name_or_id: Union[str, UUID],
    pipeline: PipelineRequestModel,
    auth_context: AuthContext = Security(
        authorize, scopes=[PermissionType.WRITE]
    ),
) -> PipelineResponseModel:
    """Registers a new pipeline inside a project.

    Args:
        project_name_or_id: Name or ID of the project.
        pipeline: Pipeline to create.
        auth_context: Authentication context.

    Returns:
        The created pipeline.

    Raises:
        IllegalOperationError: If the project or user specified in the pipeline
            does not match the current project or authenticated user.
    """
    # Resolve the project addressed by the URL before validating the request.
    target_project = zen_store().get_project(project_name_or_id)

    # The request body must point at the same project as the URL.
    if pipeline.project != target_project.id:
        scope_error = (
            "Creating pipelines outside of the project scope "
            f"of this endpoint `{project_name_or_id}` is "
            "not supported."
        )
        raise IllegalOperationError(scope_error)

    # Users may only create pipelines in their own name.
    if pipeline.user != auth_context.user.id:
        owner_error = (
            "Creating pipelines for a user other than yourself "
            "is not supported."
        )
        raise IllegalOperationError(owner_error)

    created_pipeline = zen_store().create_pipeline(pipeline=pipeline)
    return created_pipeline
create_pipeline_run(project_name_or_id, pipeline_run, auth_context=Security(oauth2_password_bearer_authentication), get_if_exists=False)

Creates a pipeline run.

Parameters:

Name Type Description Default
project_name_or_id Union[str, uuid.UUID]

Name or ID of the project.

required
pipeline_run PipelineRunRequestModel

Pipeline run to create.

required
auth_context AuthContext

Authentication context.

Security(oauth2_password_bearer_authentication)
get_if_exists bool

If a similar pipeline run already exists, return it instead of raising an error.

False

Returns:

Type Description
PipelineRunResponseModel

The created pipeline run.

Exceptions:

Type Description
IllegalOperationError

If the project or user specified in the pipeline run does not match the current project or authenticated user.

Source code in zenml/zen_server/routers/projects_endpoints.py
@router.post(
    "/{project_name_or_id}" + RUNS,
    response_model=PipelineRunResponseModel,
    responses={401: error_response, 409: error_response, 422: error_response},
)
@handle_exceptions
def create_pipeline_run(
    project_name_or_id: Union[str, UUID],
    pipeline_run: PipelineRunRequestModel,
    auth_context: AuthContext = Security(
        authorize, scopes=[PermissionType.WRITE]
    ),
    get_if_exists: bool = False,
) -> PipelineRunResponseModel:
    """Registers a new pipeline run inside a project.

    Args:
        project_name_or_id: Name or ID of the project.
        pipeline_run: Pipeline run to create.
        auth_context: Authentication context.
        get_if_exists: If a similar pipeline run already exists, return it
            instead of raising an error.

    Returns:
        The created pipeline run.

    Raises:
        IllegalOperationError: If the project or user specified in the pipeline
            run does not match the current project or authenticated user.
    """
    # Resolve the project addressed by the URL before validating the request.
    target_project = zen_store().get_project(project_name_or_id)

    # The request body must point at the same project as the URL.
    if pipeline_run.project != target_project.id:
        scope_error = (
            "Creating pipeline runs outside of the project scope "
            f"of this endpoint `{project_name_or_id}` is "
            "not supported."
        )
        raise IllegalOperationError(scope_error)

    # Users may only create pipeline runs in their own name.
    if pipeline_run.user != auth_context.user.id:
        owner_error = (
            "Creating pipeline runs for a user other than yourself "
            "is not supported."
        )
        raise IllegalOperationError(owner_error)

    store = zen_store()
    # `get_if_exists` selects the store method that is used for the creation.
    if not get_if_exists:
        return store.create_run(pipeline_run=pipeline_run)
    return store.get_or_create_run(pipeline_run=pipeline_run)
create_project(project, _=Security(oauth2_password_bearer_authentication))

Creates a project based on the requestBody.

noqa: DAR401

Parameters:

Name Type Description Default
project ProjectRequestModel

Project to create.

required

Returns:

Type Description
ProjectResponseModel

The created project.

Source code in zenml/zen_server/routers/projects_endpoints.py
@router.post(
    "",
    response_model=ProjectResponseModel,
    responses={401: error_response, 409: error_response, 422: error_response},
)
@handle_exceptions
def create_project(
    project: ProjectRequestModel,
    _: AuthContext = Security(authorize, scopes=[PermissionType.WRITE]),
) -> ProjectResponseModel:
    """Registers a new project from the request body.

    # noqa: DAR401

    Args:
        project: Project to create.

    Returns:
        The created project.
    """
    store = zen_store()
    created_project = store.create_project(project=project)
    return created_project
create_stack(project_name_or_id, stack, auth_context=Security(oauth2_password_bearer_authentication))

Creates a stack for a particular project.

Parameters:

Name Type Description Default
project_name_or_id Union[str, uuid.UUID]

Name or ID of the project.

required
stack StackRequestModel

Stack to register.

required
auth_context AuthContext

The authentication context.

Security(oauth2_password_bearer_authentication)

Returns:

Type Description
StackResponseModel

The created stack.

Exceptions:

Type Description
IllegalOperationError

If the project or user specified in the stack does not match the current project or authenticated user.

Source code in zenml/zen_server/routers/projects_endpoints.py
@router.post(
    "/{project_name_or_id}" + STACKS,
    response_model=StackResponseModel,
    responses={401: error_response, 409: error_response, 422: error_response},
)
@handle_exceptions
def create_stack(
    project_name_or_id: Union[str, UUID],
    stack: StackRequestModel,
    auth_context: AuthContext = Security(
        authorize, scopes=[PermissionType.WRITE]
    ),
) -> StackResponseModel:
    """Registers a new stack inside a project.

    Args:
        project_name_or_id: Name or ID of the project.
        stack: Stack to register.
        auth_context: The authentication context.

    Returns:
        The created stack.

    Raises:
        IllegalOperationError: If the project or user specified in the stack
            does not match the current project or authenticated user.
    """
    # Resolve the project addressed by the URL before validating the request.
    target_project = zen_store().get_project(project_name_or_id)

    # The request body must point at the same project as the URL.
    if stack.project != target_project.id:
        scope_error = (
            "Creating stacks outside of the project scope "
            f"of this endpoint `{project_name_or_id}` is "
            "not supported."
        )
        raise IllegalOperationError(scope_error)

    # Users may only create stacks in their own name.
    if stack.user != auth_context.user.id:
        owner_error = (
            "Creating stacks for a user other than yourself "
            "is not supported."
        )
        raise IllegalOperationError(owner_error)

    created_stack = zen_store().create_stack(stack=stack)
    return created_stack
create_stack_component(project_name_or_id, component, auth_context=Security(oauth2_password_bearer_authentication))

Creates a stack component.

Parameters:

Name Type Description Default
project_name_or_id Union[str, uuid.UUID]

Name or ID of the project.

required
component ComponentRequestModel

Stack component to register.

required
auth_context AuthContext

Authentication context.

Security(oauth2_password_bearer_authentication)

Returns:

Type Description
ComponentResponseModel

The created stack component.

Exceptions:

Type Description
IllegalOperationError

If the project or user specified in the stack component does not match the current project or authenticated user.

Source code in zenml/zen_server/routers/projects_endpoints.py
@router.post(
    "/{project_name_or_id}" + STACK_COMPONENTS,
    response_model=ComponentResponseModel,
    responses={401: error_response, 409: error_response, 422: error_response},
)
@handle_exceptions
def create_stack_component(
    project_name_or_id: Union[str, UUID],
    component: ComponentRequestModel,
    auth_context: AuthContext = Security(
        authorize, scopes=[PermissionType.WRITE]
    ),
) -> ComponentResponseModel:
    """Registers a new stack component inside a project.

    Args:
        project_name_or_id: Name or ID of the project.
        component: Stack component to register.
        auth_context: Authentication context.

    Returns:
        The created stack component.

    Raises:
        IllegalOperationError: If the project or user specified in the stack
            component does not match the current project or authenticated user.
    """
    # Resolve the project addressed by the URL before validating the request.
    target_project = zen_store().get_project(project_name_or_id)

    # The request body must point at the same project as the URL.
    if component.project != target_project.id:
        scope_error = (
            "Creating components outside of the project scope "
            f"of this endpoint `{project_name_or_id}` is "
            "not supported."
        )
        raise IllegalOperationError(scope_error)

    # Users may only create components in their own name.
    if component.user != auth_context.user.id:
        owner_error = (
            "Creating components for a user other than yourself "
            "is not supported."
        )
        raise IllegalOperationError(owner_error)

    # TODO: [server] if possible it should validate here that the configuration
    #  conforms to the flavor

    created_component = zen_store().create_stack_component(component=component)
    return created_component
delete_project(project_name_or_id, _=Security(oauth2_password_bearer_authentication))

Deletes a project.

Parameters:

Name Type Description Default
project_name_or_id Union[str, uuid.UUID]

Name or ID of the project.

required
Source code in zenml/zen_server/routers/projects_endpoints.py
@router.delete(
    "/{project_name_or_id}",
    responses={401: error_response, 404: error_response, 422: error_response},
)
@handle_exceptions
def delete_project(
    project_name_or_id: Union[str, UUID],
    _: AuthContext = Security(authorize, scopes=[PermissionType.WRITE]),
) -> None:
    """Removes a project from the store.

    Args:
        project_name_or_id: Name or ID of the project to delete.
    """
    store = zen_store()
    store.delete_project(project_name_or_id=project_name_or_id)
get_project(project_name_or_id, _=Security(oauth2_password_bearer_authentication))

Get a project for given name.

noqa: DAR401

Parameters:

Name Type Description Default
project_name_or_id Union[str, uuid.UUID]

Name or ID of the project.

required

Returns:

Type Description
ProjectResponseModel

The requested project.

Source code in zenml/zen_server/routers/projects_endpoints.py
@router.get(
    "/{project_name_or_id}",
    response_model=ProjectResponseModel,
    responses={401: error_response, 404: error_response, 422: error_response},
)
@handle_exceptions
def get_project(
    project_name_or_id: Union[str, UUID],
    _: AuthContext = Security(authorize, scopes=[PermissionType.READ]),
) -> ProjectResponseModel:
    """Fetches a project by its name or ID.

    # noqa: DAR401

    Args:
        project_name_or_id: Name or ID of the project.

    Returns:
        The requested project.
    """
    store = zen_store()
    requested_project = store.get_project(
        project_name_or_id=project_name_or_id
    )
    return requested_project
get_project_statistics(project_name_or_id, _=Security(oauth2_password_bearer_authentication))

Gets statistics of a project.

noqa: DAR401

Parameters:

Name Type Description Default
project_name_or_id Union[str, uuid.UUID]

Name or ID of the project to get statistics for.

required

Returns:

Type Description
Dict[str, int]

The number of stacks, components, pipelines and runs within the project.

Source code in zenml/zen_server/routers/projects_endpoints.py
@router.get(
    "/{project_name_or_id}" + STATISTICS,
    # NOTE(review): the function is annotated as returning Dict[str, int]
    # while the response model is Dict[str, str]. The response model is left
    # unchanged so the serialized response stays as it is today - confirm
    # which of the two is intended.
    response_model=Dict[str, str],
    responses={401: error_response, 404: error_response, 422: error_response},
)
@handle_exceptions
def get_project_statistics(
    project_name_or_id: Union[str, UUID],
    _: AuthContext = Security(authorize, scopes=[PermissionType.READ]),
) -> Dict[str, int]:
    """Gets statistics of a project.

    # noqa: DAR401

    Args:
        project_name_or_id: Name or ID of the project to get statistics for.

    Returns:
        A dictionary with the number of stacks, components, pipelines and
        runs within the project, stored under the keys `stacks`,
        `components`, `pipelines` and `runs`.
    """
    # Only project-scoped listings are needed; an additional unfiltered
    # listing of all runs whose result was thrown away has been dropped.
    store = zen_store()
    return {
        "stacks": len(
            store.list_stacks(project_name_or_id=project_name_or_id)
        ),
        "components": len(
            store.list_stack_components(
                project_name_or_id=project_name_or_id
            )
        ),
        "pipelines": len(
            store.list_pipelines(project_name_or_id=project_name_or_id)
        ),
        "runs": len(store.list_runs(project_name_or_id=project_name_or_id)),
    }
get_role_assignments_for_project(project_name_or_id, user_name_or_id=None, team_name_or_id=None, _=Security(oauth2_password_bearer_authentication))

Returns a list of all role assignments within a project, optionally filtered by user or team.

Parameters:

Name Type Description Default
project_name_or_id Union[str, uuid.UUID]

Name or ID of the project.

required
user_name_or_id Union[str, uuid.UUID]

If provided, only list roles that are assigned to the given user.

None
team_name_or_id Union[str, uuid.UUID]

If provided, only list roles that are assigned to the given team.

None

Returns:

Type Description
List[zenml.models.role_assignment_models.RoleAssignmentResponseModel]

A list of all role assignments within the project that match the filters.

Source code in zenml/zen_server/routers/projects_endpoints.py
@router.get(
    "/{project_name_or_id}" + ROLES,
    response_model=List[RoleAssignmentResponseModel],
    responses={401: error_response, 404: error_response, 422: error_response},
)
@handle_exceptions
def get_role_assignments_for_project(
    project_name_or_id: Union[str, UUID],
    user_name_or_id: Optional[Union[str, UUID]] = None,
    team_name_or_id: Optional[Union[str, UUID]] = None,
    _: AuthContext = Security(authorize, scopes=[PermissionType.READ]),
) -> List[RoleAssignmentResponseModel]:
    """Lists the role assignments within a project.

    Args:
        project_name_or_id: Name or ID of the project.
        user_name_or_id: If provided, only list roles that are assigned to the
            given user.
        team_name_or_id: If provided, only list roles that are assigned to the
            given team.

    Returns:
        A list of the role assignments matching the given filters.
    """
    # Every filter is forwarded to the store unchanged.
    filters = {
        "project_name_or_id": project_name_or_id,
        "user_name_or_id": user_name_or_id,
        "team_name_or_id": team_name_or_id,
    }
    return zen_store().list_role_assignments(**filters)
list_project_flavors(project_name_or_id=None, component_type=None, user_name_or_id=None, name=None, is_shared=None, _=Security(oauth2_password_bearer_authentication))

List stack components flavors of a certain type that are part of a project.

noqa: DAR401

Parameters:

Name Type Description Default
component_type Optional[zenml.enums.StackComponentType]

Type of the component.

None
project_name_or_id Union[str, uuid.UUID]

Name or ID of the project.

None
user_name_or_id Union[str, uuid.UUID]

Optionally filter by name or ID of the user.

None
name Optional[str]

Optionally filter by flavor name.

None
is_shared Optional[bool]

Optionally filter by shared status of the flavor.

None

Returns:

Type Description
List[zenml.models.flavor_models.FlavorResponseModel]

All stack components of a certain type that are part of a project.

Source code in zenml/zen_server/routers/projects_endpoints.py
@router.get(
    "/{project_name_or_id}" + FLAVORS,
    response_model=List[FlavorResponseModel],
    responses={
        401: error_response,
        404: error_response,
        422: error_response,
    },
)
@handle_exceptions
def list_project_flavors(
    project_name_or_id: Optional[Union[str, UUID]] = None,
    component_type: Optional[StackComponentType] = None,
    user_name_or_id: Optional[Union[str, UUID]] = None,
    name: Optional[str] = None,
    is_shared: Optional[bool] = None,
    _: AuthContext = Security(authorize, scopes=[PermissionType.READ]),
) -> List[FlavorResponseModel]:
    """Lists the stack component flavors that are part of a project.

    # noqa: DAR401

    Args:
        project_name_or_id: Name or ID of the project.
        component_type: Optionally filter by type of the component.
        user_name_or_id: Optionally filter by name or ID of the user.
        name: Optionally filter by flavor name.
        is_shared: Optionally filter by shared status of the flavor.

    Returns:
        The flavors returned by the store for the given filters.
    """
    store = zen_store()
    flavors = store.list_flavors(
        name=name,
        is_shared=is_shared,
        user_name_or_id=user_name_or_id,
        component_type=component_type,
        project_name_or_id=project_name_or_id,
    )
    return flavors
list_project_pipelines(project_name_or_id, user_name_or_id=None, name=None, _=Security(oauth2_password_bearer_authentication))

Gets pipelines defined for a specific project.

noqa: DAR401

Parameters:

Name Type Description Default
project_name_or_id Union[str, uuid.UUID]

Name or ID of the project to get pipelines for.

required
user_name_or_id Union[str, uuid.UUID]

Optionally filter by name or ID of the user.

None
name Optional[str]

Optionally filter by pipeline name

None

Returns:

Type Description
List[zenml.models.pipeline_models.PipelineResponseModel]

All pipelines within the project.

Source code in zenml/zen_server/routers/projects_endpoints.py
@router.get(
    "/{project_name_or_id}" + PIPELINES,
    response_model=List[PipelineResponseModel],
    responses={
        401: error_response,
        404: error_response,
        422: error_response,
    },
)
@handle_exceptions
def list_project_pipelines(
    project_name_or_id: Union[str, UUID],
    user_name_or_id: Optional[Union[str, UUID]] = None,
    name: Optional[str] = None,
    _: AuthContext = Security(authorize, scopes=[PermissionType.READ]),
) -> List[PipelineResponseModel]:
    """Lists the pipelines defined in a specific project.

    # noqa: DAR401

    Args:
        project_name_or_id: Name or ID of the project to get pipelines for.
        user_name_or_id: Optionally filter by name or ID of the user.
        name: Optionally filter by pipeline name.

    Returns:
        The pipelines returned by the store for the given filters.
    """
    store = zen_store()
    pipelines = store.list_pipelines(
        name=name,
        user_name_or_id=user_name_or_id,
        project_name_or_id=project_name_or_id,
    )
    return pipelines
list_project_stack_components(project_name_or_id, user_name_or_id=None, type=None, name=None, flavor_name=None, is_shared=None, auth_context=Security(oauth2_password_bearer_authentication))

List stack components that are part of a specific project.

noqa: DAR401

Parameters:

Name Type Description Default
project_name_or_id Union[str, uuid.UUID]

Name or ID of the project.

required
user_name_or_id Union[str, uuid.UUID]

Optionally filter by name or ID of the user.

None
name Optional[str]

Optionally filter by component name

None
type Optional[str]

Optionally filter by component type

None
flavor_name Optional[str]

Optionally filter by flavor name

None
is_shared Optional[bool]

Optionally filter by shared status of the component

None
auth_context AuthContext

Authentication Context

Security(oauth2_password_bearer_authentication)

Returns:

Type Description
List[zenml.models.component_models.ComponentResponseModel]

All stack components part of the specified project.

Source code in zenml/zen_server/routers/projects_endpoints.py
@router.get(
    "/{project_name_or_id}" + STACK_COMPONENTS,
    response_model=List[ComponentResponseModel],
    responses={
        401: error_response,
        404: error_response,
        422: error_response,
    },
)
@handle_exceptions
def list_project_stack_components(
    project_name_or_id: Union[str, UUID],
    user_name_or_id: Optional[Union[str, UUID]] = None,
    type: Optional[str] = None,
    name: Optional[str] = None,
    flavor_name: Optional[str] = None,
    is_shared: Optional[bool] = None,
    auth_context: AuthContext = Security(
        authorize, scopes=[PermissionType.READ]
    ),
) -> List[ComponentResponseModel]:
    """Lists the stack components of a specific project.

    Non-shared components are always queried, for the requested user or,
    when no user filter is given, for the authenticated user. Shared
    components are appended unless `is_shared` is explicitly `False`.

    # noqa: DAR401

    Args:
        project_name_or_id: Name or ID of the project.
        user_name_or_id: Optionally filter by name or ID of the user.
        name: Optionally filter by component name.
        type: Optionally filter by component type.
        flavor_name: Optionally filter by flavor name.
        is_shared: Optionally filter by shared status of the component.
        auth_context: Authentication context of the caller.

    Returns:
        The non-shared components followed by the shared ones (if queried).
    """
    store = zen_store()

    # Non-shared components default to the authenticated user's own ones.
    owner = user_name_or_id or auth_context.user.id
    result = store.list_stack_components(
        project_name_or_id=project_name_or_id,
        user_name_or_id=owner,
        type=type,
        name=name,
        flavor_name=flavor_name,
        is_shared=False,
    )

    # Only an explicit `is_shared == False` skips the shared components.
    if is_shared is not None and not is_shared:
        return result

    # The shared query uses the raw user filter, so no user restriction is
    # passed when the caller did not ask for one.
    result += store.list_stack_components(
        project_name_or_id=project_name_or_id,
        user_name_or_id=user_name_or_id,
        type=type,
        name=name,
        flavor_name=flavor_name,
        is_shared=True,
    )
    return result
list_project_stacks(project_name_or_id, user_name_or_id=None, component_id=None, name=None, is_shared=None, auth_context=Security(oauth2_password_bearer_authentication))

Get stacks that are part of a specific project.

noqa: DAR401

Parameters:

Name Type Description Default
project_name_or_id Union[str, uuid.UUID]

Name or ID of the project.

required
user_name_or_id Union[str, uuid.UUID]

Optionally filter by name or ID of the user.

None
component_id Optional[uuid.UUID]

Optionally filter by component that is part of the stack.

None
name Optional[str]

Optionally filter by stack name

None
is_shared Optional[bool]

Optionally filter by shared status of the stack

None
auth_context AuthContext

Authentication Context

Security(oauth2_password_bearer_authentication)

Returns:

Type Description
List[zenml.models.stack_models.StackResponseModel]

All stacks part of the specified project.

Source code in zenml/zen_server/routers/projects_endpoints.py
@router.get(
    "/{project_name_or_id}" + STACKS,
    response_model=List[StackResponseModel],
    responses={401: error_response, 404: error_response, 422: error_response},
)
@handle_exceptions
def list_project_stacks(
    project_name_or_id: Union[str, UUID],
    user_name_or_id: Optional[Union[str, UUID]] = None,
    component_id: Optional[UUID] = None,
    name: Optional[str] = None,
    is_shared: Optional[bool] = None,
    auth_context: AuthContext = Security(
        authorize, scopes=[PermissionType.READ]
    ),
) -> List[StackResponseModel]:
    """Get stacks that are part of a specific project.

    Non-shared stacks are always queried, for the requested user or, when no
    user filter is given, for the authenticated user. Shared stacks are
    appended unless `is_shared` is explicitly `False`.

    # noqa: DAR401

    Args:
        project_name_or_id: Name or ID of the project.
        user_name_or_id: Optionally filter by name or ID of the user.
        component_id: Optionally filter by component that is part of the stack.
        name: Optionally filter by stack name
        is_shared: Optionally filter by shared status of the stack
        auth_context: Authentication Context

    Returns:
        All stacks part of the specified project.
    """
    store = zen_store()

    # Non-shared stacks default to the authenticated user's own ones.
    stacks = store.list_stacks(
        project_name_or_id=project_name_or_id,
        user_name_or_id=user_name_or_id or auth_context.user.id,
        component_id=component_id,
        is_shared=False,
        name=name,
    )
    # In case the user didn't explicitly filter for is shared == False
    if is_shared is None or is_shared:
        # Shared stacks must not fall back to the authenticated user: doing
        # so would hide every stack shared by somebody else. Only an
        # explicit user filter narrows this query, matching how shared stack
        # components are listed for a project.
        shared_stacks = store.list_stacks(
            project_name_or_id=project_name_or_id,
            user_name_or_id=user_name_or_id,
            component_id=component_id,
            is_shared=True,
            name=name,
        )
        stacks += shared_stacks

    return stacks
list_projects(_=Security(oauth2_password_bearer_authentication))

Lists all projects in the organization.

Returns:

Type Description
List[zenml.models.project_models.ProjectResponseModel]

A list of projects.

Source code in zenml/zen_server/routers/projects_endpoints.py
@router.get(
    "",
    response_model=List[ProjectResponseModel],
    responses={
        401: error_response,
        404: error_response,
        422: error_response,
    },
)
@handle_exceptions
def list_projects(
    _: AuthContext = Security(authorize, scopes=[PermissionType.READ])
) -> List[ProjectResponseModel]:
    """Lists the projects known to the store.

    Returns:
        The projects returned by the store.
    """
    store = zen_store()
    projects = store.list_projects()
    return projects
list_runs(project_name_or_id, stack_id=None, name=None, user_name_or_id=None, component_id=None, pipeline_id=None, unlisted=False, _=Security(oauth2_password_bearer_authentication))

Get pipeline runs according to query filters.

Parameters:

Name Type Description Default
project_name_or_id Union[str, uuid.UUID]

Name or ID of the project for which to filter runs.

required
stack_id Optional[uuid.UUID]

ID of the stack for which to filter runs.

None
name Optional[str]

Filter by run name if provided

None
user_name_or_id Union[str, uuid.UUID]

If provided, only return runs for this user.

None
component_id Optional[uuid.UUID]

Filter by ID of a component that was used in the run.

None
pipeline_id Optional[uuid.UUID]

ID of the pipeline for which to filter runs.

None
unlisted bool

If True, only return unlisted runs that are not associated with any pipeline.

False

Returns:

Type Description
List[zenml.models.pipeline_run_models.PipelineRunResponseModel]

The pipeline runs according to query filters.

Source code in zenml/zen_server/routers/projects_endpoints.py
@router.get(
    "/{project_name_or_id}" + RUNS,
    response_model=List[PipelineRunResponseModel],
    responses={
        401: error_response,
        404: error_response,
        422: error_response,
    },
)
@handle_exceptions
def list_runs(
    project_name_or_id: Union[str, UUID],
    stack_id: Optional[UUID] = None,
    name: Optional[str] = None,
    user_name_or_id: Optional[Union[str, UUID]] = None,
    component_id: Optional[UUID] = None,
    pipeline_id: Optional[UUID] = None,
    unlisted: bool = False,
    _: AuthContext = Security(authorize, scopes=[PermissionType.READ]),
) -> List[PipelineRunResponseModel]:
    """Lists the pipeline runs of a project that match the query filters.

    Args:
        project_name_or_id: Name or ID of the project for which to filter runs.
        stack_id: ID of the stack for which to filter runs.
        name: Filter by run name if provided.
        user_name_or_id: If provided, only return runs for this user.
        component_id: Filter by ID of a component that was used in the run.
        pipeline_id: ID of the pipeline for which to filter runs.
        unlisted: If True, only return unlisted runs that are not
            associated with any pipeline.

    Returns:
        The pipeline runs returned by the store for the given filters.
    """
    store = zen_store()
    runs = store.list_runs(
        project_name_or_id=project_name_or_id,
        user_name_or_id=user_name_or_id,
        pipeline_id=pipeline_id,
        stack_id=stack_id,
        component_id=component_id,
        name=name,
        unlisted=unlisted,
    )
    return runs
update_project(project_name_or_id, project_update, _=Security(oauth2_password_bearer_authentication))

Updates the project with the given ID.

noqa: DAR401

Parameters:

Name Type Description Default
project_name_or_id UUID

Name or ID of the project to update.

required
project_update ProjectUpdateModel

the project to use to update

required

Returns:

Type Description
ProjectResponseModel

The updated project.

Source code in zenml/zen_server/routers/projects_endpoints.py
@router.put(
    "/{project_name_or_id}",
    response_model=ProjectResponseModel,
    responses={
        401: error_response,
        404: error_response,
        422: error_response,
    },
)
@handle_exceptions
def update_project(
    project_name_or_id: UUID,
    project_update: ProjectUpdateModel,
    _: AuthContext = Security(authorize, scopes=[PermissionType.WRITE]),
) -> ProjectResponseModel:
    """Updates a project.

    # noqa: DAR401

    Args:
        project_name_or_id: ID of the project to update (annotated as a
            UUID and forwarded to the store as `project_id`).
        project_update: The update to apply to the project.

    Returns:
        The updated project.
    """
    store = zen_store()
    updated_project = store.update_project(
        project_update=project_update,
        project_id=project_name_or_id,
    )
    return updated_project

role_assignments_endpoints

Endpoint definitions for role assignments.

create_role_assignment(role_assignment, _=Security(oauth2_password_bearer_authentication))

Creates a role assignment.

noqa: DAR401

Parameters:

Name Type Description Default
role_assignment RoleAssignmentRequestModel

Role assignment to create.

required

Returns:

Type Description
RoleAssignmentResponseModel

The created role assignment.

Source code in zenml/zen_server/routers/role_assignments_endpoints.py
@router.post(
    "",
    response_model=RoleAssignmentResponseModel,
    responses={
        401: error_response,
        409: error_response,
        422: error_response,
    },
)
@handle_exceptions
def create_role_assignment(
    role_assignment: RoleAssignmentRequestModel,
    _: AuthContext = Security(authorize, scopes=[PermissionType.WRITE]),
) -> RoleAssignmentResponseModel:
    """Creates a new role assignment.

    # noqa: DAR401

    Args:
        role_assignment: The role assignment to create.

    Returns:
        The role assignment created by the store.
    """
    store = zen_store()
    created = store.create_role_assignment(role_assignment=role_assignment)
    return created
delete_role_assignment(role_assignment_id, _=Security(oauth2_password_bearer_authentication))

Deletes a specific role assignment.

Parameters:

Name Type Description Default
role_assignment_id UUID

The ID of the role assignment.

required
Source code in zenml/zen_server/routers/role_assignments_endpoints.py
@router.delete(
    "/{role_assignment_id}",
    responses={
        401: error_response,
        404: error_response,
        422: error_response,
    },
)
@handle_exceptions
def delete_role_assignment(
    role_assignment_id: UUID,
    _: AuthContext = Security(authorize, scopes=[PermissionType.WRITE]),
) -> None:
    """Deletes a specific role assignment.

    Args:
        role_assignment_id: The ID of the role assignment to delete.
    """
    store = zen_store()
    store.delete_role_assignment(role_assignment_id=role_assignment_id)
get_role_assignment(role_assignment_id, _=Security(oauth2_password_bearer_authentication))

Returns a specific role assignment.

Parameters:

Name Type Description Default
role_assignment_id UUID

Name or ID of the role assignment.

required

Returns:

Type Description
RoleAssignmentResponseModel

A specific role assignment.

Source code in zenml/zen_server/routers/role_assignments_endpoints.py
@router.get(
    "/{role_assignment_id}",
    response_model=RoleAssignmentResponseModel,
    responses={
        401: error_response,
        404: error_response,
        422: error_response,
    },
)
@handle_exceptions
def get_role_assignment(
    role_assignment_id: UUID,
    _: AuthContext = Security(authorize, scopes=[PermissionType.READ]),
) -> RoleAssignmentResponseModel:
    """Fetches a single role assignment by its ID.

    Args:
        role_assignment_id: ID of the role assignment.

    Returns:
        The role assignment returned by the store.
    """
    store = zen_store()
    assignment = store.get_role_assignment(
        role_assignment_id=role_assignment_id
    )
    return assignment
list_role_assignments(project_name_or_id=None, role_name_or_id=None, team_name_or_id=None, user_name_or_id=None, _=Security(oauth2_password_bearer_authentication))

Returns a list of all role assignments.

Parameters:

Name Type Description Default
project_name_or_id Union[str, uuid.UUID]

If provided, only list assignments for the given project

None
role_name_or_id Union[str, uuid.UUID]

If provided, only list assignments of the given role

None
team_name_or_id Union[str, uuid.UUID]

If provided, only list assignments for the given team

None
user_name_or_id Union[str, uuid.UUID]

If provided, only list assignments for the given user

None

Returns:

Type Description
List[zenml.models.role_assignment_models.RoleAssignmentResponseModel]

List of all role assignments.

Source code in zenml/zen_server/routers/role_assignments_endpoints.py
@router.get(
    "",
    response_model=List[RoleAssignmentResponseModel],
    responses={
        401: error_response,
        404: error_response,
        422: error_response,
    },
)
@handle_exceptions
def list_role_assignments(
    project_name_or_id: Optional[Union[str, UUID]] = None,
    role_name_or_id: Optional[Union[str, UUID]] = None,
    team_name_or_id: Optional[Union[str, UUID]] = None,
    user_name_or_id: Optional[Union[str, UUID]] = None,
    _: AuthContext = Security(authorize, scopes=[PermissionType.READ]),
) -> List[RoleAssignmentResponseModel]:
    """Lists role assignments, optionally narrowed by the given filters.

    Args:
        project_name_or_id: If provided, only list assignments for the given
            project.
        role_name_or_id: If provided, only list assignments of the given
            role.
        team_name_or_id: If provided, only list assignments for the given
            team.
        user_name_or_id: If provided, only list assignments for the given
            user.

    Returns:
        The role assignments returned by the store for the given filters.
    """
    store = zen_store()
    assignments = store.list_role_assignments(
        user_name_or_id=user_name_or_id,
        team_name_or_id=team_name_or_id,
        role_name_or_id=role_name_or_id,
        project_name_or_id=project_name_or_id,
    )
    return assignments

roles_endpoints

Endpoint definitions for roles and role assignment.

create_role(role, _=Security(oauth2_password_bearer_authentication))

Creates a role.

noqa: DAR401

Parameters:

Name Type Description Default
role RoleRequestModel

Role to create.

required

Returns:

Type Description
RoleResponseModel

The created role.

Source code in zenml/zen_server/routers/roles_endpoints.py
@router.post(
    "",
    response_model=RoleResponseModel,
    responses={
        401: error_response,
        409: error_response,
        422: error_response,
    },
)
@handle_exceptions
def create_role(
    role: RoleRequestModel,
    _: AuthContext = Security(authorize, scopes=[PermissionType.WRITE]),
) -> RoleResponseModel:
    """Creates a new role.

    # noqa: DAR401

    Args:
        role: The role to create.

    Returns:
        The role created by the store.
    """
    store = zen_store()
    created_role = store.create_role(role=role)
    return created_role
delete_role(role_name_or_id, _=Security(oauth2_password_bearer_authentication))

Deletes a specific role.

Parameters:

Name Type Description Default
role_name_or_id Union[str, uuid.UUID]

Name or ID of the role.

required
Source code in zenml/zen_server/routers/roles_endpoints.py
@router.delete(
    "/{role_name_or_id}",
    responses={
        401: error_response,
        404: error_response,
        422: error_response,
    },
)
@handle_exceptions
def delete_role(
    role_name_or_id: Union[str, UUID],
    _: AuthContext = Security(authorize, scopes=[PermissionType.WRITE]),
) -> None:
    """Deletes the role with the given name or ID.

    Args:
        role_name_or_id: Name or ID of the role to delete.
    """
    store = zen_store()
    store.delete_role(role_name_or_id=role_name_or_id)
get_role(role_name_or_id, _=Security(oauth2_password_bearer_authentication))

Returns a specific role.

Parameters:

Name Type Description Default
role_name_or_id Union[str, uuid.UUID]

Name or ID of the role.

required

Returns:

Type Description
RoleResponseModel

A specific role.

Source code in zenml/zen_server/routers/roles_endpoints.py
@router.get(
    "/{role_name_or_id}",
    response_model=RoleResponseModel,
    responses={
        401: error_response,
        404: error_response,
        422: error_response,
    },
)
@handle_exceptions
def get_role(
    role_name_or_id: Union[str, UUID],
    _: AuthContext = Security(authorize, scopes=[PermissionType.READ]),
) -> RoleResponseModel:
    """Fetches a single role by name or ID.

    Args:
        role_name_or_id: Name or ID of the role.

    Returns:
        The role returned by the store.
    """
    store = zen_store()
    role = store.get_role(role_name_or_id=role_name_or_id)
    return role
list_roles(name=None, _=Security(oauth2_password_bearer_authentication))

Returns a list of all roles.

Parameters:

Name Type Description Default
name Optional[str]

Optional name of the role to filter by.

None

Returns:

Type Description
List[zenml.models.role_models.RoleResponseModel]

List of all roles.

Source code in zenml/zen_server/routers/roles_endpoints.py
@router.get(
    "",
    response_model=List[RoleResponseModel],
    responses={
        401: error_response,
        404: error_response,
        422: error_response,
    },
)
@handle_exceptions
def list_roles(
    name: Optional[str] = None,
    _: AuthContext = Security(authorize, scopes=[PermissionType.READ]),
) -> List[RoleResponseModel]:
    """Lists roles, optionally filtered by name.

    Args:
        name: Optional name of the role to filter by.

    Returns:
        The roles returned by the store for the given filter.
    """
    store = zen_store()
    roles = store.list_roles(name=name)
    return roles
update_role(role_id, role_update, _=Security(oauth2_password_bearer_authentication))

Updates a role.

noqa: DAR401

Parameters:

Name Type Description Default
role_id UUID

The ID of the role.

required
role_update RoleUpdateModel

Role update.

required

Returns:

Type Description
RoleResponseModel

The updated role.

Source code in zenml/zen_server/routers/roles_endpoints.py
@router.put(
    "/{role_id}",
    response_model=RoleResponseModel,
    responses={
        401: error_response,
        409: error_response,
        422: error_response,
    },
)
@handle_exceptions
def update_role(
    role_id: UUID,
    role_update: RoleUpdateModel,
    _: AuthContext = Security(authorize, scopes=[PermissionType.WRITE]),
) -> RoleResponseModel:
    """Updates an existing role.

    # noqa: DAR401

    Args:
        role_id: The ID of the role to update.
        role_update: The update to apply to the role.

    Returns:
        The updated role.
    """
    store = zen_store()
    updated_role = store.update_role(
        role_update=role_update,
        role_id=role_id,
    )
    return updated_role

runs_endpoints

Endpoint definitions for pipeline runs.

get_pipeline_configuration(run_id, _=Security(oauth2_password_bearer_authentication))

Get the pipeline configuration of a specific pipeline run using its ID.

Parameters:

Name Type Description Default
run_id UUID

ID of the pipeline run to get.

required

Returns:

Type Description
Dict[str, Any]

The pipeline configuration of the pipeline run.

Source code in zenml/zen_server/routers/runs_endpoints.py
@router.get(
    "/{run_id}" + PIPELINE_CONFIGURATION,
    response_model=Dict[str, Any],
    responses={
        401: error_response,
        404: error_response,
        422: error_response,
    },
)
@handle_exceptions
def get_pipeline_configuration(
    run_id: UUID,
    _: AuthContext = Security(authorize, scopes=[PermissionType.READ]),
) -> Dict[str, Any]:
    """Fetches the pipeline configuration of a pipeline run.

    Args:
        run_id: ID of the pipeline run to get.

    Returns:
        The `pipeline_configuration` attribute of the fetched run.
    """
    run = zen_store().get_run(run_name_or_id=run_id)
    return run.pipeline_configuration
get_run(run_id, _=Security(oauth2_password_bearer_authentication))

Get a specific pipeline run using its ID.

Parameters:

Name Type Description Default
run_id UUID

ID of the pipeline run to get.

required

Returns:

Type Description
PipelineRunResponseModel

The pipeline run.

Source code in zenml/zen_server/routers/runs_endpoints.py
@router.get(
    "/{run_id}",
    response_model=PipelineRunResponseModel,
    responses={
        401: error_response,
        404: error_response,
        422: error_response,
    },
)
@handle_exceptions
def get_run(
    run_id: UUID,
    _: AuthContext = Security(authorize, scopes=[PermissionType.READ]),
) -> PipelineRunResponseModel:
    """Fetches a single pipeline run by its ID.

    Args:
        run_id: ID of the pipeline run to get.

    Returns:
        The pipeline run returned by the store.
    """
    store = zen_store()
    run = store.get_run(run_name_or_id=run_id)
    return run
get_run_component_side_effects(run_id, component_id=None, _=Security(oauth2_password_bearer_authentication))

Get the component side effects for a given pipeline run.

Parameters:

Name Type Description Default
run_id UUID

ID of the pipeline run to use to get the component side effects.

required
component_id Optional[uuid.UUID]

ID of the component to use to get the component side effects.

None

Returns:

Type Description
Dict[str, Any]

The component side effects for a given pipeline run.

Source code in zenml/zen_server/routers/runs_endpoints.py
@router.get(
    "/{run_id}" + COMPONENT_SIDE_EFFECTS,
    response_model=Dict,
    responses={
        401: error_response,
        404: error_response,
        422: error_response,
    },
)
@handle_exceptions
def get_run_component_side_effects(
    run_id: UUID,
    component_id: Optional[UUID] = None,
    _: AuthContext = Security(authorize, scopes=[PermissionType.READ]),
) -> Dict[str, Any]:
    """Returns the component side effects for a given pipeline run.

    Neither argument is read by the body; the endpoint always answers with
    an empty dictionary.

    Args:
        run_id: ID of the pipeline run to use to get the component side effects.
        component_id: ID of the component to use to get the component
            side effects.

    Returns:
        An empty dictionary.
    """
    side_effects: Dict[str, Any] = {}
    return side_effects
get_run_dag(run_id, _=Security(oauth2_password_bearer_authentication))

Get the DAG for a given pipeline run.

Parameters:

Name Type Description Default
run_id UUID

ID of the pipeline run to use to get the DAG.

required

Returns:

Type Description
LineageGraph

The DAG for a given pipeline run.

Source code in zenml/zen_server/routers/runs_endpoints.py
@router.get(
    "/{run_id}" + GRAPH,
    response_model=LineageGraph,
    responses={
        401: error_response,
        404: error_response,
        422: error_response,
    },
)
@handle_exceptions
def get_run_dag(
    run_id: UUID,
    _: AuthContext = Security(authorize, scopes=[PermissionType.READ]),
) -> LineageGraph:
    """Builds the lineage graph (DAG) of a pipeline run.

    Args:
        run_id: ID of the pipeline run to use to get the DAG.

    Returns:
        A `LineageGraph` populated from the fetched run.
    """
    # Imported inside the function, as in the original implementation.
    from zenml.post_execution.pipeline_run import PipelineRunView

    run_model = zen_store().get_run(run_name_or_id=run_id)
    lineage = LineageGraph()
    run_view = PipelineRunView(run_model)
    lineage.generate_run_nodes_and_edges(run_view)
    return lineage
get_run_status(run_id, _=Security(oauth2_password_bearer_authentication))

Get the status of a specific pipeline run.

Parameters:

Name Type Description Default
run_id UUID

ID of the pipeline run for which to get the status.

required

Returns:

Type Description
ExecutionStatus

The status of the pipeline run.

Source code in zenml/zen_server/routers/runs_endpoints.py
@router.get(
    "/{run_id}" + STATUS,
    response_model=ExecutionStatus,
    responses={
        401: error_response,
        404: error_response,
        422: error_response,
    },
)
@handle_exceptions
def get_run_status(
    run_id: UUID,
    _: AuthContext = Security(authorize, scopes=[PermissionType.READ]),
) -> ExecutionStatus:
    """Fetches the status of a pipeline run.

    Args:
        run_id: ID of the pipeline run for which to get the status.

    Returns:
        The `status` attribute of the fetched run.
    """
    run = zen_store().get_run(run_id)
    return run.status
get_run_steps(run_id, _=Security(oauth2_password_bearer_authentication))

Get all steps for a given pipeline run.

Parameters:

Name Type Description Default
run_id UUID

ID of the pipeline run for which to get the steps.

required

Returns:

Type Description
List[zenml.models.step_run_models.StepRunResponseModel]

The steps for a given pipeline run.

Source code in zenml/zen_server/routers/runs_endpoints.py
@router.get(
    "/{run_id}" + STEPS,
    response_model=List[StepRunResponseModel],
    responses={
        401: error_response,
        404: error_response,
        422: error_response,
    },
)
@handle_exceptions
def get_run_steps(
    run_id: UUID,
    _: AuthContext = Security(authorize, scopes=[PermissionType.READ]),
) -> List[StepRunResponseModel]:
    """Lists the steps of a given pipeline run.

    Args:
        run_id: ID of the pipeline run for which to get the steps.

    Returns:
        The step runs returned by the store for the given run.
    """
    store = zen_store()
    steps = store.list_run_steps(run_id)
    return steps
list_runs(project_name_or_id=None, stack_id=None, name=None, user_name_or_id=None, component_id=None, pipeline_id=None, unlisted=False, _=Security(oauth2_password_bearer_authentication))

Get pipeline runs according to query filters.

Parameters:

Name Type Description Default
project_name_or_id Union[str, uuid.UUID]

Name or ID of the project for which to filter runs.

None
stack_id Optional[uuid.UUID]

ID of the stack for which to filter runs.

None
name Optional[str]

Filter by run name if provided

None
user_name_or_id Union[str, uuid.UUID]

If provided, only return runs for this user.

None
component_id Optional[uuid.UUID]

Filter by ID of a component that was used in the run.

None
pipeline_id Optional[uuid.UUID]

ID of the pipeline for which to filter runs.

None
unlisted bool

If True, only return unlisted runs that are not associated with any pipeline.

False

Returns:

Type Description
List[zenml.models.pipeline_run_models.PipelineRunResponseModel]

The pipeline runs according to query filters.

Source code in zenml/zen_server/routers/runs_endpoints.py
@router.get(
    "",
    response_model=List[PipelineRunResponseModel],
    responses={
        401: error_response,
        404: error_response,
        422: error_response,
    },
)
@handle_exceptions
def list_runs(
    project_name_or_id: Optional[Union[str, UUID]] = None,
    stack_id: Optional[UUID] = None,
    name: Optional[str] = None,
    user_name_or_id: Optional[Union[str, UUID]] = None,
    component_id: Optional[UUID] = None,
    pipeline_id: Optional[UUID] = None,
    unlisted: bool = False,
    _: AuthContext = Security(authorize, scopes=[PermissionType.READ]),
) -> List[PipelineRunResponseModel]:
    """Lists the pipeline runs that match the query filters.

    Args:
        project_name_or_id: Name or ID of the project for which to filter runs.
        stack_id: ID of the stack for which to filter runs.
        name: Filter by run name if provided.
        user_name_or_id: If provided, only return runs for this user.
        component_id: Filter by ID of a component that was used in the run.
        pipeline_id: ID of the pipeline for which to filter runs.
        unlisted: If True, only return unlisted runs that are not
            associated with any pipeline.

    Returns:
        The pipeline runs returned by the store for the given filters.
    """
    store = zen_store()
    runs = store.list_runs(
        project_name_or_id=project_name_or_id,
        user_name_or_id=user_name_or_id,
        pipeline_id=pipeline_id,
        stack_id=stack_id,
        component_id=component_id,
        name=name,
        unlisted=unlisted,
    )
    return runs
update_run(run_id, run_model, auth_context=Security(oauth2_password_bearer_authentication))

Updates a run.

Parameters:

Name Type Description Default
run_id UUID

ID of the run.

required
run_model PipelineRunUpdateModel

Run model to use for the update.

required
auth_context AuthContext

Authorization Context

Security(oauth2_password_bearer_authentication)

Returns:

Type Description
PipelineRunResponseModel

The updated run model.

Exceptions:

Type Description
IllegalOperationError

When trying to change the user of a run.

Source code in zenml/zen_server/routers/runs_endpoints.py
@router.put(
    "/{run_id}",
    response_model=PipelineRunResponseModel,
    responses={
        401: error_response,
        404: error_response,
        422: error_response,
    },
)
@handle_exceptions
def update_run(
    run_id: UUID,
    run_model: PipelineRunUpdateModel,
    auth_context: AuthContext = Security(
        authorize, scopes=[PermissionType.WRITE]
    ),
) -> PipelineRunResponseModel:
    """Apply an update to an existing pipeline run.

    Args:
        run_id: ID of the run.
        run_model: Run model to use for the update.
        auth_context: Authorization context of the requesting user.

    Returns:
        The updated run model.

    Raises:
        IllegalOperationError: When trying to change the user of a run.
    """
    # A run may only keep its owner: reject updates that name a user other
    # than the one making the request.
    requested_user = run_model.user
    if requested_user and requested_user != auth_context.user.id:
        raise IllegalOperationError(
            "Changing the user of a run to another user is not supported."
        )

    store = zen_store()
    return store.update_run(run_id=run_id, run_update=run_model)

server_endpoints

Endpoint definitions for server information (server info and version).

server_info()

Get information about the server.

Returns:

Type Description
ServerModel

Information about the server.

Source code in zenml/zen_server/routers/server_endpoints.py
@router.get(
    INFO,
    response_model=ServerModel,
    responses={
        401: error_response,
        404: error_response,
        422: error_response,
    },
)
@handle_exceptions
def server_info() -> ServerModel:
    """Report information about the server, as provided by the store.

    Returns:
        Information about the server.
    """
    store = zen_store()
    info = store.get_store_info()
    return info
version()

Get version of the server.

Returns:

Type Description
str

String representing the version of the server.

Source code in zenml/zen_server/routers/server_endpoints.py
@router.get("/version")
def version() -> str:
    """Report the version of the running server.

    Returns:
        String representing the version of the server.
    """
    server_version: str = zenml.__version__
    return server_version

stack_components_endpoints

Endpoint definitions for stack components.

deregister_stack_component(component_id, _=Security(oauth2_password_bearer_authentication))

Deletes a stack component.

Parameters:

Name Type Description Default
component_id UUID

ID of the stack component.

required
Source code in zenml/zen_server/routers/stack_components_endpoints.py
@router.delete(
    "/{component_id}",
    responses={
        401: error_response,
        404: error_response,
        422: error_response,
    },
)
@handle_exceptions
def deregister_stack_component(
    component_id: UUID,
    _: AuthContext = Security(authorize, scopes=[PermissionType.WRITE]),
) -> None:
    """Delete a stack component from the store.

    Args:
        component_id: ID of the stack component to delete.
    """
    store = zen_store()
    store.delete_stack_component(component_id)
get_stack_component(component_id, _=Security(oauth2_password_bearer_authentication))

Returns the requested stack component.

Parameters:

Name Type Description Default
component_id UUID

ID of the stack component.

required

Returns:

Type Description
ComponentResponseModel

The requested stack component.

Source code in zenml/zen_server/routers/stack_components_endpoints.py
@router.get(
    "/{component_id}",
    response_model=ComponentResponseModel,
    responses={
        401: error_response,
        404: error_response,
        422: error_response,
    },
)
@handle_exceptions
def get_stack_component(
    component_id: UUID,
    _: AuthContext = Security(authorize, scopes=[PermissionType.READ]),
) -> ComponentResponseModel:
    """Fetch a single stack component by its ID.

    Args:
        component_id: ID of the stack component.

    Returns:
        The requested stack component.
    """
    store = zen_store()
    component = store.get_stack_component(component_id)
    return component
get_stack_component_types(_=Security(oauth2_password_bearer_authentication))

Get a list of all stack component types.

Returns:

Type Description
List[str]

List of stack component types.

Source code in zenml/zen_server/routers/stack_components_endpoints.py
@types_router.get(
    "",
    response_model=List[str],
    responses={
        401: error_response,
        404: error_response,
        422: error_response,
    },
)
@handle_exceptions
def get_stack_component_types(
    _: AuthContext = Security(authorize, scopes=[PermissionType.READ]),
) -> List[str]:
    """List every known stack component type.

    Returns:
        List of stack component types.
    """
    component_types: List[str] = StackComponentType.values()
    return component_types
list_stack_components(project_name_or_id=None, user_name_or_id=None, type=None, name=None, flavor_name=None, is_shared=None, auth_context=Security(oauth2_password_bearer_authentication))

Get a list of all stack components for a specific type.

Parameters:

Name Type Description Default
project_name_or_id Union[str, uuid.UUID]

Name or ID of the project

None
user_name_or_id Union[str, uuid.UUID]

Optionally filter by name or ID of the user.

None
name Optional[str]

Optionally filter by component name

None
type Optional[str]

Optionally filter by component type

None
flavor_name Optional[str]

Optionally filter by flavor

None
is_shared Optional[bool]

Defines whether to return shared stack components or the private stack components of the user. If not set, both are returned.

None
auth_context AuthContext

Authentication Context

Security(oauth2_password_bearer_authentication)

Returns:

Type Description
List[zenml.models.component_models.ComponentResponseModel]

List of stack components for a specific type.

Source code in zenml/zen_server/routers/stack_components_endpoints.py
@router.get(
    "",
    response_model=List[ComponentResponseModel],
    responses={
        401: error_response,
        404: error_response,
        422: error_response,
    },
)
@handle_exceptions
def list_stack_components(
    project_name_or_id: Optional[Union[str, UUID]] = None,
    user_name_or_id: Optional[Union[str, UUID]] = None,
    type: Optional[str] = None,
    name: Optional[str] = None,
    flavor_name: Optional[str] = None,
    is_shared: Optional[bool] = None,
    auth_context: AuthContext = Security(
        authorize, scopes=[PermissionType.READ]
    ),
) -> List[ComponentResponseModel]:
    """List stack components, private ones first and shared ones after.

    Args:
        project_name_or_id: Name or ID of the project.
        user_name_or_id: Optionally filter by name or ID of the user. When
            omitted, the private components are looked up for the
            authenticated user.
        name: Optionally filter by component name.
        type: Optionally filter by component type.
        flavor_name: Optionally filter by flavor.
        is_shared: If True, only shared components are returned; if False,
            only private components are returned. If not set, both are
            returned.
        auth_context: Authentication context of the requesting user.

    Returns:
        List of stack components matching the filters.
    """
    # TODO: Implement a sensible filtering mechanism

    # Private components are wanted unless `is_shared` is explicitly True.
    private_components: List[ComponentResponseModel] = []
    if not is_shared:
        private_components = zen_store().list_stack_components(
            project_name_or_id=project_name_or_id,
            # Fall back to the requesting user when no user filter is given.
            user_name_or_id=user_name_or_id or auth_context.user.id,
            type=type,
            name=name,
            flavor_name=flavor_name,
            is_shared=False,
        )

    # Shared components are wanted unless `is_shared` is explicitly False.
    shared_components: List[ComponentResponseModel] = []
    if is_shared is None or is_shared:
        shared_components = zen_store().list_stack_components(
            project_name_or_id=project_name_or_id,
            user_name_or_id=user_name_or_id,
            type=type,
            name=name,
            flavor_name=flavor_name,
            is_shared=True,
        )

    return [*private_components, *shared_components]
update_stack_component(component_id, component_update, _=Security(oauth2_password_bearer_authentication))

Updates a stack component.

Parameters:

Name Type Description Default
component_id UUID

ID of the stack component.

required
component_update ComponentUpdateModel

Stack component to use to update.

required

Returns:

Type Description
ComponentResponseModel

Updated stack component.

Source code in zenml/zen_server/routers/stack_components_endpoints.py
@router.put(
    "/{component_id}",
    response_model=ComponentResponseModel,
    responses={
        401: error_response,
        404: error_response,
        422: error_response,
    },
)
@handle_exceptions
def update_stack_component(
    component_id: UUID,
    component_update: ComponentUpdateModel,
    _: AuthContext = Security(authorize, scopes=[PermissionType.WRITE]),
) -> ComponentResponseModel:
    """Apply an update to an existing stack component.

    Args:
        component_id: ID of the stack component.
        component_update: Stack component to use to update.

    Returns:
        Updated stack component.
    """
    store = zen_store()
    updated_component = store.update_stack_component(
        component_id=component_id, component_update=component_update
    )
    return updated_component

stacks_endpoints

Endpoint definitions for stacks.

delete_stack(stack_id, _=Security(oauth2_password_bearer_authentication))

Deletes a stack.

Parameters:

Name Type Description Default
stack_id UUID

ID of the stack.

required
Source code in zenml/zen_server/routers/stacks_endpoints.py
@router.delete(
    "/{stack_id}",
    responses={
        401: error_response,
        404: error_response,
        422: error_response,
    },
)
@handle_exceptions
def delete_stack(
    stack_id: UUID,
    _: AuthContext = Security(authorize, scopes=[PermissionType.WRITE]),
) -> None:
    """Delete a stack from the store.

    Args:
        stack_id: ID of the stack to delete.
    """
    store = zen_store()
    store.delete_stack(stack_id)
get_stack(stack_id, _=Security(oauth2_password_bearer_authentication))

Returns the requested stack.

Parameters:

Name Type Description Default
stack_id UUID

ID of the stack.

required

Returns:

Type Description
StackResponseModel

The requested stack.

Source code in zenml/zen_server/routers/stacks_endpoints.py
@router.get(
    "/{stack_id}",
    response_model=StackResponseModel,
    responses={
        401: error_response,
        404: error_response,
        422: error_response,
    },
)
@handle_exceptions
def get_stack(
    stack_id: UUID,
    _: AuthContext = Security(authorize, scopes=[PermissionType.READ]),
) -> StackResponseModel:
    """Fetch a single stack by its ID.

    Args:
        stack_id: ID of the stack.

    Returns:
        The requested stack.
    """
    store = zen_store()
    stack = store.get_stack(stack_id)
    return stack
list_stacks(project_name_or_id=None, user_name_or_id=None, component_id=None, name=None, is_shared=None, auth_context=Security(oauth2_password_bearer_authentication))

Returns all stacks.

Parameters:

Name Type Description Default
project_name_or_id Union[str, uuid.UUID]

Name or ID of the project

None
user_name_or_id Union[str, uuid.UUID]

Optionally filter by name or ID of the user.

None
component_id Optional[uuid.UUID]

Optionally filter by component that is part of the stack.

None
name Optional[str]

Optionally filter by stack name

None
is_shared Optional[bool]

Defines whether to return shared stacks or the private stacks of the user. If not set, both are returned.

None
auth_context AuthContext

Authentication Context

Security(oauth2_password_bearer_authentication)

Returns:

Type Description
List[zenml.models.stack_models.StackResponseModel]

All stacks.

Source code in zenml/zen_server/routers/stacks_endpoints.py
@router.get(
    "",
    response_model=List[StackResponseModel],
    responses={
        401: error_response,
        404: error_response,
        422: error_response,
    },
)
@handle_exceptions
def list_stacks(
    project_name_or_id: Optional[Union[str, UUID]] = None,
    user_name_or_id: Optional[Union[str, UUID]] = None,
    component_id: Optional[UUID] = None,
    name: Optional[str] = None,
    is_shared: Optional[bool] = None,
    auth_context: AuthContext = Security(
        authorize, scopes=[PermissionType.READ]
    ),
) -> List[StackResponseModel]:
    """List stacks, private ones first and shared ones after.

    Args:
        project_name_or_id: Name or ID of the project.
        user_name_or_id: Optionally filter by name or ID of the user. When
            omitted, the private stacks are looked up for the authenticated
            user.
        component_id: Optionally filter by component that is part of the
            stack.
        name: Optionally filter by stack name.
        is_shared: If True, only shared stacks are returned; if False, only
            private stacks are returned. If not set, both are returned.
        auth_context: Authentication context of the requesting user.

    Returns:
        All stacks matching the filters.
    """
    # Private stacks are wanted unless `is_shared` is explicitly True.
    private_stacks: List[StackResponseModel] = []
    if not is_shared:
        private_stacks = zen_store().list_stacks(
            project_name_or_id=project_name_or_id,
            # Fall back to the requesting user when no user filter is given.
            user_name_or_id=user_name_or_id or auth_context.user.id,
            component_id=component_id,
            name=name,
            is_shared=False,
        )

    # Shared stacks are wanted unless `is_shared` is explicitly False.
    shared_stacks: List[StackResponseModel] = []
    if is_shared is None or is_shared:
        shared_stacks = zen_store().list_stacks(
            project_name_or_id=project_name_or_id,
            user_name_or_id=user_name_or_id,
            component_id=component_id,
            name=name,
            is_shared=True,
        )

    return [*private_stacks, *shared_stacks]
update_stack(stack_id, stack_update, _=Security(oauth2_password_bearer_authentication))

Updates a stack.

Parameters:

Name Type Description Default
stack_id UUID

ID of the stack.

required
stack_update StackUpdateModel

Stack to use for the update.

required

Returns:

Type Description
StackResponseModel

The updated stack.

Source code in zenml/zen_server/routers/stacks_endpoints.py
@router.put(
    "/{stack_id}",
    response_model=StackResponseModel,
    responses={
        401: error_response,
        404: error_response,
        422: error_response,
    },
)
@handle_exceptions
def update_stack(
    stack_id: UUID,
    stack_update: StackUpdateModel,
    _: AuthContext = Security(authorize, scopes=[PermissionType.WRITE]),
) -> StackResponseModel:
    """Apply an update to an existing stack.

    Args:
        stack_id: ID of the stack to update.
        stack_update: Stack to use for the update.

    Returns:
        The updated stack.
    """
    store = zen_store()
    updated_stack = store.update_stack(
        stack_id=stack_id, stack_update=stack_update
    )
    return updated_stack

steps_endpoints

Endpoint definitions for steps (and artifacts) of pipeline runs.

create_run_step(step, _=Security(oauth2_password_bearer_authentication))

Create a run step.

Parameters:

Name Type Description Default
step StepRunRequestModel

The run step to create.

required

Returns:

Type Description
StepRunResponseModel

The created run step.

Source code in zenml/zen_server/routers/steps_endpoints.py
@router.post(
    "",
    response_model=StepRunResponseModel,
    responses={
        401: error_response,
        409: error_response,
        422: error_response,
    },
)
@handle_exceptions
def create_run_step(
    step: StepRunRequestModel,
    _: AuthContext = Security(authorize, scopes=[PermissionType.WRITE]),
) -> StepRunResponseModel:
    """Register a new run step in the store.

    Args:
        step: The run step to create.

    Returns:
        The created run step.
    """
    store = zen_store()
    created_step = store.create_run_step(step=step)
    return created_step
get_step(step_id, _=Security(oauth2_password_bearer_authentication))

Get one specific step.

Parameters:

Name Type Description Default
step_id UUID

ID of the step to get.

required

Returns:

Type Description
StepRunResponseModel

The step.

Source code in zenml/zen_server/routers/steps_endpoints.py
@router.get(
    "/{step_id}",
    response_model=StepRunResponseModel,
    responses={
        401: error_response,
        404: error_response,
        422: error_response,
    },
)
@handle_exceptions
def get_step(
    step_id: UUID,
    _: AuthContext = Security(authorize, scopes=[PermissionType.READ]),
) -> StepRunResponseModel:
    """Fetch a single step by its ID.

    Args:
        step_id: ID of the step to get.

    Returns:
        The step.
    """
    store = zen_store()
    step_run = store.get_run_step(step_id)
    return step_run
get_step_configuration(step_id, _=Security(oauth2_password_bearer_authentication))

Get the configuration of a specific step.

Parameters:

Name Type Description Default
step_id UUID

ID of the step to get.

required

Returns:

Type Description
Dict[str, Any]

The step configuration.

Source code in zenml/zen_server/routers/steps_endpoints.py
@router.get(
    "/{step_id}" + STEP_CONFIGURATION,
    response_model=Dict[str, Any],
    responses={
        401: error_response,
        404: error_response,
        422: error_response,
    },
)
@handle_exceptions
def get_step_configuration(
    step_id: UUID,
    _: AuthContext = Security(authorize, scopes=[PermissionType.READ]),
) -> Dict[str, Any]:
    """Fetch a step and return only its configuration.

    Args:
        step_id: ID of the step to get.

    Returns:
        The step configuration.
    """
    step_run = zen_store().get_run_step(step_id)
    return step_run.step_configuration
get_step_inputs(step_id, _=Security(oauth2_password_bearer_authentication))

Get the inputs of a specific step.

Parameters:

Name Type Description Default
step_id UUID

ID of the step for which to get the inputs.

required

Returns:

Type Description
Dict[str, zenml.models.artifact_models.ArtifactResponseModel]

All inputs of the step, mapping from input name to artifact model.

Source code in zenml/zen_server/routers/steps_endpoints.py
@router.get(
    "/{step_id}" + INPUTS,
    response_model=Dict[str, ArtifactResponseModel],
    responses={
        401: error_response,
        404: error_response,
        422: error_response,
    },
)
@handle_exceptions
def get_step_inputs(
    step_id: UUID,
    _: AuthContext = Security(authorize, scopes=[PermissionType.READ]),
) -> Dict[str, ArtifactResponseModel]:
    """Fetch the input artifacts of a single step.

    Args:
        step_id: ID of the step for which to get the inputs.

    Returns:
        All inputs of the step, mapping from input name to artifact model.
    """
    store = zen_store()
    step_inputs = store.get_run_step_inputs(step_id)
    return step_inputs
get_step_outputs(step_id, _=Security(oauth2_password_bearer_authentication))

Get the outputs of a specific step.

Parameters:

Name Type Description Default
step_id UUID

ID of the step for which to get the outputs.

required

Returns:

Type Description
Dict[str, zenml.models.artifact_models.ArtifactResponseModel]

All outputs of the step, mapping from output name to artifact model.

Source code in zenml/zen_server/routers/steps_endpoints.py
@router.get(
    "/{step_id}" + OUTPUTS,
    response_model=Dict[str, ArtifactResponseModel],
    responses={
        401: error_response,
        404: error_response,
        422: error_response,
    },
)
@handle_exceptions
def get_step_outputs(
    step_id: UUID,
    _: AuthContext = Security(authorize, scopes=[PermissionType.READ]),
) -> Dict[str, ArtifactResponseModel]:
    """Fetch the output artifacts of a single step, keyed by name.

    Args:
        step_id: ID of the step for which to get the outputs.

    Returns:
        All outputs of the step, mapping from output name to artifact model.
    """
    # The outputs are the artifacts whose parent step is this step.
    outputs: Dict[str, ArtifactResponseModel] = {}
    for artifact in zen_store().list_artifacts(parent_step_id=step_id):
        outputs[artifact.name] = artifact
    return outputs
get_step_status(step_id, _=Security(oauth2_password_bearer_authentication))

Get the status of a specific step.

Parameters:

Name Type Description Default
step_id UUID

ID of the step for which to get the status.

required

Returns:

Type Description
ExecutionStatus

The status of the step.

Source code in zenml/zen_server/routers/steps_endpoints.py
@router.get(
    "/{step_id}" + STATUS,
    response_model=ExecutionStatus,
    responses={
        401: error_response,
        404: error_response,
        422: error_response,
    },
)
@handle_exceptions
def get_step_status(
    step_id: UUID,
    _: AuthContext = Security(authorize, scopes=[PermissionType.READ]),
) -> ExecutionStatus:
    """Fetch a step and return only its status.

    Args:
        step_id: ID of the step for which to get the status.

    Returns:
        The status of the step.
    """
    step_run = zen_store().get_run_step(step_id)
    return step_run.status
list_run_steps(run_id=None, _=Security(oauth2_password_bearer_authentication))

Get run steps according to query filters.

Parameters:

Name Type Description Default
run_id Optional[uuid.UUID]

The ID of the pipeline run by which to filter.

None

Returns:

Type Description
List[zenml.models.step_run_models.StepRunResponseModel]

The run steps according to query filters.

Source code in zenml/zen_server/routers/steps_endpoints.py
@router.get(
    "",
    response_model=List[StepRunResponseModel],
    responses={
        401: error_response,
        404: error_response,
        422: error_response,
    },
)
@handle_exceptions
def list_run_steps(
    run_id: Optional[UUID] = None,
    _: AuthContext = Security(authorize, scopes=[PermissionType.READ]),
) -> List[StepRunResponseModel]:
    """List run steps, optionally restricted to one pipeline run.

    Args:
        run_id: The ID of the pipeline run by which to filter.

    Returns:
        The run steps according to query filters.
    """
    store = zen_store()
    run_steps = store.list_run_steps(run_id=run_id)
    return run_steps
update_step(step_id, step_model, _=Security(oauth2_password_bearer_authentication))

Updates a step.

Parameters:

Name Type Description Default
step_id UUID

ID of the step.

required
step_model StepRunUpdateModel

Step model to use for the update.

required

Returns:

Type Description
StepRunResponseModel

The updated step model.

Source code in zenml/zen_server/routers/steps_endpoints.py
@router.put(
    "/{step_id}",
    response_model=StepRunResponseModel,
    responses={
        401: error_response,
        404: error_response,
        422: error_response,
    },
)
@handle_exceptions
def update_step(
    step_id: UUID,
    step_model: StepRunUpdateModel,
    _: AuthContext = Security(authorize, scopes=[PermissionType.WRITE]),
) -> StepRunResponseModel:
    """Apply an update to an existing step.

    Args:
        step_id: ID of the step.
        step_model: Step model to use for the update.

    Returns:
        The updated step model.
    """
    store = zen_store()
    updated_step = store.update_run_step(
        step_id=step_id, step_update=step_model
    )
    return updated_step

teams_endpoints

Endpoint definitions for teams and team membership.

create_team(team, _=Security(oauth2_password_bearer_authentication))

Creates a team.

noqa: DAR401

Parameters:

Name Type Description Default
team TeamRequestModel

Team to create.

required

Returns:

Type Description
TeamResponseModel

The created team.

Source code in zenml/zen_server/routers/teams_endpoints.py
@router.post(
    "",
    response_model=TeamResponseModel,
    responses={
        401: error_response,
        409: error_response,
        422: error_response,
    },
)
@handle_exceptions
def create_team(
    team: TeamRequestModel,
    _: AuthContext = Security(authorize, scopes=[PermissionType.WRITE]),
) -> TeamResponseModel:
    """Register a new team in the store.

    # noqa: DAR401

    Args:
        team: Team to create.

    Returns:
        The created team.
    """
    store = zen_store()
    created_team = store.create_team(team=team)
    return created_team
delete_team(team_name_or_id, _=Security(oauth2_password_bearer_authentication))

Deletes a specific team.

Parameters:

Name Type Description Default
team_name_or_id Union[str, uuid.UUID]

Name or ID of the team.

required
Source code in zenml/zen_server/routers/teams_endpoints.py
@router.delete(
    "/{team_name_or_id}",
    responses={
        401: error_response,
        404: error_response,
        422: error_response,
    },
)
@handle_exceptions
def delete_team(
    team_name_or_id: Union[str, UUID],
    _: AuthContext = Security(authorize, scopes=[PermissionType.WRITE]),
) -> None:
    """Delete a team from the store.

    Args:
        team_name_or_id: Name or ID of the team.
    """
    store = zen_store()
    store.delete_team(team_name_or_id=team_name_or_id)
get_role_assignments_for_team(team_name_or_id, project_name_or_id=None, _=Security(oauth2_password_bearer_authentication))

Returns a list of all roles that are assigned to a team.

Parameters:

Name Type Description Default
team_name_or_id Union[str, uuid.UUID]

Name or ID of the team.

required
project_name_or_id Union[str, uuid.UUID]

If provided, only list roles that are limited to the given project.

None

Returns:

Type Description
List[zenml.models.role_assignment_models.RoleAssignmentResponseModel]

A list of all roles that are assigned to a team.

Source code in zenml/zen_server/routers/teams_endpoints.py
@router.get(
    "/{team_name_or_id}" + ROLES,
    response_model=List[RoleAssignmentResponseModel],
    responses={
        401: error_response,
        404: error_response,
        422: error_response,
    },
)
@handle_exceptions
def get_role_assignments_for_team(
    team_name_or_id: Union[str, UUID],
    project_name_or_id: Optional[Union[str, UUID]] = None,
    _: AuthContext = Security(authorize, scopes=[PermissionType.READ]),
) -> List[RoleAssignmentResponseModel]:
    """List the role assignments of a team.

    Args:
        team_name_or_id: Name or ID of the team.
        project_name_or_id: If provided, only list roles that are limited to
            the given project.

    Returns:
        A list of all roles that are assigned to a team.
    """
    store = zen_store()
    assignments = store.list_role_assignments(
        team_name_or_id=team_name_or_id,
        project_name_or_id=project_name_or_id,
    )
    return assignments
get_team(team_name_or_id, _=Security(oauth2_password_bearer_authentication))

Returns a specific team.

Parameters:

Name Type Description Default
team_name_or_id Union[str, uuid.UUID]

Name or ID of the team.

required

Returns:

Type Description
TeamResponseModel

A specific team.

Source code in zenml/zen_server/routers/teams_endpoints.py
@router.get(
    "/{team_name_or_id}",
    response_model=TeamResponseModel,
    responses={
        401: error_response,
        404: error_response,
        422: error_response,
    },
)
@handle_exceptions
def get_team(
    team_name_or_id: Union[str, UUID],
    _: AuthContext = Security(authorize, scopes=[PermissionType.READ]),
) -> TeamResponseModel:
    """Fetch a single team by its name or ID.

    Args:
        team_name_or_id: Name or ID of the team.

    Returns:
        A specific team.
    """
    store = zen_store()
    team = store.get_team(team_name_or_id=team_name_or_id)
    return team
list_teams(_=Security(oauth2_password_bearer_authentication))

Returns a list of all teams.

Returns:

Type Description
List[zenml.models.team_models.TeamResponseModel]

List of all teams.

Source code in zenml/zen_server/routers/teams_endpoints.py
@router.get(
    "",
    response_model=List[TeamResponseModel],
    responses={
        401: error_response,
        404: error_response,
        422: error_response,
    },
)
@handle_exceptions
def list_teams(
    _: AuthContext = Security(authorize, scopes=[PermissionType.READ]),
) -> List[TeamResponseModel]:
    """List every team known to the store.

    Returns:
        List of all teams.
    """
    store = zen_store()
    teams = store.list_teams()
    return teams
update_team(team_id, team_update, _=Security(oauth2_password_bearer_authentication))

Updates a team.

noqa: DAR401

Parameters:

Name Type Description Default
team_id UUID

ID of the team to update.

required
team_update TeamUpdateModel

Team update.

required

Returns:

Type Description
TeamResponseModel

The updated team.

Source code in zenml/zen_server/routers/teams_endpoints.py
@router.put(
    "/{team_id}",
    response_model=TeamResponseModel,
    responses={
        401: error_response,
        409: error_response,
        422: error_response,
    },
)
@handle_exceptions
def update_team(
    team_id: UUID,
    team_update: TeamUpdateModel,
    _: AuthContext = Security(authorize, scopes=[PermissionType.WRITE]),
) -> TeamResponseModel:
    """Apply an update to an existing team.

    # noqa: DAR401

    Args:
        team_id: ID of the team to update.
        team_update: Team update.

    Returns:
        The updated team.
    """
    store = zen_store()
    updated_team = store.update_team(team_id=team_id, team_update=team_update)
    return updated_team

users_endpoints

Endpoint definitions for users.

activate_user(user_name_or_id, user_update)

Activates a specific user.

Parameters:

Name Type Description Default
user_name_or_id Union[str, uuid.UUID]

Name or ID of the user.

required
user_update UserUpdateModel

The user to use for the update.

required

Returns:

Type Description
UserResponseModel

The updated user.

Exceptions:

Type Description
HTTPException

If the user is not authorized to activate the user.

Source code in zenml/zen_server/routers/users_endpoints.py
@activation_router.put(
    "/{user_name_or_id}" + ACTIVATE,
    response_model=UserResponseModel,
    responses={
        401: error_response,
        404: error_response,
        422: error_response,
    },
)
@handle_exceptions
def activate_user(
    user_name_or_id: Union[str, UUID],
    user_update: UserUpdateModel,
) -> UserResponseModel:
    """Activate a user after verifying the supplied activation token.

    The endpoint takes no authorization dependency; the activation token
    carried in `user_update` is checked instead.

    Args:
        user_name_or_id: Name or ID of the user.
        user_update: The user to use for the update.

    Returns:
        The updated user.

    Raises:
        HTTPException: If the user is not authorized to activate the user.
    """
    # NOTE(review): the user is looked up before the activation token is
    # verified, so an unknown user presumably surfaces as a different error
    # than a bad token -- confirm this is intended.
    target_user = zen_store().get_user(user_name_or_id)

    supplied_token = user_update.activation_token
    verified_context = authenticate_credentials(
        user_name_or_id=user_name_or_id,
        activation_token=supplied_token,
    )
    if verified_context is None:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid authentication credentials",
        )

    # Mark the user active and clear the activation token on the update
    # before it is stored.
    user_update.active = True
    user_update.activation_token = None
    return zen_store().update_user(
        user_id=target_user.id, user_update=user_update
    )
create_user(user, assign_default_role=True, _=Security(oauth2_password_bearer_authentication))

Creates a user.

noqa: DAR401

Parameters:

Name Type Description Default
user UserRequestModel

User to create.

required
assign_default_role bool

Whether the initial role should be assigned to the new user.

True

Returns:

Type Description
UserResponseModel

The created user.

Source code in zenml/zen_server/routers/users_endpoints.py
@router.post(
    "",
    response_model=UserResponseModel,
    responses={
        401: error_response,
        409: error_response,
        422: error_response,
    },
)
@handle_exceptions
def create_user(
    user: UserRequestModel,
    assign_default_role: bool = True,
    _: AuthContext = Security(authorize, scopes=[PermissionType.WRITE]),
) -> UserResponseModel:
    """Create a user, either active right away or pending activation.

    # noqa: DAR401

    Args:
        user: User to create.
        assign_default_role: Whether the initial role should be assigned to
            the new user.

    Returns:
        The created user.
    """
    # A user created with a password is active immediately. A user created
    # without one starts inactive and is activated later with an activation
    # token.
    needs_activation = user.password is None
    user.active = not needs_activation
    token: Optional[str] = (
        user.generate_activation_token() if needs_activation else None
    )
    new_user = zen_store().create_user(user)

    # For the time being all users are implicitly assigned the admin role
    if assign_default_role:
        admin_role_id = zen_store()._admin_role.id
        default_assignment = RoleAssignmentRequestModel(
            role=admin_role_id,
            user=new_user.id,
        )
        zen_store().create_role_assignment(default_assignment)

    # Put the original unhashed activation token, if one was generated, back
    # on the response so it is sent to the client.
    if token:
        new_user.activation_token = token
    return new_user
deactivate_user(user_name_or_id, _=Security(oauth2_password_bearer_authentication))

Deactivates a user and generates a new activation token for it.

Parameters:

Name Type Description Default
user_name_or_id Union[str, uuid.UUID]

Name or ID of the user.

required

Returns:

Type Description
UserResponseModel

The updated user, including the newly generated activation token.

Source code in zenml/zen_server/routers/users_endpoints.py
@router.put(
    "/{user_name_or_id}" + DEACTIVATE,
    response_model=UserResponseModel,
    responses={401: error_response, 404: error_response, 422: error_response},
)
@handle_exceptions
def deactivate_user(
    user_name_or_id: Union[str, UUID],
    _: AuthContext = Security(authorize, scopes=[PermissionType.WRITE]),
) -> UserResponseModel:
    """Deactivate a user and issue a fresh activation token for it.

    Args:
        user_name_or_id: Name or ID of the user.

    Returns:
        The updated user, carrying the newly generated activation token.
    """
    store = zen_store()
    existing_user = store.get_user(user_name_or_id)

    deactivation = UserUpdateModel(active=False)
    raw_token = deactivation.generate_activation_token()
    updated_user = store.update_user(
        user_id=existing_user.id, user_update=deactivation
    )

    # Put the original unhashed activation token back on the response.
    updated_user.activation_token = raw_token
    return updated_user
delete_user(user_name_or_id, auth_context=Security(oauth2_password_bearer_authentication))

Deletes a specific user.

Parameters:

Name Type Description Default
user_name_or_id Union[str, uuid.UUID]

Name or ID of the user.

required
auth_context AuthContext

The authentication context.

Security(oauth2_password_bearer_authentication)

Exceptions:

Type Description
IllegalOperationError

If the user is not authorized to delete the user.

Source code in zenml/zen_server/routers/users_endpoints.py
@router.delete(
    "/{user_name_or_id}",
    responses={401: error_response, 404: error_response, 422: error_response},
)
@handle_exceptions
def delete_user(
    user_name_or_id: Union[str, UUID],
    auth_context: AuthContext = Security(
        authorize, scopes=[PermissionType.WRITE]
    ),
) -> None:
    """Delete a user, unless it is the account making the request.

    Args:
        user_name_or_id: Name or ID of the user.
        auth_context: The authentication context.

    Raises:
        IllegalOperationError: If the user to delete is the one currently
            authenticated against the server.
    """
    store = zen_store()
    target_user = store.get_user(user_name_or_id)

    # Refuse to delete the account that is performing the request.
    deleting_self = auth_context.user.name == target_user.name
    if deleting_self:
        raise IllegalOperationError(
            "You cannot delete the user account currently used to authenticate "
            "to the ZenML server. If you wish to delete this account, "
            "please authenticate with another account or contact your ZenML "
            "administrator."
        )

    store.delete_user(user_name_or_id=user_name_or_id)
email_opt_in_response(user_name_or_id, user_response, auth_context=Security(oauth2_password_bearer_authentication))

Sets the response of the user to the email prompt.

Parameters:

Name Type Description Default
user_name_or_id Union[str, uuid.UUID]

Name or ID of the user.

required
user_response UserUpdateModel

User Response to email prompt

required
auth_context AuthContext

The authentication context of the user

Security(oauth2_password_bearer_authentication)

Returns:

Type Description
UserResponseModel

The updated user.

Exceptions:

Type Description
NotAuthorizedError

if the user does not have the required permissions

Source code in zenml/zen_server/routers/users_endpoints.py
@router.put(
    "/{user_name_or_id}" + EMAIL_ANALYTICS,
    response_model=UserResponseModel,
    responses={401: error_response, 404: error_response, 422: error_response},
)
@handle_exceptions
def email_opt_in_response(
    user_name_or_id: Union[str, UUID],
    user_response: UserUpdateModel,
    auth_context: AuthContext = Security(authorize, scopes=[PermissionType.ME]),
) -> UserResponseModel:
    """Sets the response of the user to the email prompt.

    Only the `email` and `email_opted_in` fields of the supplied payload are
    applied to the user.

    Args:
        user_name_or_id: Name or ID of the user.
        user_response: User Response to email prompt
        auth_context: The authentication context of the user

    Returns:
        The updated user.

    Raises:
        NotAuthorizedError: if the authenticated user is not the user the
            response is submitted for
    """
    user = zen_store().get_user(user_name_or_id)

    # Compare against the resolved user rather than the raw path parameter:
    # the route accepts a name as well as an ID, and a name never equals the
    # stringified ID of the authenticated user.
    if auth_context.user.id != user.id:
        raise NotAuthorizedError(
            "Users can not opt in on behalf of another user."
        )

    # Copy over only the two opt-in related fields.
    user_update = UserUpdateModel(
        email=user_response.email,
        email_opted_in=user_response.email_opted_in,
    )
    return zen_store().update_user(user_id=user.id, user_update=user_update)
get_current_user(auth_context=Security(oauth2_password_bearer_authentication))

Returns the model of the authenticated user.

Parameters:

Name Type Description Default
auth_context AuthContext

The authentication context.

Security(oauth2_password_bearer_authentication)

Returns:

Type Description
UserResponseModel

The model of the authenticated user.

Source code in zenml/zen_server/routers/users_endpoints.py
@current_user_router.get(
    "/current-user",
    response_model=UserResponseModel,
    responses={401: error_response, 404: error_response, 422: error_response},
)
# NOTE(review): the @handle_exceptions decorator is disabled on this
# endpoint - confirm whether that is intentional.
# @handle_exceptions
def get_current_user(
    auth_context: AuthContext = Security(
        authorize, scopes=[PermissionType.READ]
    ),
) -> UserResponseModel:
    """Return the user attached to the request's authentication context.

    Args:
        auth_context: The authentication context.

    Returns:
        The model of the authenticated user.
    """
    current_user: UserResponseModel = auth_context.user
    return current_user
get_role_assignments_for_user(user_name_or_id, project_name_or_id=None, role_name_or_id=None, _=Security(oauth2_password_bearer_authentication))

Returns a list of all roles that are assigned to a user.

Parameters:

Name Type Description Default
user_name_or_id Union[str, uuid.UUID]

Name or ID of the user.

required
project_name_or_id Union[str, uuid.UUID]

If provided, only list roles that are limited to the given project.

None
role_name_or_id Union[str, uuid.UUID]

If provided, only list assignments of the given role

None

Returns:

Type Description
List[zenml.models.role_assignment_models.RoleAssignmentResponseModel]

A list of all roles that are assigned to a user.

Source code in zenml/zen_server/routers/users_endpoints.py
@router.get(
    "/{user_name_or_id}" + ROLES,
    response_model=List[RoleAssignmentResponseModel],
    responses={401: error_response, 404: error_response, 422: error_response},
)
@handle_exceptions
def get_role_assignments_for_user(
    user_name_or_id: Union[str, UUID],
    project_name_or_id: Optional[Union[str, UUID]] = None,
    role_name_or_id: Optional[Union[str, UUID]] = None,
    _: AuthContext = Security(authorize, scopes=[PermissionType.READ]),
) -> List[RoleAssignmentResponseModel]:
    """List the role assignments of a user.

    Args:
        user_name_or_id: Name or ID of the user.
        project_name_or_id: If provided, only list roles that are limited to
            the given project.
        role_name_or_id: If provided, only list assignments of the given
            role

    Returns:
        A list of all roles that are assigned to a user.
    """
    # All three arguments are forwarded to the store unchanged.
    assignment_filters = {
        "user_name_or_id": user_name_or_id,
        "project_name_or_id": project_name_or_id,
        "role_name_or_id": role_name_or_id,
    }
    return zen_store().list_role_assignments(**assignment_filters)
get_user(user_name_or_id, _=Security(oauth2_password_bearer_authentication))

Returns a specific user.

Parameters:

Name Type Description Default
user_name_or_id Union[str, uuid.UUID]

Name or ID of the user.

required

Returns:

Type Description
UserResponseModel

A specific user.

Source code in zenml/zen_server/routers/users_endpoints.py
@router.get(
    "/{user_name_or_id}",
    response_model=UserResponseModel,
    responses={401: error_response, 404: error_response, 422: error_response},
)
@handle_exceptions
def get_user(
    user_name_or_id: Union[str, UUID],
    _: AuthContext = Security(authorize, scopes=[PermissionType.READ]),
) -> UserResponseModel:
    """Fetch a single user from the store.

    Args:
        user_name_or_id: Name or ID of the user.

    Returns:
        A specific user.
    """
    store = zen_store()
    requested_user = store.get_user(user_name_or_id=user_name_or_id)
    return requested_user
list_users(name=None, _=Security(oauth2_password_bearer_authentication))

Returns a list of all users.

Parameters:

Name Type Description Default
name Optional[str]

Optionally filter by name

None

Returns:

Type Description
List[zenml.models.user_models.UserResponseModel]

A list of all users.

Source code in zenml/zen_server/routers/users_endpoints.py
@router.get(
    "",
    response_model=List[UserResponseModel],
    responses={401: error_response, 404: error_response, 422: error_response},
)
@handle_exceptions
def list_users(
    name: Optional[str] = None,
    _: AuthContext = Security(authorize, scopes=[PermissionType.READ]),
) -> List[UserResponseModel]:
    """List users, optionally restricted by name.

    Args:
        name: Optionally filter by name

    Returns:
        A list of all users.
    """
    store = zen_store()
    matching_users = store.list_users(name=name)
    return matching_users
update_myself(user, auth_context=Security(oauth2_password_bearer_authentication))

Updates the currently authenticated user.

Parameters:

Name Type Description Default
user UserUpdateModel

the user to use for the update.

required
auth_context AuthContext

The authentication context.

Security(oauth2_password_bearer_authentication)

Returns:

Type Description
UserResponseModel

The updated user.

Source code in zenml/zen_server/routers/users_endpoints.py
@current_user_router.put(
    "/current-user",
    response_model=UserResponseModel,
    responses={401: error_response, 404: error_response, 422: error_response},
)
@handle_exceptions
def update_myself(
    user: UserUpdateModel,
    auth_context: AuthContext = Security(authorize, scopes=[PermissionType.ME]),
) -> UserResponseModel:
    """Update the currently authenticated user.

    Args:
        user: the user to use for the update.
        auth_context: The authentication context.

    Returns:
        The updated user.
    """
    store = zen_store()
    # The target of the update is always the user from the auth context.
    own_id = auth_context.user.id
    return store.update_user(user_id=own_id, user_update=user)
update_user(user_name_or_id, user_update, _=Security(oauth2_password_bearer_authentication))

Updates a specific user.

Parameters:

Name Type Description Default
user_name_or_id Union[str, uuid.UUID]

Name or ID of the user.

required
user_update UserUpdateModel

the user to use for the update.

required

Returns:

Type Description
UserResponseModel

The updated user.

Source code in zenml/zen_server/routers/users_endpoints.py
@router.put(
    "/{user_name_or_id}",
    response_model=UserResponseModel,
    responses={401: error_response, 404: error_response, 422: error_response},
)
@handle_exceptions
def update_user(
    user_name_or_id: Union[str, UUID],
    user_update: UserUpdateModel,
    _: AuthContext = Security(authorize, scopes=[PermissionType.WRITE]),
) -> UserResponseModel:
    """Apply an update to the user identified by name or ID.

    Args:
        user_name_or_id: Name or ID of the user.
        user_update: the user to use for the update.

    Returns:
        The updated user.
    """
    store = zen_store()
    # Resolve the name or ID to a user first; the update is keyed by its ID.
    existing_user = store.get_user(user_name_or_id)
    return store.update_user(user_id=existing_user.id, user_update=user_update)

utils

Util functions for the ZenML Server.

ErrorModel (BaseModel) pydantic-model

Base class for error responses.

Source code in zenml/zen_server/utils.py
class ErrorModel(BaseModel):
    """Base class for error responses."""

    # Error payload; declared as `Any`, so no particular structure is
    # enforced by this model.
    detail: Any

conflict(error)

Convert an Exception to a HTTP 409 response.

Parameters:

Name Type Description Default
error Exception

Exception to convert.

required

Returns:

Type Description
HTTPException

HTTPException with status code 409.

Source code in zenml/zen_server/utils.py
def conflict(error: Exception) -> HTTPException:
    """Build the HTTP 409 exception that represents an error.

    Args:
        error: Exception to convert.

    Returns:
        HTTPException with status code 409 whose detail is the API
        representation of the error.
    """
    detail = error_detail(error)
    return HTTPException(status_code=409, detail=detail)

error_detail(error)

Convert an Exception to API representation.

Parameters:

Name Type Description Default
error Exception

Exception to convert.

required

Returns:

Type Description
List[str]

List of strings representing the error.

Source code in zenml/zen_server/utils.py
def error_detail(error: Exception) -> List[str]:
    """Render an exception as the list of strings used by the API.

    Args:
        error: Exception to convert.

    Returns:
        The exception class name followed by each of its arguments
        converted to a string.
    """
    parts = [type(error).__name__]
    parts.extend(str(argument) for argument in error.args)
    return parts

forbidden(error)

Convert an Exception to a HTTP 403 response.

Parameters:

Name Type Description Default
error Exception

Exception to convert.

required

Returns:

Type Description
HTTPException

HTTPException with status code 403.

Source code in zenml/zen_server/utils.py
def forbidden(error: Exception) -> HTTPException:
    """Build the HTTP 403 exception that represents an error.

    Args:
        error: Exception to convert.

    Returns:
        HTTPException with status code 403 whose detail is the API
        representation of the error.
    """
    detail = error_detail(error)
    return HTTPException(status_code=403, detail=detail)

handle_exceptions(func)

Decorator to handle exceptions in the API.

Parameters:

Name Type Description Default
func ~F

Function to decorate.

required

Returns:

Type Description
~F

Decorated function.

Source code in zenml/zen_server/utils.py
def handle_exceptions(func: F) -> F:
    """Wrap an API function so known errors become HTTP exceptions.

    Each recognized exception is logged and re-raised as the HTTP exception
    produced by the matching converter; any other exception propagates
    unchanged.

    Args:
        func: Function to decorate.

    Returns:
        Decorated function.
    """

    @wraps(func)
    def wrapper(*args: Any, **kwargs: Any) -> Any:
        try:
            return func(*args, **kwargs)
        except Exception as exc:
            # Ordered like an except-chain: the first matching entry wins.
            translations = (
                (NotAuthorizedError, "Authorization error", not_authorized),
                (KeyError, "Entity not found", not_found),
                (
                    (
                        StackExistsError,
                        StackComponentExistsError,
                        EntityExistsError,
                    ),
                    "Entity already exists",
                    conflict,
                ),
                (IllegalOperationError, "Illegal operation", forbidden),
                (ValueError, "Validation error", unprocessable),
            )
            for handled_types, log_message, to_http_error in translations:
                if isinstance(exc, handled_types):
                    logger.exception(log_message)
                    raise to_http_error(exc) from exc
            # Not one of the translated errors: let it propagate as is.
            raise

    return cast(F, wrapper)

initialize_zen_store()

Initialize the ZenML Store.

Exceptions:

Type Description
ValueError

If the ZenML Store is using a REST back-end.

Source code in zenml/zen_server/utils.py
def initialize_zen_store() -> None:
    """Set up the module-level ZenML Store used by the server.

    The store is taken from the global configuration and stored in the
    module-level `_zen_store` variable.

    Raises:
        ValueError: If the ZenML Store is using a REST back-end.
    """
    global _zen_store

    logger.debug("Initializing ZenML Store for FastAPI...")
    store = GlobalConfiguration().zen_store
    _zen_store = store

    # We override track_analytics=False because we do not
    # want to track anything server side.
    store.track_analytics = False

    uses_rest_backend = store.type == StoreType.REST
    if uses_rest_backend:
        raise ValueError(
            "Server cannot be started with a REST store type. Make sure you "
            "configure ZenML to use a non-networked store backend "
            "when trying to start the ZenML Server."
        )

not_authorized(error)

Convert an Exception to a HTTP 401 response.

Parameters:

Name Type Description Default
error Exception

Exception to convert.

required

Returns:

Type Description
HTTPException

HTTPException with status code 401.

Source code in zenml/zen_server/utils.py
def not_authorized(error: Exception) -> HTTPException:
    """Build the HTTP 401 exception that represents an error.

    Args:
        error: Exception to convert.

    Returns:
        HTTPException with status code 401 whose detail is the API
        representation of the error.
    """
    detail = error_detail(error)
    return HTTPException(status_code=401, detail=detail)

not_found(error)

Convert an Exception to a HTTP 404 response.

Parameters:

Name Type Description Default
error Exception

Exception to convert.

required

Returns:

Type Description
HTTPException

HTTPException with status code 404.

Source code in zenml/zen_server/utils.py
def not_found(error: Exception) -> HTTPException:
    """Build the HTTP 404 exception that represents an error.

    Args:
        error: Exception to convert.

    Returns:
        HTTPException with status code 404 whose detail is the API
        representation of the error.
    """
    detail = error_detail(error)
    return HTTPException(status_code=404, detail=detail)

unprocessable(error)

Convert an Exception to an HTTP 422 response.

Parameters:

Name Type Description Default
error Exception

Exception to convert.

required

Returns:

Type Description
HTTPException

HTTPException with status code 422.

Source code in zenml/zen_server/utils.py
def unprocessable(error: Exception) -> HTTPException:
    """Convert an Exception to an HTTP 422 response.

    Args:
        error: Exception to convert.

    Returns:
        HTTPException with status code 422.
    """
    # 422 (Unprocessable Entity), not 409: this is the validation-error
    # counterpart of `conflict`.
    return HTTPException(status_code=422, detail=error_detail(error))

zen_store()

Return the initialized ZenML Store.

Returns:

Type Description
BaseZenStore

The ZenML Store.

Exceptions:

Type Description
RuntimeError

If the ZenML Store has not been initialized.

Source code in zenml/zen_server/utils.py
def zen_store() -> BaseZenStore:
    """Return the module-level ZenML Store.

    Returns:
        The ZenML Store.

    Raises:
        RuntimeError: If the ZenML Store has not been initialized.
    """
    # The module-level variable is only read here, never rebound.
    if _zen_store is not None:
        return _zen_store
    raise RuntimeError("ZenML Store not initialized")

zen_server_api

Zen Server API.

catch_all(request, file_path)

Dashboard endpoint.

Parameters:

Name Type Description Default
request Request

Request object.

required
file_path str

Path to a file in the dashboard root folder.

required

Returns:

Type Description
Any

The ZenML dashboard.

Exceptions:

Type Description
HTTPException

404 error if requested a non-existent static file or if the dashboard files are not included.

Source code in zenml/zen_server/zen_server_api.py
@app.get("/{file_path:path}", include_in_schema=False)
def catch_all(request: Request, file_path: str) -> Any:
    """Serve dashboard files for every path not matched elsewhere.

    Args:
        request: Request object.
        file_path: Path to a file in the dashboard root folder.

    Returns:
        The ZenML dashboard.

    Raises:
        HTTPException: 404 error if requested a non-existent static file or if
            the dashboard files are not included.
    """
    dashboard_root = relative_path(DASHBOARD_DIRECTORY)

    # Files that sit directly in the dashboard root directory are returned
    # as they are.
    if file_path and file_path in root_static_files:
        logger.debug(f"Returning static file: {file_path}")
        return FileResponse(os.path.join(dashboard_root, file_path))

    # A path without any "/" and without query parameters is treated as a
    # request for a static file that does not exist.
    is_single_segment = "/" not in file_path
    if is_single_segment and not request.query_params:
        logger.debug(f"Requested non-existent static file: {file_path}")
        raise HTTPException(status_code=404)

    index_file = os.path.join(dashboard_root, "index.html")
    if not os.path.isfile(index_file):
        raise HTTPException(status_code=404)

    # Everything else is answered with the index.html file that hosts the
    # single-page application.
    return templates.TemplateResponse("index.html", {"request": request})

dashboard(request)

Dashboard endpoint.

Parameters:

Name Type Description Default
request Request

Request object.

required

Returns:

Type Description
Any

The ZenML dashboard.

Exceptions:

Type Description
HTTPException

If the dashboard files are not included.

Source code in zenml/zen_server/zen_server_api.py
@app.get("/", include_in_schema=False)
def dashboard(request: Request) -> Any:
    """Serve the dashboard's index page at the root URL.

    Args:
        request: Request object.

    Returns:
        The ZenML dashboard.

    Raises:
        HTTPException: If the dashboard files are not included.
    """
    index_file = os.path.join(relative_path(DASHBOARD_DIRECTORY), "index.html")
    if os.path.isfile(index_file):
        return templates.TemplateResponse("index.html", {"request": request})
    # No index.html in the dashboard directory: answer with a 404.
    raise HTTPException(status_code=404)

get_root_static_files()

Get the list of static files in the root dashboard directory.

These files are static files that are not in the /static subdirectory that need to be served as static files under the root URL path.

Returns:

Type Description
List[str]

List of static files in the root directory.

Source code in zenml/zen_server/zen_server_api.py
def get_root_static_files() -> List[str]:
    """Collect the names of the files directly in the dashboard root.

    These files are static files that are not in the /static subdirectory
    that need to be served as static files under the root URL path.

    Returns:
        List of static files in the root directory. Empty if the dashboard
        directory does not exist.
    """
    dashboard_root = relative_path(DASHBOARD_DIRECTORY)
    if not os.path.isdir(dashboard_root):
        return []
    # index.html is left out because it is served separately; entries that
    # are not regular files are skipped.
    return [
        entry
        for entry in os.listdir(dashboard_root)
        if entry != "index.html"
        and isfile(os.path.join(dashboard_root, entry))
    ]

health()

Get health status of the server.

Returns:

Type Description
str

String representing the health status of the server.

Source code in zenml/zen_server/zen_server_api.py
@app.head(HEALTH, include_in_schema=False)
@app.get(HEALTH)
def health() -> str:
    """Get health status of the server.

    Registered for both GET and HEAD requests on the `HEALTH` path; the HEAD
    route is excluded from the schema.

    Returns:
        String representing the health status of the server. This is always
        the constant "OK".
    """
    return "OK"

initialize()

Initialize the ZenML server.

Source code in zenml/zen_server/zen_server_api.py
@app.on_event("startup")
def initialize() -> None:
    """Initialize the ZenML server.

    Registered through `app.on_event("startup")`. It initializes the ZenML
    store and then calls the pipeline run sync function.
    """
    # IMPORTANT: this needs to be done before the fastapi app starts, to avoid
    # race conditions
    initialize_zen_store()
    # NOTE(review): sync_pipeline_runs_on_schedule is wrapped by
    # `repeat_every` and is also registered as a startup handler of its own.
    # If that wrapper returns a coroutine, this plain call is never awaited -
    # confirm whether the call is needed here.
    sync_pipeline_runs_on_schedule()

invalid_api(invalid_api_path)

Invalid API endpoint.

All API endpoints that are not defined in the API routers will be redirected to this endpoint and will return a 404 error.

Parameters:

Name Type Description Default
invalid_api_path str

Invalid API path.

required

Exceptions:

Type Description
HTTPException

404 error.

Source code in zenml/zen_server/zen_server_api.py
@app.get(
    API + "/{invalid_api_path:path}", status_code=404, include_in_schema=False
)
def invalid_api(invalid_api_path: str) -> None:
    """Reject requests to API paths that no router defines.

    All API endpoints that are not defined in the API routers will be
    redirected to this endpoint and will return a 404 error.

    Args:
        invalid_api_path: Invalid API path.

    Raises:
        HTTPException: 404 error.
    """
    logger.debug(f"Invalid API path requested: {invalid_api_path}")
    # This endpoint never returns normally; it always answers with a 404.
    not_found_error = HTTPException(status_code=404)
    raise not_found_error

relative_path(rel)

Get the absolute path of a path relative to the ZenML server module.

Parameters:

Name Type Description Default
rel str

Relative path.

required

Returns:

Type Description
str

Absolute path.

Source code in zenml/zen_server/zen_server_api.py
def relative_path(rel: str) -> str:
    """Resolve a path against the directory holding this module.

    Args:
        rel: Relative path.

    Returns:
        The given path joined onto the directory of this module's file.
    """
    module_dir = os.path.dirname(__file__)
    return os.path.join(module_dir, rel)

sync_pipeline_runs_on_schedule()

Sync pipeline runs.

Source code in zenml/zen_server/zen_server_api.py
@app.on_event("startup")
@repeat_every(
    seconds=float(os.getenv(ENV_ZENML_SERVER_METADATA_SYNC_PERIOD, 30)),
    wait_first=True,
)
def sync_pipeline_runs_on_schedule() -> None:
    """Sync pipeline runs through the ZenML store.

    The repeat interval in seconds is read from the environment variable
    named by `ENV_ZENML_SERVER_METADATA_SYNC_PERIOD`, defaulting to 30.
    A failed sync is logged and not re-raised.
    """
    logger.info("Syncing pipeline runs in server schedule...")
    try:
        # Fetching the store stays inside the try block so that a failure
        # there is logged as well.
        store = zen_store()
        store._sync_runs()
    except Exception:
        logger.exception("Failed to sync pipeline runs.")