Zen Stores
zenml.zen_stores
special
ZenStores define ways to store ZenML relevant data locally or remotely.
base_zen_store
Base Zen Store implementation.
BaseZenStore (BaseModel, ZenStoreInterface, AnalyticsTrackerMixin, ABC)
pydantic-model
Base class for accessing and persisting ZenML core objects.
Attributes:
Name | Type | Description |
---|---|---|
config |
StoreConfiguration |
The configuration of the store. |
track_analytics |
bool |
Only send analytics if set to |
Source code in zenml/zen_stores/base_zen_store.py
class BaseZenStore(BaseModel, ZenStoreInterface, AnalyticsTrackerMixin, ABC):
"""Base class for accessing and persisting ZenML core objects.
Attributes:
config: The configuration of the store.
track_analytics: Only send analytics if set to `True`.
"""
config: StoreConfiguration
track_analytics: bool = True
TYPE: ClassVar[StoreType]
CONFIG_TYPE: ClassVar[Type[StoreConfiguration]]
# ---------------------------------
# Initialization and configuration
# ---------------------------------
def __init__(
self,
skip_default_registrations: bool = False,
**kwargs: Any,
) -> None:
"""Create and initialize a store.
Args:
skip_default_registrations: If `True`, the creation of the default
stack and user in the store will be skipped.
**kwargs: Additional keyword arguments to pass to the Pydantic
constructor.
Raises:
RuntimeError: If the store cannot be initialized.
"""
super().__init__(**kwargs)
try:
self._initialize()
except Exception as e:
raise RuntimeError(
f"Error initializing {self.type.value} store with URL "
f"'{self.url}': {str(e)}"
) from e
if not skip_default_registrations:
logger.debug("Initializing database")
self._initialize_database()
else:
logger.debug("Skipping database initialization")
@staticmethod
def get_store_class(store_type: StoreType) -> Type["BaseZenStore"]:
"""Returns the class of the given store type.
Args:
store_type: The type of the store to get the class for.
Returns:
The class of the given store type or None if the type is unknown.
Raises:
TypeError: If the store type is unsupported.
"""
if store_type == StoreType.SQL:
from zenml.zen_stores.sql_zen_store import SqlZenStore
return SqlZenStore
elif store_type == StoreType.REST:
from zenml.zen_stores.rest_zen_store import RestZenStore
return RestZenStore
else:
raise TypeError(
f"No store implementation found for store type "
f"`{store_type.value}`."
)
@staticmethod
def get_store_config_class(
store_type: StoreType,
) -> Type["StoreConfiguration"]:
"""Returns the store config class of the given store type.
Args:
store_type: The type of the store to get the class for.
Returns:
The config class of the given store type.
"""
store_class = BaseZenStore.get_store_class(store_type)
return store_class.CONFIG_TYPE
@staticmethod
def get_store_type(url: str) -> StoreType:
"""Returns the store type associated with a URL schema.
Args:
url: The store URL.
Returns:
The store type associated with the supplied URL schema.
Raises:
TypeError: If no store type was found to support the supplied URL.
"""
from zenml.zen_stores.rest_zen_store import RestZenStoreConfiguration
from zenml.zen_stores.sql_zen_store import SqlZenStoreConfiguration
if SqlZenStoreConfiguration.supports_url_scheme(url):
return StoreType.SQL
elif RestZenStoreConfiguration.supports_url_scheme(url):
return StoreType.REST
else:
raise TypeError(f"No store implementation found for URL: {url}.")
@staticmethod
def create_store(
config: StoreConfiguration,
skip_default_registrations: bool = False,
**kwargs: Any,
) -> "BaseZenStore":
"""Create and initialize a store from a store configuration.
Args:
config: The store configuration to use.
skip_default_registrations: If `True`, the creation of the default
stack and user in the store will be skipped.
**kwargs: Additional keyword arguments to pass to the store class
Returns:
The initialized store.
"""
logger.debug(f"Creating store with config '{config}'...")
store_class = BaseZenStore.get_store_class(config.type)
store = store_class(
config=config,
skip_default_registrations=skip_default_registrations,
**kwargs,
)
return store
@staticmethod
def get_default_store_config(path: str) -> StoreConfiguration:
"""Get the default store configuration.
The default store is a SQLite store that saves the DB contents on the
local filesystem.
Args:
path: The local path where the store DB will be stored.
Returns:
The default store configuration.
"""
from zenml.zen_stores.sql_zen_store import SqlZenStoreConfiguration
config = SqlZenStoreConfiguration(
type=StoreType.SQL,
url=SqlZenStoreConfiguration.get_local_url(path),
)
return config
def _initialize_database(self) -> None:
"""Initialize the database on first use."""
try:
default_project = self._default_project
except KeyError:
default_project = self._create_default_project()
try:
assert self._admin_role
except KeyError:
self._create_admin_role()
try:
assert self._guest_role
except KeyError:
self._create_guest_role()
try:
default_user = self._default_user
except KeyError:
default_user = self._create_default_user()
try:
self._get_default_stack(
project_name_or_id=default_project.id,
user_name_or_id=default_user.id,
)
except KeyError:
self._create_default_stack(
project_name_or_id=default_project.id,
user_name_or_id=default_user.id,
)
@property
def url(self) -> str:
"""The URL of the store.
Returns:
The URL of the store.
"""
return self.config.url
@property
def type(self) -> StoreType:
"""The type of the store.
Returns:
The type of the store.
"""
return self.TYPE
def validate_active_config(
self,
active_project_name_or_id: Optional[Union[str, UUID]] = None,
active_stack_id: Optional[UUID] = None,
config_name: str = "",
) -> Tuple[ProjectResponseModel, StackResponseModel]:
"""Validate the active configuration.
Call this method to validate the supplied active project and active
stack values.
This method is guaranteed to return valid project ID and stack ID
values. If the supplied project and stack are not set or are not valid
(e.g. they do not exist or are not accessible), the default project and
default project stack will be returned in their stead.
Args:
active_project_name_or_id: The name or ID of the active project.
active_stack_id: The ID of the active stack.
config_name: The name of the configuration to validate (used in the
displayed logs/messages).
Returns:
A tuple containing the active project and active stack.
"""
active_project: ProjectResponseModel
if active_project_name_or_id:
try:
active_project = self.get_project(active_project_name_or_id)
except KeyError:
active_project = self._get_or_create_default_project()
logger.warning(
f"The current {config_name} active project is no longer "
f"available. Resetting the active project to "
f"'{active_project.name}'."
)
else:
active_project = self._get_or_create_default_project()
logger.info(
f"Setting the {config_name} active project "
f"to '{active_project.name}'."
)
active_stack: StackResponseModel
# Sanitize the active stack
if active_stack_id:
# Ensure that the active stack is still valid
try:
active_stack = self.get_stack(stack_id=active_stack_id)
except KeyError:
logger.warning(
"The current %s active stack is no longer available. "
"Resetting the active stack to default.",
config_name,
)
active_stack = self._get_or_create_default_stack(
active_project
)
else:
if active_stack.project.id != active_project.id:
logger.warning(
"The current %s active stack is not part of the active "
"project. Resetting the active stack to default.",
config_name,
)
active_stack = self._get_or_create_default_stack(
active_project
)
elif not active_stack.is_shared and (
not active_stack.user
or (active_stack.user.id != self.get_user().id)
):
logger.warning(
"The current %s active stack is not shared and not "
"owned by the active user. "
"Resetting the active stack to default.",
config_name,
)
active_stack = self._get_or_create_default_stack(
active_project
)
else:
logger.warning(
"Setting the %s active stack to default.",
config_name,
)
active_stack = self._get_or_create_default_stack(active_project)
return active_project, active_stack
def get_store_info(self) -> ServerModel:
"""Get information about the store.
Returns:
Information about the store.
"""
return ServerModel(
id=GlobalConfiguration().user_id,
version=zenml.__version__,
deployment_type=os.environ.get(
ENV_ZENML_SERVER_DEPLOYMENT_TYPE, ServerDeploymentType.OTHER
),
database_type=ServerDatabaseType.OTHER,
)
def is_local_store(self) -> bool:
"""Check if the store is local or connected to a local ZenML server.
Returns:
True if the store is local, False otherwise.
"""
return self.get_store_info().is_local()
def _get_or_create_default_stack(
self, project: "ProjectResponseModel"
) -> "StackResponseModel":
try:
return self._get_default_stack(
project_name_or_id=project.id,
user_name_or_id=self.get_user().id,
)
except KeyError:
return self._create_default_stack( # type: ignore[no-any-return]
project_name_or_id=project.id,
user_name_or_id=self.get_user().id,
)
def _get_or_create_default_project(self) -> "ProjectResponseModel":
try:
return self._default_project
except KeyError:
return self._create_default_project() # type: ignore[no-any-return]
# ------
# Stacks
# ------
@track(AnalyticsEvent.REGISTERED_DEFAULT_STACK)
def _create_default_stack(
self,
project_name_or_id: Union[str, UUID],
user_name_or_id: Union[str, UUID],
) -> StackResponseModel:
"""Create the default stack components and stack.
The default stack contains a local orchestrator and a local artifact
store.
Args:
project_name_or_id: Name or ID of the project to which the stack
belongs.
user_name_or_id: The name or ID of the user that owns the stack.
Returns:
The model of the created default stack.
"""
project = self.get_project(project_name_or_id=project_name_or_id)
user = self.get_user(user_name_or_id=user_name_or_id)
logger.info(
f"Creating default stack for user '{user.name}' in project "
f"{project.name}..."
)
# Register the default orchestrator
orchestrator = self.create_stack_component(
component=ComponentRequestModel(
user=user.id,
project=project.id,
name=DEFAULT_STACK_COMPONENT_NAME,
type=StackComponentType.ORCHESTRATOR,
flavor="local",
configuration={},
),
)
# Register the default artifact store
artifact_store = self.create_stack_component(
component=ComponentRequestModel(
user=user.id,
project=project.id,
name=DEFAULT_STACK_COMPONENT_NAME,
type=StackComponentType.ARTIFACT_STORE,
flavor="local",
configuration={},
),
)
components = {c.type: [c.id] for c in [orchestrator, artifact_store]}
# Register the default stack
stack = StackRequestModel(
name=DEFAULT_STACK_NAME,
components=components,
is_shared=False,
project=project.id,
user=user.id,
)
return self.create_stack(stack=stack)
def _get_default_stack(
self,
project_name_or_id: Union[str, UUID],
user_name_or_id: Union[str, UUID],
) -> StackResponseModel:
"""Get the default stack for a user in a project.
Args:
project_name_or_id: Name or ID of the project.
user_name_or_id: Name or ID of the user.
Returns:
The default stack in the project owned by the supplied user.
Raises:
KeyError: if the project or default stack doesn't exist.
"""
default_stacks = self.list_stacks(
StackFilterModel(
project_id=project_name_or_id,
user_id=user_name_or_id,
name=DEFAULT_STACK_NAME,
)
)
if default_stacks.total == 0:
raise KeyError(
f"No default stack found for user {str(user_name_or_id)} in "
f"project {str(project_name_or_id)}"
)
return default_stacks.items[0]
# -----
# Roles
# -----
@property
def _admin_role(self) -> RoleResponseModel:
"""Get the admin role.
Returns:
The default admin role.
"""
return self.get_role(DEFAULT_ADMIN_ROLE)
@track(AnalyticsEvent.CREATED_DEFAULT_ROLES)
def _create_admin_role(self) -> RoleResponseModel:
"""Creates the admin role.
Returns:
The admin role
"""
logger.info(f"Creating '{DEFAULT_ADMIN_ROLE}' role ...")
return self.create_role(
RoleRequestModel(
name=DEFAULT_ADMIN_ROLE,
permissions={
PermissionType.READ.value,
PermissionType.WRITE.value,
PermissionType.ME.value,
},
)
)
@property
def _guest_role(self) -> RoleResponseModel:
"""Get the guest role.
Returns:
The guest role.
"""
return self.get_role(DEFAULT_GUEST_ROLE)
@track(AnalyticsEvent.CREATED_DEFAULT_ROLES)
def _create_guest_role(self) -> RoleResponseModel:
"""Creates the guest role.
Returns:
The guest role
"""
logger.info(f"Creating '{DEFAULT_GUEST_ROLE}' role ...")
return self.create_role(
RoleRequestModel(
name=DEFAULT_GUEST_ROLE,
permissions={
PermissionType.READ.value,
PermissionType.ME.value,
},
)
)
# -----
# Users
# -----
@property
def _default_user_name(self) -> str:
"""Get the default user name.
Returns:
The default user name.
"""
return os.getenv(ENV_ZENML_DEFAULT_USER_NAME, DEFAULT_USERNAME)
@property
def _default_user(self) -> UserResponseModel:
"""Get the default user.
Returns:
The default user.
Raises:
KeyError: If the default user doesn't exist.
"""
user_name = self._default_user_name
try:
return self.get_user(user_name)
except KeyError:
raise KeyError(f"The default user '{user_name}' is not configured")
@track(AnalyticsEvent.CREATED_DEFAULT_USER)
def _create_default_user(self) -> UserResponseModel:
"""Creates a default user with the admin role.
Returns:
The default user.
"""
user_name = os.getenv(ENV_ZENML_DEFAULT_USER_NAME, DEFAULT_USERNAME)
user_password = os.getenv(
ENV_ZENML_DEFAULT_USER_PASSWORD, DEFAULT_PASSWORD
)
logger.info(f"Creating default user '{user_name}' ...")
new_user = self.create_user(
UserRequestModel(
name=user_name,
active=True,
password=user_password,
)
)
self.create_user_role_assignment(
UserRoleAssignmentRequestModel(
role=self._admin_role.id,
user=new_user.id,
project=None,
)
)
return new_user
# -----
# Roles
# -----
@property
def roles(self) -> Page[RoleResponseModel]:
"""All existing roles.
Returns:
A list of all existing roles.
"""
return self.list_roles(RoleFilterModel())
# --------
# Projects
# --------
@property
def _default_project_name(self) -> str:
"""Get the default project name.
Returns:
The default project name.
"""
return os.getenv(ENV_ZENML_DEFAULT_PROJECT_NAME, DEFAULT_PROJECT_NAME)
@property
def _default_project(self) -> ProjectResponseModel:
"""Get the default project.
Returns:
The default project.
Raises:
KeyError: if the default project doesn't exist.
"""
project_name = self._default_project_name
try:
return self.get_project(project_name)
except KeyError:
raise KeyError(
f"The default project '{project_name}' is not configured"
)
@track(AnalyticsEvent.CREATED_DEFAULT_PROJECT)
def _create_default_project(self) -> ProjectResponseModel:
"""Creates a default project.
Returns:
The default project.
"""
project_name = self._default_project_name
logger.info(f"Creating default project '{project_name}' ...")
return self.create_project(ProjectRequestModel(name=project_name))
# ---------
# Analytics
# ---------
def track_event(
self,
event: Union[str, AnalyticsEvent],
metadata: Optional[Dict[str, Any]] = None,
) -> None:
"""Track an analytics event.
Args:
event: The event to track.
metadata: Additional metadata to track with the event.
"""
if self.track_analytics:
# Server information is always tracked, if available.
track_event(event, metadata)
class Config:
"""Pydantic configuration class."""
# Validate attributes when assigning them. We need to set this in order
# to have a mix of mutable and immutable attributes
validate_assignment = True
# Ignore extra attributes from configs of previous ZenML versions
extra = "ignore"
# all attributes with leading underscore are private and therefore
# are mutable and not included in serialization
underscore_attrs_are_private = True
roles: Page[RoleResponseModel]
property
readonly
All existing roles.
Returns:
Type | Description |
---|---|
Page[RoleResponseModel] |
A list of all existing roles. |
type: StoreType
property
readonly
The type of the store.
Returns:
Type | Description |
---|---|
StoreType |
The type of the store. |
url: str
property
readonly
The URL of the store.
Returns:
Type | Description |
---|---|
str |
The URL of the store. |
Config
Pydantic configuration class.
Source code in zenml/zen_stores/base_zen_store.py
class Config:
"""Pydantic configuration class."""
# Validate attributes when assigning them. We need to set this in order
# to have a mix of mutable and immutable attributes
validate_assignment = True
# Ignore extra attributes from configs of previous ZenML versions
extra = "ignore"
# all attributes with leading underscore are private and therefore
# are mutable and not included in serialization
underscore_attrs_are_private = True
__init__(self, skip_default_registrations=False, **kwargs)
special
Create and initialize a store.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
skip_default_registrations |
bool |
If |
False |
**kwargs |
Any |
Additional keyword arguments to pass to the Pydantic constructor. |
{} |
Exceptions:
Type | Description |
---|---|
RuntimeError |
If the store cannot be initialized. |
Source code in zenml/zen_stores/base_zen_store.py
def __init__(
self,
skip_default_registrations: bool = False,
**kwargs: Any,
) -> None:
"""Create and initialize a store.
Args:
skip_default_registrations: If `True`, the creation of the default
stack and user in the store will be skipped.
**kwargs: Additional keyword arguments to pass to the Pydantic
constructor.
Raises:
RuntimeError: If the store cannot be initialized.
"""
super().__init__(**kwargs)
try:
self._initialize()
except Exception as e:
raise RuntimeError(
f"Error initializing {self.type.value} store with URL "
f"'{self.url}': {str(e)}"
) from e
if not skip_default_registrations:
logger.debug("Initializing database")
self._initialize_database()
else:
logger.debug("Skipping database initialization")
create_store(config, skip_default_registrations=False, **kwargs)
staticmethod
Create and initialize a store from a store configuration.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
config |
StoreConfiguration |
The store configuration to use. |
required |
skip_default_registrations |
bool |
If |
False |
**kwargs |
Any |
Additional keyword arguments to pass to the store class |
{} |
Returns:
Type | Description |
---|---|
BaseZenStore |
The initialized store. |
Source code in zenml/zen_stores/base_zen_store.py
@staticmethod
def create_store(
config: StoreConfiguration,
skip_default_registrations: bool = False,
**kwargs: Any,
) -> "BaseZenStore":
"""Create and initialize a store from a store configuration.
Args:
config: The store configuration to use.
skip_default_registrations: If `True`, the creation of the default
stack and user in the store will be skipped.
**kwargs: Additional keyword arguments to pass to the store class
Returns:
The initialized store.
"""
logger.debug(f"Creating store with config '{config}'...")
store_class = BaseZenStore.get_store_class(config.type)
store = store_class(
config=config,
skip_default_registrations=skip_default_registrations,
**kwargs,
)
return store
get_default_store_config(path)
staticmethod
Get the default store configuration.
The default store is a SQLite store that saves the DB contents on the local filesystem.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path |
str |
The local path where the store DB will be stored. |
required |
Returns:
Type | Description |
---|---|
StoreConfiguration |
The default store configuration. |
Source code in zenml/zen_stores/base_zen_store.py
@staticmethod
def get_default_store_config(path: str) -> StoreConfiguration:
"""Get the default store configuration.
The default store is a SQLite store that saves the DB contents on the
local filesystem.
Args:
path: The local path where the store DB will be stored.
Returns:
The default store configuration.
"""
from zenml.zen_stores.sql_zen_store import SqlZenStoreConfiguration
config = SqlZenStoreConfiguration(
type=StoreType.SQL,
url=SqlZenStoreConfiguration.get_local_url(path),
)
return config
get_store_class(store_type)
staticmethod
Returns the class of the given store type.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
store_type |
StoreType |
The type of the store to get the class for. |
required |
Returns:
Type | Description |
---|---|
Type[BaseZenStore] |
The class of the given store type or None if the type is unknown. |
Exceptions:
Type | Description |
---|---|
TypeError |
If the store type is unsupported. |
Source code in zenml/zen_stores/base_zen_store.py
@staticmethod
def get_store_class(store_type: StoreType) -> Type["BaseZenStore"]:
"""Returns the class of the given store type.
Args:
store_type: The type of the store to get the class for.
Returns:
The class of the given store type or None if the type is unknown.
Raises:
TypeError: If the store type is unsupported.
"""
if store_type == StoreType.SQL:
from zenml.zen_stores.sql_zen_store import SqlZenStore
return SqlZenStore
elif store_type == StoreType.REST:
from zenml.zen_stores.rest_zen_store import RestZenStore
return RestZenStore
else:
raise TypeError(
f"No store implementation found for store type "
f"`{store_type.value}`."
)
get_store_config_class(store_type)
staticmethod
Returns the store config class of the given store type.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
store_type |
StoreType |
The type of the store to get the class for. |
required |
Returns:
Type | Description |
---|---|
Type[StoreConfiguration] |
The config class of the given store type. |
Source code in zenml/zen_stores/base_zen_store.py
@staticmethod
def get_store_config_class(
store_type: StoreType,
) -> Type["StoreConfiguration"]:
"""Returns the store config class of the given store type.
Args:
store_type: The type of the store to get the class for.
Returns:
The config class of the given store type.
"""
store_class = BaseZenStore.get_store_class(store_type)
return store_class.CONFIG_TYPE
get_store_info(self)
Get information about the store.
Returns:
Type | Description |
---|---|
ServerModel |
Information about the store. |
Source code in zenml/zen_stores/base_zen_store.py
def get_store_info(self) -> ServerModel:
"""Get information about the store.
Returns:
Information about the store.
"""
return ServerModel(
id=GlobalConfiguration().user_id,
version=zenml.__version__,
deployment_type=os.environ.get(
ENV_ZENML_SERVER_DEPLOYMENT_TYPE, ServerDeploymentType.OTHER
),
database_type=ServerDatabaseType.OTHER,
)
get_store_type(url)
staticmethod
Returns the store type associated with a URL schema.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
url |
str |
The store URL. |
required |
Returns:
Type | Description |
---|---|
StoreType |
The store type associated with the supplied URL schema. |
Exceptions:
Type | Description |
---|---|
TypeError |
If no store type was found to support the supplied URL. |
Source code in zenml/zen_stores/base_zen_store.py
@staticmethod
def get_store_type(url: str) -> StoreType:
"""Returns the store type associated with a URL schema.
Args:
url: The store URL.
Returns:
The store type associated with the supplied URL schema.
Raises:
TypeError: If no store type was found to support the supplied URL.
"""
from zenml.zen_stores.rest_zen_store import RestZenStoreConfiguration
from zenml.zen_stores.sql_zen_store import SqlZenStoreConfiguration
if SqlZenStoreConfiguration.supports_url_scheme(url):
return StoreType.SQL
elif RestZenStoreConfiguration.supports_url_scheme(url):
return StoreType.REST
else:
raise TypeError(f"No store implementation found for URL: {url}.")
is_local_store(self)
Check if the store is local or connected to a local ZenML server.
Returns:
Type | Description |
---|---|
bool |
True if the store is local, False otherwise. |
Source code in zenml/zen_stores/base_zen_store.py
def is_local_store(self) -> bool:
"""Check if the store is local or connected to a local ZenML server.
Returns:
True if the store is local, False otherwise.
"""
return self.get_store_info().is_local()
track_event(self, event, metadata=None)
Track an analytics event.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
event |
Union[str, zenml.utils.analytics_utils.AnalyticsEvent] |
The event to track. |
required |
metadata |
Optional[Dict[str, Any]] |
Additional metadata to track with the event. |
None |
Source code in zenml/zen_stores/base_zen_store.py
def track_event(
self,
event: Union[str, AnalyticsEvent],
metadata: Optional[Dict[str, Any]] = None,
) -> None:
"""Track an analytics event.
Args:
event: The event to track.
metadata: Additional metadata to track with the event.
"""
if self.track_analytics:
# Server information is always tracked, if available.
track_event(event, metadata)
validate_active_config(self, active_project_name_or_id=None, active_stack_id=None, config_name='')
Validate the active configuration.
Call this method to validate the supplied active project and active stack values.
This method is guaranteed to return valid project ID and stack ID values. If the supplied project and stack are not set or are not valid (e.g. they do not exist or are not accessible), the default project and default project stack will be returned in their stead.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
active_project_name_or_id |
Union[str, uuid.UUID] |
The name or ID of the active project. |
None |
active_stack_id |
Optional[uuid.UUID] |
The ID of the active stack. |
None |
config_name |
str |
The name of the configuration to validate (used in the displayed logs/messages). |
'' |
Returns:
Type | Description |
---|---|
Tuple[zenml.models.project_models.ProjectResponseModel, zenml.models.stack_models.StackResponseModel] |
A tuple containing the active project and active stack. |
Source code in zenml/zen_stores/base_zen_store.py
def validate_active_config(
self,
active_project_name_or_id: Optional[Union[str, UUID]] = None,
active_stack_id: Optional[UUID] = None,
config_name: str = "",
) -> Tuple[ProjectResponseModel, StackResponseModel]:
"""Validate the active configuration.
Call this method to validate the supplied active project and active
stack values.
This method is guaranteed to return valid project ID and stack ID
values. If the supplied project and stack are not set or are not valid
(e.g. they do not exist or are not accessible), the default project and
default project stack will be returned in their stead.
Args:
active_project_name_or_id: The name or ID of the active project.
active_stack_id: The ID of the active stack.
config_name: The name of the configuration to validate (used in the
displayed logs/messages).
Returns:
A tuple containing the active project and active stack.
"""
active_project: ProjectResponseModel
if active_project_name_or_id:
try:
active_project = self.get_project(active_project_name_or_id)
except KeyError:
active_project = self._get_or_create_default_project()
logger.warning(
f"The current {config_name} active project is no longer "
f"available. Resetting the active project to "
f"'{active_project.name}'."
)
else:
active_project = self._get_or_create_default_project()
logger.info(
f"Setting the {config_name} active project "
f"to '{active_project.name}'."
)
active_stack: StackResponseModel
# Sanitize the active stack
if active_stack_id:
# Ensure that the active stack is still valid
try:
active_stack = self.get_stack(stack_id=active_stack_id)
except KeyError:
logger.warning(
"The current %s active stack is no longer available. "
"Resetting the active stack to default.",
config_name,
)
active_stack = self._get_or_create_default_stack(
active_project
)
else:
if active_stack.project.id != active_project.id:
logger.warning(
"The current %s active stack is not part of the active "
"project. Resetting the active stack to default.",
config_name,
)
active_stack = self._get_or_create_default_stack(
active_project
)
elif not active_stack.is_shared and (
not active_stack.user
or (active_stack.user.id != self.get_user().id)
):
logger.warning(
"The current %s active stack is not shared and not "
"owned by the active user. "
"Resetting the active stack to default.",
config_name,
)
active_stack = self._get_or_create_default_stack(
active_project
)
else:
logger.warning(
"Setting the %s active stack to default.",
config_name,
)
active_stack = self._get_or_create_default_stack(active_project)
return active_project, active_stack
migrations
special
Alembic database migration utilities.
alembic
Alembic utilities wrapper.
The Alembic class defined here acts as a wrapper around the Alembic library that automatically configures Alembic to use the ZenML SQL store database connection.
Alembic
Alembic environment and migration API.
This class provides a wrapper around the Alembic library that automatically configures Alembic to use the ZenML SQL store database connection.
Source code in zenml/zen_stores/migrations/alembic.py
class Alembic:
"""Alembic environment and migration API.
This class provides a wrapper around the Alembic library that automatically
configures Alembic to use the ZenML SQL store database connection.
"""
def __init__(
self,
engine: Engine,
metadata: MetaData = SQLModel.metadata,
context: Optional[EnvironmentContext] = None,
**kwargs: Any,
) -> None:
"""Initialize the Alembic wrapper.
Args:
engine: The SQLAlchemy engine to use.
metadata: The SQLAlchemy metadata to use.
context: The Alembic environment context to use. If not set, a new
context is created pointing to the ZenML migrations directory.
**kwargs: Additional keyword arguments to pass to the Alembic
environment context.
"""
self.engine = engine
self.metadata = metadata
self.context_kwargs = kwargs
self.config = Config()
self.config.set_main_option(
"script_location", str(Path(__file__).parent)
)
self.config.set_main_option(
"version_locations", str(Path(__file__).parent / "versions")
)
self.script_directory = ScriptDirectory.from_config(self.config)
if context is None:
self.environment_context = EnvironmentContext(
self.config, self.script_directory
)
else:
self.environment_context = context
def db_is_empty(self) -> bool:
"""Check if the database is empty.
Returns:
True if the database is empty, False otherwise.
"""
# Check the existence of any of the SQLModel tables
return not self.engine.dialect.has_table(
self.engine.connect(), schemas.StackSchema.__tablename__
)
def run_migrations(
self,
fn: Optional[Callable[[_RevIdType, MigrationContext], List[Any]]],
) -> None:
"""Run an online migration function in the current migration context.
Args:
fn: Migration function to run. If not set, the function configured
externally by the Alembic CLI command is used.
"""
fn_context_args: Dict[Any, Any] = {}
if fn is not None:
fn_context_args["fn"] = fn
with self.engine.connect() as connection:
self.environment_context.configure(
connection=connection,
target_metadata=self.metadata,
include_object=include_object,
compare_type=True,
render_as_batch=True,
**fn_context_args,
**self.context_kwargs,
)
with self.environment_context.begin_transaction():
self.environment_context.run_migrations()
def current_revisions(self) -> List[str]:
"""Get the current database revisions.
Returns:
List of head revisions.
"""
current_revisions: List[str] = []
def do_get_current_rev(rev: _RevIdType, context: Any) -> List[Any]:
nonlocal current_revisions
for r in self.script_directory.get_all_current(
rev # type:ignore [arg-type]
):
if r is None:
continue
current_revisions.append(r.revision)
return []
self.run_migrations(do_get_current_rev)
return current_revisions
def stamp(self, revision: str) -> None:
"""Stamp the revision table with the given revision without running any migrations.
Args:
revision: String revision target.
"""
def do_stamp(rev: _RevIdType, context: Any) -> List[Any]:
return self.script_directory._stamp_revs(revision, rev)
self.run_migrations(do_stamp)
def upgrade(self, revision: str = "heads") -> None:
"""Upgrade the database to a later version.
Args:
revision: String revision target.
"""
def do_upgrade(rev: _RevIdType, context: Any) -> List[Any]:
return self.script_directory._upgrade_revs(
revision, rev # type:ignore [arg-type]
)
self.run_migrations(do_upgrade)
def downgrade(self, revision: str) -> None:
"""Revert the database to a previous version.
Args:
revision: String revision target.
"""
def do_downgrade(rev: _RevIdType, context: Any) -> List[Any]:
return self.script_directory._downgrade_revs(
revision, rev # type:ignore [arg-type]
)
self.run_migrations(do_downgrade)
__init__(self, engine, metadata=MetaData(), context=None, **kwargs)
special
Initialize the Alembic wrapper.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
engine |
Engine |
The SQLAlchemy engine to use. |
required |
metadata |
MetaData |
The SQLAlchemy metadata to use. |
MetaData() |
context |
Optional[alembic.runtime.environment.EnvironmentContext] |
The Alembic environment context to use. If not set, a new context is created pointing to the ZenML migrations directory. |
None |
**kwargs |
Any |
Additional keyword arguments to pass to the Alembic environment context. |
{} |
Source code in zenml/zen_stores/migrations/alembic.py
def __init__(
self,
engine: Engine,
metadata: MetaData = SQLModel.metadata,
context: Optional[EnvironmentContext] = None,
**kwargs: Any,
) -> None:
"""Initialize the Alembic wrapper.
Args:
engine: The SQLAlchemy engine to use.
metadata: The SQLAlchemy metadata to use.
context: The Alembic environment context to use. If not set, a new
context is created pointing to the ZenML migrations directory.
**kwargs: Additional keyword arguments to pass to the Alembic
environment context.
"""
self.engine = engine
self.metadata = metadata
self.context_kwargs = kwargs
self.config = Config()
self.config.set_main_option(
"script_location", str(Path(__file__).parent)
)
self.config.set_main_option(
"version_locations", str(Path(__file__).parent / "versions")
)
self.script_directory = ScriptDirectory.from_config(self.config)
if context is None:
self.environment_context = EnvironmentContext(
self.config, self.script_directory
)
else:
self.environment_context = context
current_revisions(self)
Get the current database revisions.
Returns:
Type | Description |
---|---|
List[str] |
List of head revisions. |
Source code in zenml/zen_stores/migrations/alembic.py
def current_revisions(self) -> List[str]:
"""Get the current database revisions.
Returns:
List of head revisions.
"""
current_revisions: List[str] = []
def do_get_current_rev(rev: _RevIdType, context: Any) -> List[Any]:
nonlocal current_revisions
for r in self.script_directory.get_all_current(
rev # type:ignore [arg-type]
):
if r is None:
continue
current_revisions.append(r.revision)
return []
self.run_migrations(do_get_current_rev)
return current_revisions
db_is_empty(self)
Check if the database is empty.
Returns:
Type | Description |
---|---|
bool |
True if the database is empty, False otherwise. |
Source code in zenml/zen_stores/migrations/alembic.py
def db_is_empty(self) -> bool:
"""Check if the database is empty.
Returns:
True if the database is empty, False otherwise.
"""
# Check the existence of any of the SQLModel tables
return not self.engine.dialect.has_table(
self.engine.connect(), schemas.StackSchema.__tablename__
)
downgrade(self, revision)
Revert the database to a previous version.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
revision |
str |
String revision target. |
required |
Source code in zenml/zen_stores/migrations/alembic.py
def downgrade(self, revision: str) -> None:
"""Revert the database to a previous version.
Args:
revision: String revision target.
"""
def do_downgrade(rev: _RevIdType, context: Any) -> List[Any]:
return self.script_directory._downgrade_revs(
revision, rev # type:ignore [arg-type]
)
self.run_migrations(do_downgrade)
run_migrations(self, fn)
Run an online migration function in the current migration context.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
fn |
Optional[Callable[[Union[str, Sequence[str]], alembic.runtime.migration.MigrationContext], List[Any]]] |
Migration function to run. If not set, the function configured externally by the Alembic CLI command is used. |
required |
Source code in zenml/zen_stores/migrations/alembic.py
def run_migrations(
self,
fn: Optional[Callable[[_RevIdType, MigrationContext], List[Any]]],
) -> None:
"""Run an online migration function in the current migration context.
Args:
fn: Migration function to run. If not set, the function configured
externally by the Alembic CLI command is used.
"""
fn_context_args: Dict[Any, Any] = {}
if fn is not None:
fn_context_args["fn"] = fn
with self.engine.connect() as connection:
self.environment_context.configure(
connection=connection,
target_metadata=self.metadata,
include_object=include_object,
compare_type=True,
render_as_batch=True,
**fn_context_args,
**self.context_kwargs,
)
with self.environment_context.begin_transaction():
self.environment_context.run_migrations()
stamp(self, revision)
Stamp the revision table with the given revision without running any migrations.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
revision |
str |
String revision target. |
required |
Source code in zenml/zen_stores/migrations/alembic.py
def stamp(self, revision: str) -> None:
"""Stamp the revision table with the given revision without running any migrations.
Args:
revision: String revision target.
"""
def do_stamp(rev: _RevIdType, context: Any) -> List[Any]:
return self.script_directory._stamp_revs(revision, rev)
self.run_migrations(do_stamp)
upgrade(self, revision='heads')
Upgrade the database to a later version.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
revision |
str |
String revision target. |
'heads' |
Source code in zenml/zen_stores/migrations/alembic.py
def upgrade(self, revision: str = "heads") -> None:
"""Upgrade the database to a later version.
Args:
revision: String revision target.
"""
def do_upgrade(rev: _RevIdType, context: Any) -> List[Any]:
return self.script_directory._upgrade_revs(
revision, rev # type:ignore [arg-type]
)
self.run_migrations(do_upgrade)
AlembicVersion (Base)
Alembic version table.
Source code in zenml/zen_stores/migrations/alembic.py
class AlembicVersion(Base): # type: ignore[valid-type,misc]
"""Alembic version table."""
__tablename__ = "alembic_version"
version_num = Column(String, nullable=False, primary_key=True)
include_object(object, name, type_, *args, **kwargs)
Function used to exclude tables from the migration scripts.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
object |
Any |
The schema item object to check. |
required |
name |
str |
The name of the object to check. |
required |
type_ |
str |
The type of the object to check. |
required |
*args |
Any |
Additional arguments. |
() |
**kwargs |
Any |
Additional keyword arguments. |
{} |
Returns:
Type | Description |
---|---|
bool |
True if the object should be included, False otherwise. |
Source code in zenml/zen_stores/migrations/alembic.py
def include_object(
object: Any, name: str, type_: str, *args: Any, **kwargs: Any
) -> bool:
"""Function used to exclude tables from the migration scripts.
Args:
object: The schema item object to check.
name: The name of the object to check.
type_: The type of the object to check.
*args: Additional arguments.
**kwargs: Additional keyword arguments.
Returns:
True if the object should be included, False otherwise.
"""
return not (type_ == "table" and name in exclude_tables)
rest_zen_store
REST Zen Store implementation.
RestZenStore (BaseZenStore)
pydantic-model
Store implementation for accessing data from a REST API.
Source code in zenml/zen_stores/rest_zen_store.py
class RestZenStore(BaseZenStore):
"""Store implementation for accessing data from a REST API."""
config: RestZenStoreConfiguration
TYPE: ClassVar[StoreType] = StoreType.REST
CONFIG_TYPE: ClassVar[Type[StoreConfiguration]] = RestZenStoreConfiguration
_api_token: Optional[str] = None
_session: Optional[requests.Session] = None
def _initialize_database(self) -> None:
"""Initialize the database."""
# don't do anything for a REST store
# ====================================
# ZenML Store interface implementation
# ====================================
# --------------------------------
# Initialization and configuration
# --------------------------------
def _initialize(self) -> None:
"""Initialize the REST store."""
client_version = zenml.__version__
server_version = self.get_store_info().version
if not DISABLE_CLIENT_SERVER_MISMATCH_WARNING and (
server_version != client_version
):
logger.warning(
"Your ZenML client version (%s) does not match the server "
"version (%s). This version mismatch might lead to errors or "
"unexpected behavior. \nTo disable this warning message, set "
"the environment variable `%s=True`",
client_version,
server_version,
ENV_ZENML_DISABLE_CLIENT_SERVER_MISMATCH_WARNING,
)
def get_store_info(self) -> ServerModel:
"""Get information about the server.
Returns:
Information about the server.
"""
body = self.get(INFO)
return ServerModel.parse_obj(body)
# ------
# Stacks
# ------
@track(AnalyticsEvent.REGISTERED_STACK)
def create_stack(self, stack: StackRequestModel) -> StackResponseModel:
"""Register a new stack.
Args:
stack: The stack to register.
Returns:
The registered stack.
"""
return self._create_project_scoped_resource(
resource=stack,
route=STACKS,
response_model=StackResponseModel,
)
def get_stack(self, stack_id: UUID) -> StackResponseModel:
"""Get a stack by its unique ID.
Args:
stack_id: The ID of the stack to get.
Returns:
The stack with the given ID.
"""
return self._get_resource(
resource_id=stack_id,
route=STACKS,
response_model=StackResponseModel,
)
def list_stacks(
self, stack_filter_model: StackFilterModel
) -> Page[StackResponseModel]:
"""List all stacks matching the given filter criteria.
Args:
stack_filter_model: All filter parameters including pagination
params.
Returns:
A list of all stacks matching the filter criteria.
"""
return self._list_paginated_resources(
route=STACKS,
response_model=StackResponseModel,
filter_model=stack_filter_model,
)
@track(AnalyticsEvent.UPDATED_STACK)
def update_stack(
self, stack_id: UUID, stack_update: StackUpdateModel
) -> StackResponseModel:
"""Update a stack.
Args:
stack_id: The ID of the stack update.
stack_update: The update request on the stack.
Returns:
The updated stack.
"""
return self._update_resource(
resource_id=stack_id,
resource_update=stack_update,
route=STACKS,
response_model=StackResponseModel,
)
@track(AnalyticsEvent.DELETED_STACK)
def delete_stack(self, stack_id: UUID) -> None:
"""Delete a stack.
Args:
stack_id: The ID of the stack to delete.
"""
self._delete_resource(
resource_id=stack_id,
route=STACKS,
)
# ----------------
# Stack components
# ----------------
@track(AnalyticsEvent.REGISTERED_STACK_COMPONENT)
def create_stack_component(
self,
component: ComponentRequestModel,
) -> ComponentResponseModel:
"""Create a stack component.
Args:
component: The stack component to create.
Returns:
The created stack component.
"""
return self._create_project_scoped_resource(
resource=component,
route=STACK_COMPONENTS,
response_model=ComponentResponseModel,
)
def get_stack_component(
self, component_id: UUID
) -> ComponentResponseModel:
"""Get a stack component by ID.
Args:
component_id: The ID of the stack component to get.
Returns:
The stack component.
"""
return self._get_resource(
resource_id=component_id,
route=STACK_COMPONENTS,
response_model=ComponentResponseModel,
)
def list_stack_components(
self, component_filter_model: ComponentFilterModel
) -> Page[ComponentResponseModel]:
"""List all stack components matching the given filter criteria.
Args:
component_filter_model: All filter parameters including pagination
params.
Returns:
A list of all stack components matching the filter criteria.
"""
return self._list_paginated_resources(
route=STACK_COMPONENTS,
response_model=ComponentResponseModel,
filter_model=component_filter_model,
)
@track(AnalyticsEvent.UPDATED_STACK_COMPONENT)
def update_stack_component(
self,
component_id: UUID,
component_update: ComponentUpdateModel,
) -> ComponentResponseModel:
"""Update an existing stack component.
Args:
component_id: The ID of the stack component to update.
component_update: The update to be applied to the stack component.
Returns:
The updated stack component.
"""
return self._update_resource(
resource_id=component_id,
resource_update=component_update,
route=STACK_COMPONENTS,
response_model=ComponentResponseModel,
)
@track(AnalyticsEvent.DELETED_STACK_COMPONENT)
def delete_stack_component(self, component_id: UUID) -> None:
"""Delete a stack component.
Args:
component_id: The ID of the stack component to delete.
"""
self._delete_resource(
resource_id=component_id,
route=STACK_COMPONENTS,
)
# -----------------------
# Stack component flavors
# -----------------------
@track(AnalyticsEvent.CREATED_FLAVOR)
def create_flavor(self, flavor: FlavorRequestModel) -> FlavorResponseModel:
"""Creates a new stack component flavor.
Args:
flavor: The stack component flavor to create.
Returns:
The newly created flavor.
"""
return self._create_project_scoped_resource(
resource=flavor,
route=FLAVORS,
response_model=FlavorResponseModel,
)
def get_flavor(self, flavor_id: UUID) -> FlavorResponseModel:
"""Get a stack component flavor by ID.
Args:
flavor_id: The ID of the stack component flavor to get.
Returns:
The stack component flavor.
"""
return self._get_resource(
resource_id=flavor_id,
route=FLAVORS,
response_model=FlavorResponseModel,
)
def list_flavors(
self, flavor_filter_model: FlavorFilterModel
) -> Page[FlavorResponseModel]:
"""List all stack component flavors matching the given filter criteria.
Args:
flavor_filter_model: All filter parameters including pagination
params
Returns:
List of all the stack component flavors matching the given criteria.
"""
return self._list_paginated_resources(
route=FLAVORS,
response_model=FlavorResponseModel,
filter_model=flavor_filter_model,
)
@track(AnalyticsEvent.DELETED_FLAVOR)
def delete_flavor(self, flavor_id: UUID) -> None:
"""Delete a stack component flavor.
Args:
flavor_id: The ID of the stack component flavor to delete.
"""
self._delete_resource(
resource_id=flavor_id,
route=FLAVORS,
)
# -----
# Users
# -----
@track(AnalyticsEvent.CREATED_USER)
def create_user(self, user: UserRequestModel) -> UserResponseModel:
"""Creates a new user.
Args:
user: User to be created.
Returns:
The newly created user.
"""
return self._create_resource(
resource=user,
route=USERS + "?assign_default_role=False",
response_model=UserResponseModel,
)
def get_user(
self,
user_name_or_id: Optional[Union[str, UUID]] = None,
include_private: bool = False,
) -> UserResponseModel:
"""Gets a specific user, when no id is specified the active user is returned.
The `include_private` parameter is ignored here as it is handled
implicitly by the /current-user endpoint that is queried when no
user_name_or_id is set. Raises a KeyError in case a user with that id
does not exist.
Args:
user_name_or_id: The name or ID of the user to get.
include_private: Whether to include private user information
Returns:
The requested user, if it was found.
"""
if user_name_or_id:
return self._get_resource(
resource_id=user_name_or_id,
route=USERS,
response_model=UserResponseModel,
)
else:
body = self.get(CURRENT_USER)
return UserResponseModel.parse_obj(body)
def get_auth_user(
self, user_name_or_id: Union[str, UUID]
) -> "UserAuthModel":
"""Gets the auth model to a specific user.
Args:
user_name_or_id: The name or ID of the user to get.
Raises:
NotImplementedError: This method is only available for the
SQLZenStore.
"""
raise NotImplementedError(
"This method is only designed for use"
" by the server endpoints. It is not designed"
" to be called from the client side."
)
def list_users(
self, user_filter_model: UserFilterModel
) -> Page[UserResponseModel]:
"""List all users.
Args:
user_filter_model: All filter parameters including pagination
params.
Returns:
A list of all users.
"""
return self._list_paginated_resources(
route=USERS,
response_model=UserResponseModel,
filter_model=user_filter_model,
)
@track(AnalyticsEvent.UPDATED_USER)
def update_user(
self, user_id: UUID, user_update: UserUpdateModel
) -> UserResponseModel:
"""Updates an existing user.
Args:
user_id: The id of the user to update.
user_update: The update to be applied to the user.
Returns:
The updated user.
"""
return self._update_resource(
resource_id=user_id,
resource_update=user_update,
route=USERS,
response_model=UserResponseModel,
)
@track(AnalyticsEvent.DELETED_USER)
def delete_user(self, user_name_or_id: Union[str, UUID]) -> None:
"""Deletes a user.
Args:
user_name_or_id: The name or ID of the user to delete.
"""
self._delete_resource(
resource_id=user_name_or_id,
route=USERS,
)
# -----
# Teams
# -----
@track(AnalyticsEvent.CREATED_TEAM)
def create_team(self, team: TeamRequestModel) -> TeamResponseModel:
"""Creates a new team.
Args:
team: The team model to create.
Returns:
The newly created team.
"""
return self._create_resource(
resource=team,
route=TEAMS,
response_model=TeamResponseModel,
)
def get_team(self, team_name_or_id: Union[str, UUID]) -> TeamResponseModel:
"""Gets a specific team.
Args:
team_name_or_id: Name or ID of the team to get.
Returns:
The requested team.
"""
return self._get_resource(
resource_id=team_name_or_id,
route=TEAMS,
response_model=TeamResponseModel,
)
def list_teams(
self, team_filter_model: TeamFilterModel
) -> Page[TeamResponseModel]:
"""List all teams matching the given filter criteria.
Args:
team_filter_model: All filter parameters including pagination
params.
Returns:
A list of all teams matching the filter criteria.
"""
return self._list_paginated_resources(
route=TEAMS,
response_model=TeamResponseModel,
filter_model=team_filter_model,
)
@track(AnalyticsEvent.UPDATED_TEAM)
def update_team(
self, team_id: UUID, team_update: TeamUpdateModel
) -> TeamResponseModel:
"""Update an existing team.
Args:
team_id: The ID of the team to be updated.
team_update: The update to be applied to the team.
Returns:
The updated team.
"""
return self._update_resource(
resource_id=team_id,
resource_update=team_update,
route=TEAMS,
response_model=TeamResponseModel,
)
@track(AnalyticsEvent.DELETED_TEAM)
def delete_team(self, team_name_or_id: Union[str, UUID]) -> None:
"""Deletes a team.
Args:
team_name_or_id: Name or ID of the team to delete.
"""
self._delete_resource(
resource_id=team_name_or_id,
route=TEAMS,
)
# -----
# Roles
# -----
@track(AnalyticsEvent.CREATED_ROLE)
def create_role(self, role: RoleRequestModel) -> RoleResponseModel:
"""Creates a new role.
Args:
role: The role model to create.
Returns:
The newly created role.
"""
return self._create_resource(
resource=role,
route=ROLES,
response_model=RoleResponseModel,
)
def get_role(self, role_name_or_id: Union[str, UUID]) -> RoleResponseModel:
"""Gets a specific role.
Args:
role_name_or_id: Name or ID of the role to get.
Returns:
The requested role.
"""
return self._get_resource(
resource_id=role_name_or_id,
route=ROLES,
response_model=RoleResponseModel,
)
def list_roles(
self, role_filter_model: RoleFilterModel
) -> Page[RoleResponseModel]:
"""List all roles matching the given filter criteria.
Args:
role_filter_model: All filter parameters including pagination
params.
Returns:
A list of all roles matching the filter criteria.
"""
return self._list_paginated_resources(
route=ROLES,
response_model=RoleResponseModel,
filter_model=role_filter_model,
)
@track(AnalyticsEvent.UPDATED_ROLE)
def update_role(
self, role_id: UUID, role_update: RoleUpdateModel
) -> RoleResponseModel:
"""Update an existing role.
Args:
role_id: The ID of the role to be updated.
role_update: The update to be applied to the role.
Returns:
The updated role.
"""
return self._update_resource(
resource_id=role_id,
resource_update=role_update,
route=ROLES,
response_model=RoleResponseModel,
)
@track(AnalyticsEvent.DELETED_ROLE)
def delete_role(self, role_name_or_id: Union[str, UUID]) -> None:
"""Deletes a role.
Args:
role_name_or_id: Name or ID of the role to delete.
"""
self._delete_resource(
resource_id=role_name_or_id,
route=ROLES,
)
# ----------------
# Role assignments
# ----------------
def list_user_role_assignments(
self, user_role_assignment_filter_model: UserRoleAssignmentFilterModel
) -> Page[UserRoleAssignmentResponseModel]:
"""List all roles assignments matching the given filter criteria.
Args:
user_role_assignment_filter_model: All filter parameters including
pagination params.
Returns:
A list of all roles assignments matching the filter criteria.
"""
return self._list_paginated_resources(
route=USER_ROLE_ASSIGNMENTS,
response_model=UserRoleAssignmentResponseModel,
filter_model=user_role_assignment_filter_model,
)
def get_user_role_assignment(
self, user_role_assignment_id: UUID
) -> UserRoleAssignmentResponseModel:
"""Get an existing role assignment by name or ID.
Args:
user_role_assignment_id: Name or ID of the role assignment to get.
Returns:
The requested project.
"""
return self._get_resource(
resource_id=user_role_assignment_id,
route=USER_ROLE_ASSIGNMENTS,
response_model=UserRoleAssignmentResponseModel,
)
def delete_user_role_assignment(
self, user_role_assignment_id: UUID
) -> None:
"""Delete a specific role assignment.
Args:
user_role_assignment_id: The ID of the specific role assignment
"""
self._delete_resource(
resource_id=user_role_assignment_id,
route=USER_ROLE_ASSIGNMENTS,
)
def create_user_role_assignment(
self, user_role_assignment: UserRoleAssignmentRequestModel
) -> UserRoleAssignmentResponseModel:
"""Creates a new role assignment.
Args:
user_role_assignment: The role assignment to create.
Returns:
The newly created project.
"""
return self._create_resource(
resource=user_role_assignment,
route=USER_ROLE_ASSIGNMENTS,
response_model=UserRoleAssignmentResponseModel,
)
# ---------------------
# Team Role assignments
# ---------------------
def create_team_role_assignment(
self, team_role_assignment: TeamRoleAssignmentRequestModel
) -> TeamRoleAssignmentResponseModel:
"""Creates a new team role assignment.
Args:
team_role_assignment: The role assignment model to create.
Returns:
The newly created role assignment.
"""
return self._create_resource(
resource=team_role_assignment,
route=TEAM_ROLE_ASSIGNMENTS,
response_model=TeamRoleAssignmentResponseModel,
)
def get_team_role_assignment(
self, team_role_assignment_id: UUID
) -> TeamRoleAssignmentResponseModel:
"""Gets a specific role assignment.
Args:
team_role_assignment_id: ID of the role assignment to get.
Returns:
The requested role assignment.
Raises:
KeyError: If no role assignment with the given ID exists.
"""
return self._get_resource(
resource_id=team_role_assignment_id,
route=TEAM_ROLE_ASSIGNMENTS,
response_model=TeamRoleAssignmentResponseModel,
)
def delete_team_role_assignment(
self, team_role_assignment_id: UUID
) -> None:
"""Delete a specific role assignment.
Args:
team_role_assignment_id: The ID of the specific role assignment
"""
self._delete_resource(
resource_id=team_role_assignment_id,
route=TEAM_ROLE_ASSIGNMENTS,
)
def list_team_role_assignments(
self, team_role_assignment_filter_model: TeamRoleAssignmentFilterModel
) -> Page[TeamRoleAssignmentResponseModel]:
"""List all roles assignments matching the given filter criteria.
Args:
team_role_assignment_filter_model: All filter parameters including
pagination params.
Returns:
A list of all roles assignments matching the filter criteria.
"""
return self._list_paginated_resources(
route=TEAM_ROLE_ASSIGNMENTS,
response_model=TeamRoleAssignmentResponseModel,
filter_model=team_role_assignment_filter_model,
)
# --------
# Projects
# --------
@track(AnalyticsEvent.CREATED_PROJECT)
def create_project(
self, project: ProjectRequestModel
) -> ProjectResponseModel:
"""Creates a new project.
Args:
project: The project to create.
Returns:
The newly created project.
"""
return self._create_resource(
resource=project,
route=PROJECTS,
response_model=ProjectResponseModel,
)
def get_project(
self, project_name_or_id: Union[UUID, str]
) -> ProjectResponseModel:
"""Get an existing project by name or ID.
Args:
project_name_or_id: Name or ID of the project to get.
Returns:
The requested project.
"""
return self._get_resource(
resource_id=project_name_or_id,
route=PROJECTS,
response_model=ProjectResponseModel,
)
def list_projects(
self, project_filter_model: ProjectFilterModel
) -> Page[ProjectResponseModel]:
"""List all project matching the given filter criteria.
Args:
project_filter_model: All filter parameters including pagination
params.
Returns:
A list of all project matching the filter criteria.
"""
return self._list_paginated_resources(
route=PROJECTS,
response_model=ProjectResponseModel,
filter_model=project_filter_model,
)
@track(AnalyticsEvent.UPDATED_PROJECT)
def update_project(
self, project_id: UUID, project_update: ProjectUpdateModel
) -> ProjectResponseModel:
"""Update an existing project.
Args:
project_id: The ID of the project to be updated.
project_update: The update to be applied to the project.
Returns:
The updated project.
"""
return self._update_resource(
resource_id=project_id,
resource_update=project_update,
route=PROJECTS,
response_model=ProjectResponseModel,
)
@track(AnalyticsEvent.DELETED_PROJECT)
def delete_project(self, project_name_or_id: Union[str, UUID]) -> None:
"""Deletes a project.
Args:
project_name_or_id: Name or ID of the project to delete.
"""
self._delete_resource(
resource_id=project_name_or_id,
route=PROJECTS,
)
# ---------
# Pipelines
# ---------
@track(AnalyticsEvent.CREATE_PIPELINE)
def create_pipeline(
self, pipeline: PipelineRequestModel
) -> PipelineResponseModel:
"""Creates a new pipeline in a project.
Args:
pipeline: The pipeline to create.
Returns:
The newly created pipeline.
"""
return self._create_project_scoped_resource(
resource=pipeline,
route=PIPELINES,
response_model=PipelineResponseModel,
)
def get_pipeline(self, pipeline_id: UUID) -> PipelineResponseModel:
"""Get a pipeline with a given ID.
Args:
pipeline_id: ID of the pipeline.
Returns:
The pipeline.
"""
return self._get_resource(
resource_id=pipeline_id,
route=PIPELINES,
response_model=PipelineResponseModel,
)
def list_pipelines(
self, pipeline_filter_model: PipelineFilterModel
) -> Page[PipelineResponseModel]:
"""List all pipelines matching the given filter criteria.
Args:
pipeline_filter_model: All filter parameters including pagination
params.
Returns:
A list of all pipelines matching the filter criteria.
"""
return self._list_paginated_resources(
route=PIPELINES,
response_model=PipelineResponseModel,
filter_model=pipeline_filter_model,
)
@track(AnalyticsEvent.UPDATE_PIPELINE)
def update_pipeline(
self, pipeline_id: UUID, pipeline_update: PipelineUpdateModel
) -> PipelineResponseModel:
"""Updates a pipeline.
Args:
pipeline_id: The ID of the pipeline to be updated.
pipeline_update: The update to be applied.
Returns:
The updated pipeline.
"""
return self._update_resource(
resource_id=pipeline_id,
resource_update=pipeline_update,
route=PIPELINES,
response_model=PipelineResponseModel,
)
@track(AnalyticsEvent.DELETE_PIPELINE)
def delete_pipeline(self, pipeline_id: UUID) -> None:
"""Deletes a pipeline.
Args:
pipeline_id: The ID of the pipeline to delete.
"""
self._delete_resource(
resource_id=pipeline_id,
route=PIPELINES,
)
# ---------
# Schedules
# ---------
def create_schedule(
self, schedule: ScheduleRequestModel
) -> ScheduleResponseModel:
"""Creates a new schedule.
Args:
schedule: The schedule to create.
Returns:
The newly created schedule.
"""
return self._create_project_scoped_resource(
resource=schedule,
route=SCHEDULES,
response_model=ScheduleResponseModel,
)
def get_schedule(self, schedule_id: UUID) -> ScheduleResponseModel:
"""Get a schedule with a given ID.
Args:
schedule_id: ID of the schedule.
Returns:
The schedule.
"""
return self._get_resource(
resource_id=schedule_id,
route=SCHEDULES,
response_model=ScheduleResponseModel,
)
def list_schedules(
self, schedule_filter_model: ScheduleFilterModel
) -> Page[ScheduleResponseModel]:
"""List all schedules in the project.
Args:
schedule_filter_model: All filter parameters including pagination
params
Returns:
A list of schedules.
"""
return self._list_paginated_resources(
route=SCHEDULES,
response_model=ScheduleResponseModel,
filter_model=schedule_filter_model,
)
def update_schedule(
self,
schedule_id: UUID,
schedule_update: ScheduleUpdateModel,
) -> ScheduleResponseModel:
"""Updates a schedule.
Args:
schedule_id: The ID of the schedule to be updated.
schedule_update: The update to be applied.
Returns:
The updated schedule.
"""
return self._update_resource(
resource_id=schedule_id,
resource_update=schedule_update,
route=SCHEDULES,
response_model=ScheduleResponseModel,
)
def delete_schedule(self, schedule_id: UUID) -> None:
"""Deletes a schedule.
Args:
schedule_id: The ID of the schedule to delete.
"""
self._delete_resource(
resource_id=schedule_id,
route=SCHEDULES,
)
# --------------
# Pipeline runs
# --------------
def create_run(
self, pipeline_run: PipelineRunRequestModel
) -> PipelineRunResponseModel:
"""Creates a pipeline run.
Args:
pipeline_run: The pipeline run to create.
Returns:
The created pipeline run.
"""
return self._create_project_scoped_resource(
resource=pipeline_run,
response_model=PipelineRunResponseModel,
route=RUNS,
)
def get_run(
self, run_name_or_id: Union[UUID, str]
) -> PipelineRunResponseModel:
"""Gets a pipeline run.
Args:
run_name_or_id: The name or ID of the pipeline run to get.
Returns:
The pipeline run.
"""
return self._get_resource(
resource_id=run_name_or_id,
route=RUNS,
response_model=PipelineRunResponseModel,
)
def get_or_create_run(
self, pipeline_run: PipelineRunRequestModel
) -> PipelineRunResponseModel:
"""Gets or creates a pipeline run.
If a run with the same ID or name already exists, it is returned.
Otherwise, a new run is created.
Args:
pipeline_run: The pipeline run to get or create.
Returns:
The pipeline run.
"""
return self._create_project_scoped_resource(
resource=pipeline_run,
route=RUNS,
response_model=PipelineRunResponseModel,
params={"get_if_exists": True},
)
def list_runs(
self, runs_filter_model: PipelineRunFilterModel
) -> Page[PipelineRunResponseModel]:
"""List all pipeline runs matching the given filter criteria.
Args:
runs_filter_model: All filter parameters including pagination
params.
Returns:
A list of all pipeline runs matching the filter criteria.
"""
return self._list_paginated_resources(
route=RUNS,
response_model=PipelineRunResponseModel,
filter_model=runs_filter_model,
)
def update_run(
self, run_id: UUID, run_update: PipelineRunUpdateModel
) -> PipelineRunResponseModel:
"""Updates a pipeline run.
Args:
run_id: The ID of the pipeline run to update.
run_update: The update to be applied to the pipeline run.
Returns:
The updated pipeline run.
"""
return self._update_resource(
resource_id=run_id,
resource_update=run_update,
response_model=PipelineRunResponseModel,
route=RUNS,
)
def delete_run(self, run_id: UUID) -> None:
"""Deletes a pipeline run.
Args:
run_id: The ID of the pipeline run to delete.
"""
self._delete_resource(
resource_id=run_id,
route=RUNS,
)
# ------------------
# Pipeline run steps
# ------------------
def create_run_step(
self, step_run: StepRunRequestModel
) -> StepRunResponseModel:
"""Creates a step run.
Args:
step_run: The step run to create.
Returns:
The created step run.
"""
return self._create_resource(
resource=step_run,
response_model=StepRunResponseModel,
route=STEPS,
)
def get_run_step(self, step_run_id: UUID) -> StepRunResponseModel:
"""Get a step run by ID.
Args:
step_run_id: The ID of the step run to get.
Returns:
The step run.
"""
return self._get_resource(
resource_id=step_run_id,
route=STEPS,
response_model=StepRunResponseModel,
)
def list_run_steps(
self, step_run_filter_model: StepRunFilterModel
) -> Page[StepRunResponseModel]:
"""List all step runs matching the given filter criteria.
Args:
step_run_filter_model: All filter parameters including pagination
params.
Returns:
A list of all step runs matching the filter criteria.
"""
return self._list_paginated_resources(
route=STEPS,
response_model=StepRunResponseModel,
filter_model=step_run_filter_model,
)
def update_run_step(
self,
step_run_id: UUID,
step_run_update: StepRunUpdateModel,
) -> StepRunResponseModel:
"""Updates a step run.
Args:
step_run_id: The ID of the step to update.
step_run_update: The update to be applied to the step.
Returns:
The updated step run.
"""
return self._update_resource(
resource_id=step_run_id,
resource_update=step_run_update,
response_model=StepRunResponseModel,
route=STEPS,
)
# ---------
# Artifacts
# ---------
def create_artifact(
self, artifact: ArtifactRequestModel
) -> ArtifactResponseModel:
"""Creates an artifact.
Args:
artifact: The artifact to create.
Returns:
The created artifact.
"""
return self._create_resource(
resource=artifact,
response_model=ArtifactResponseModel,
route=ARTIFACTS,
)
def get_artifact(self, artifact_id: UUID) -> ArtifactResponseModel:
"""Gets an artifact.
Args:
artifact_id: The ID of the artifact to get.
Returns:
The artifact.
"""
return self._get_resource(
resource_id=artifact_id,
route=ARTIFACTS,
response_model=ArtifactResponseModel,
)
def list_artifacts(
self, artifact_filter_model: ArtifactFilterModel
) -> Page[ArtifactResponseModel]:
"""List all artifacts matching the given filter criteria.
Args:
artifact_filter_model: All filter parameters including pagination
params.
Returns:
A list of all artifacts matching the filter criteria.
"""
return self._list_paginated_resources(
route=ARTIFACTS,
response_model=ArtifactResponseModel,
filter_model=artifact_filter_model,
)
def delete_artifact(self, artifact_id: UUID) -> None:
"""Deletes an artifact.
Args:
artifact_id: The ID of the artifact to delete.
"""
self._delete_resource(resource_id=artifact_id, route=ARTIFACTS)
# =======================
# Internal helper methods
# =======================
def _get_auth_token(self) -> str:
"""Get the authentication token for the REST store.
Returns:
The authentication token.
Raises:
ValueError: if the response from the server isn't in the right
format.
"""
if self._api_token is None:
# Check if the API token is already stored in the config
if self.config.api_token:
self._api_token = self.config.api_token
# Check if the username and password are provided in the config
elif (
self.config.username is not None
and self.config.password is not None
):
response = self._handle_response(
requests.post(
self.url + API + VERSION_1 + LOGIN,
data={
"username": self.config.username,
"password": self.config.password,
},
verify=self.config.verify_ssl,
timeout=self.config.http_timeout,
)
)
if (
not isinstance(response, dict)
or "access_token" not in response
):
raise ValueError(
f"Bad API Response. Expected access token dict, got "
f"{type(response)}"
)
self._api_token = response["access_token"]
self.config.api_token = self._api_token
else:
raise ValueError(
"No API token or username/password provided. Please "
"provide either a token or a username and password in "
"the ZenStore config."
)
return self._api_token
@property
def session(self) -> requests.Session:
"""Authenticate to the ZenML server.
Returns:
A requests session with the authentication token.
"""
if self._session is None:
if self.config.verify_ssl is False:
urllib3.disable_warnings(
urllib3.exceptions.InsecureRequestWarning
)
self._session = requests.Session()
self._session.verify = self.config.verify_ssl
token = self._get_auth_token()
self._session.headers.update({"Authorization": "Bearer " + token})
logger.debug("Authenticated to ZenML server.")
return self._session
@staticmethod
def _handle_response(response: requests.Response) -> Json:
"""Handle API response, translating http status codes to Exception.
Args:
response: The response to handle.
Returns:
The parsed response.
Raises:
DoesNotExistException: If the response indicates that the
requested entity does not exist.
EntityExistsError: If the response indicates that the requested
entity already exists.
AuthorizationException: If the response indicates that the request
is not authorized.
IllegalOperationError: If the response indicates that the requested
operation is forbidden.
KeyError: If the response indicates that the requested entity
does not exist.
RuntimeError: If the response indicates that the requested entity
does not exist.
StackComponentExistsError: If the response indicates that the
requested entity already exists.
StackExistsError: If the response indicates that the requested
entity already exists.
ValueError: If the response indicates that the requested entity
does not exist.
"""
if 200 <= response.status_code < 300:
try:
payload: Json = response.json()
return payload
except requests.exceptions.JSONDecodeError:
raise ValueError(
"Bad response from API. Expected json, got\n"
f"{response.text}"
)
elif response.status_code == 401:
raise AuthorizationException(
f"{response.status_code} Client Error: Unauthorized request to "
f"URL {response.url}: {response.json().get('detail')}"
)
elif response.status_code == 403:
msg = response.json().get("detail", response.text)
if isinstance(msg, list):
msg = msg[-1]
raise IllegalOperationError(msg)
elif response.status_code == 404:
if "KeyError" in response.text:
raise KeyError(
response.json().get("detail", (response.text,))[1]
)
elif "DoesNotExistException" in response.text:
message = ": ".join(
response.json().get("detail", (response.text,))
)
raise DoesNotExistException(message)
raise DoesNotExistException("Endpoint does not exist.")
elif response.status_code == 409:
if "StackComponentExistsError" in response.text:
raise StackComponentExistsError(
message=": ".join(
response.json().get("detail", (response.text,))
)
)
elif "StackExistsError" in response.text:
raise StackExistsError(
message=": ".join(
response.json().get("detail", (response.text,))
)
)
elif "EntityExistsError" in response.text:
raise EntityExistsError(
message=": ".join(
response.json().get("detail", (response.text,))
)
)
else:
raise ValueError(
": ".join(response.json().get("detail", (response.text,)))
)
elif response.status_code == 422:
response_details = response.json().get("detail", (response.text,))
if isinstance(response_details[0], str):
response_msg = ": ".join(response_details)
else:
# This is an "Unprocessable Entity" error, which has a special
# structure in the response.
response_msg = response.text
raise RuntimeError(response_msg)
elif response.status_code == 500:
raise RuntimeError(response.text)
else:
raise RuntimeError(
"Error retrieving from API. Got response "
f"{response.status_code} with body:\n{response.text}"
)
def _request(
self,
method: str,
url: str,
params: Optional[Dict[str, Any]] = None,
**kwargs: Any,
) -> Json:
"""Make a request to the REST API.
Args:
method: The HTTP method to use.
url: The URL to request.
params: The query parameters to pass to the endpoint.
kwargs: Additional keyword arguments to pass to the request.
Returns:
The parsed response.
"""
params = {k: str(v) for k, v in params.items()} if params else {}
try:
return self._handle_response(
self.session.request(
method,
url,
params=params,
verify=self.config.verify_ssl,
timeout=self.config.http_timeout,
**kwargs,
)
)
except AuthorizationException:
# The authentication token could have expired; refresh it and try
# again
self._session = None
return self._handle_response(
self.session.request(
method,
url,
params=params,
verify=self.config.verify_ssl,
timeout=self.config.http_timeout,
**kwargs,
)
)
def get(
self, path: str, params: Optional[Dict[str, Any]] = None, **kwargs: Any
) -> Json:
"""Make a GET request to the given endpoint path.
Args:
path: The path to the endpoint.
params: The query parameters to pass to the endpoint.
kwargs: Additional keyword arguments to pass to the request.
Returns:
The response body.
"""
logger.debug(f"Sending GET request to {path}...")
return self._request(
"GET", self.url + API + VERSION_1 + path, params=params, **kwargs
)
def delete(
self, path: str, params: Optional[Dict[str, Any]] = None, **kwargs: Any
) -> Json:
"""Make a DELETE request to the given endpoint path.
Args:
path: The path to the endpoint.
params: The query parameters to pass to the endpoint.
kwargs: Additional keyword arguments to pass to the request.
Returns:
The response body.
"""
logger.debug(f"Sending DELETE request to {path}...")
return self._request(
"DELETE",
self.url + API + VERSION_1 + path,
params=params,
**kwargs,
)
def post(
self,
path: str,
body: BaseModel,
params: Optional[Dict[str, Any]] = None,
**kwargs: Any,
) -> Json:
"""Make a POST request to the given endpoint path.
Args:
path: The path to the endpoint.
body: The body to send.
params: The query parameters to pass to the endpoint.
kwargs: Additional keyword arguments to pass to the request.
Returns:
The response body.
"""
logger.debug(f"Sending POST request to {path}...")
return self._request(
"POST",
self.url + API + VERSION_1 + path,
data=body.json(),
params=params,
**kwargs,
)
def put(
self,
path: str,
body: BaseModel,
params: Optional[Dict[str, Any]] = None,
**kwargs: Any,
) -> Json:
"""Make a PUT request to the given endpoint path.
Args:
path: The path to the endpoint.
body: The body to send.
params: The query parameters to pass to the endpoint.
kwargs: Additional keyword arguments to pass to the request.
Returns:
The response body.
"""
logger.debug(f"Sending PUT request to {path}...")
return self._request(
"PUT",
self.url + API + VERSION_1 + path,
data=body.json(exclude_unset=True),
params=params,
**kwargs,
)
def _create_resource(
self,
resource: BaseRequestModel,
response_model: Type[AnyResponseModel],
route: str,
params: Optional[Dict[str, Any]] = None,
) -> AnyResponseModel:
"""Create a new resource.
Args:
resource: The resource to create.
route: The resource REST API route to use.
response_model: Optional model to use to deserialize the response
body. If not provided, the resource class itself will be used.
params: Optional query parameters to pass to the endpoint.
Returns:
The created resource.
"""
response_body = self.post(f"{route}", body=resource, params=params)
return response_model.parse_obj(response_body)
def _create_project_scoped_resource(
self,
resource: ProjectScopedRequestModel,
response_model: Type[AnyProjestResponseModel],
route: str,
params: Optional[Dict[str, Any]] = None,
) -> AnyProjestResponseModel:
"""Create a new project scoped resource.
Args:
resource: The resource to create.
route: The resource REST API route to use.
response_model: Optional model to use to deserialize the response
body. If not provided, the resource class itself will be used.
params: Optional query parameters to pass to the endpoint.
Returns:
The created resource.
"""
return self._create_resource(
resource=resource,
response_model=response_model,
route=f"{PROJECTS}/{str(resource.project)}{route}",
params=params,
)
def _get_resource(
self,
resource_id: Union[str, UUID],
route: str,
response_model: Type[AnyResponseModel],
) -> AnyResponseModel:
"""Retrieve a single resource.
Args:
resource_id: The ID of the resource to retrieve.
route: The resource REST API route to use.
response_model: Model to use to serialize the response body.
Returns:
The retrieved resource.
"""
body = self.get(f"{route}/{str(resource_id)}")
return response_model.parse_obj(body)
def _list_paginated_resources(
self,
route: str,
response_model: Type[AnyResponseModel],
filter_model: BaseFilterModel,
) -> Page[AnyResponseModel]:
"""Retrieve a list of resources filtered by some criteria.
Args:
route: The resource REST API route to use.
response_model: Model to use to serialize the response body.
filter_model: The filter model to use for the list query.
Returns:
List of retrieved resources matching the filter criteria.
Raises:
ValueError: If the value returned by the server is not a list.
"""
# leave out filter params that are not supplied
body = self.get(
f"{route}", params=filter_model.dict(exclude_none=True)
)
if not isinstance(body, dict):
raise ValueError(
f"Bad API Response. Expected list, got {type(body)}"
)
# The initial page of items will be of type BaseResponseModel
page_of_items: Page[AnyResponseModel] = Page.parse_obj(body)
# So these items will be parsed into their correct types like here
page_of_items.items = [
response_model.parse_obj(generic_item)
for generic_item in page_of_items.items
]
return page_of_items
def _list_resources(
self,
route: str,
response_model: Type[AnyResponseModel],
**filters: Any,
) -> List[AnyResponseModel]:
"""Retrieve a list of resources filtered by some criteria.
Args:
route: The resource REST API route to use.
response_model: Model to use to serialize the response body.
filters: Filter parameters to use in the query.
Returns:
List of retrieved resources matching the filter criteria.
Raises:
ValueError: If the value returned by the server is not a list.
"""
# leave out filter params that are not supplied
params = dict(filter(lambda x: x[1] is not None, filters.items()))
body = self.get(f"{route}", params=params)
if not isinstance(body, list):
raise ValueError(
f"Bad API Response. Expected list, got {type(body)}"
)
return [response_model.parse_obj(entry) for entry in body]
def _update_resource(
self,
resource_id: UUID,
resource_update: BaseModel,
response_model: Type[AnyResponseModel],
route: str,
) -> AnyResponseModel:
"""Update an existing resource.
Args:
resource_id: The id of the resource to update.
resource_update: The resource update.
route: The resource REST API route to use.
response_model: Optional model to use to deserialize the response
body. If not provided, the resource class itself will be used.
Returns:
The updated resource.
"""
response_body = self.put(
f"{route}/{str(resource_id)}", body=resource_update
)
return response_model.parse_obj(response_body)
def _delete_resource(
self, resource_id: Union[str, UUID], route: str
) -> None:
"""Delete a resource.
Args:
resource_id: The ID of the resource to delete.
route: The resource REST API route to use.
"""
self.delete(f"{route}/{str(resource_id)}")
session: Session
property
readonly
Authenticate to the ZenML server.
Returns:
Type | Description |
---|---|
Session |
A requests session with the authentication token. |
CONFIG_TYPE (StoreConfiguration)
pydantic-model
REST ZenML store configuration.
Attributes:
Name | Type | Description |
---|---|---|
username |
Optional[str] |
The username to use to connect to the Zen server. |
password |
Optional[str] |
The password to use to connect to the Zen server. |
verify_ssl |
Union[bool, str] |
Either a boolean, in which case it controls whether we verify the server's TLS certificate, or a string, in which case it must be a path to a CA bundle to use or the CA bundle value itself. |
http_timeout |
int |
The timeout to use for all requests. |
Source code in zenml/zen_stores/rest_zen_store.py
class RestZenStoreConfiguration(StoreConfiguration):
"""REST ZenML store configuration.
Attributes:
username: The username to use to connect to the Zen server.
password: The password to use to connect to the Zen server.
verify_ssl: Either a boolean, in which case it controls whether we
verify the server's TLS certificate, or a string, in which case it
must be a path to a CA bundle to use or the CA bundle value itself.
http_timeout: The timeout to use for all requests.
"""
type: StoreType = StoreType.REST
username: Optional[str] = None
password: Optional[str] = None
api_token: Optional[str] = None
verify_ssl: Union[bool, str] = True
http_timeout: int = DEFAULT_HTTP_TIMEOUT
@root_validator
def validate_credentials(cls, values: Dict[str, Any]) -> Dict[str, Any]:
"""Validates the credentials provided in the values dictionary.
Args:
values: A dictionary containing the values to be validated.
Raises:
ValueError: If neither api_token nor username is set.
Returns:
The values dictionary.
"""
# Check if the values dictionary contains either an api_token or a
# username as non-empty strings.
if values.get("api_token") or values.get("username"):
return values
else:
raise ValueError(
"Neither api_token nor username is set in the store config."
)
@validator("url")
def validate_url(cls, url: str) -> str:
"""Validates that the URL is a well-formed REST store URL.
Args:
url: The URL to be validated.
Returns:
The validated URL without trailing slashes.
Raises:
ValueError: If the URL is not a well-formed REST store URL.
"""
url = url.rstrip("/")
scheme = re.search("^([a-z0-9]+://)", url)
if scheme is None or scheme.group() not in ("https://", "http://"):
raise ValueError(
"Invalid URL for REST store: {url}. Should be in the form "
"https://hostname[:port] or http://hostname[:port]."
)
# When running inside a container, if the URL uses localhost, the
# target service will not be available. We try to replace localhost
# with one of the special Docker or K3D internal hostnames.
url = replace_localhost_with_internal_hostname(url)
return url
@validator("verify_ssl")
def validate_verify_ssl(
cls, verify_ssl: Union[bool, str]
) -> Union[bool, str]:
"""Validates that the verify_ssl either points to a file or is a bool.
Args:
verify_ssl: The verify_ssl value to be validated.
Returns:
The validated verify_ssl value.
"""
secret_folder = Path(
GlobalConfiguration().local_stores_path,
"certificates",
)
if isinstance(verify_ssl, bool) or verify_ssl.startswith(
str(secret_folder)
):
return verify_ssl
if os.path.isfile(verify_ssl):
with open(verify_ssl, "r") as f:
verify_ssl = f.read()
fileio.makedirs(str(secret_folder))
file_path = Path(secret_folder, "ca_bundle.pem")
with open(file_path, "w") as f:
f.write(verify_ssl)
file_path.chmod(0o600)
verify_ssl = str(file_path)
return verify_ssl
@classmethod
def supports_url_scheme(cls, url: str) -> bool:
"""Check if a URL scheme is supported by this store.
Args:
url: The URL to check.
Returns:
True if the URL scheme is supported, False otherwise.
"""
return urlparse(url).scheme in ("http", "https")
def expand_certificates(self) -> None:
"""Expands the certificates in the verify_ssl field."""
# Load the certificate values back into the configuration
if isinstance(self.verify_ssl, str) and os.path.isfile(
self.verify_ssl
):
with open(self.verify_ssl, "r") as f:
self.verify_ssl = f.read()
@classmethod
def copy_configuration(
cls,
config: "StoreConfiguration",
config_path: str,
load_config_path: Optional[PurePath] = None,
) -> "StoreConfiguration":
"""Create a copy of the store config using a different path.
This method is used to create a copy of the store configuration that can
be loaded using a different configuration path or in the context of a
new environment, such as a container image.
The configuration files accompanying the store configuration are also
copied to the new configuration path (e.g. certificates etc.).
Args:
config: The store configuration to copy.
config_path: new path where the configuration copy will be loaded
from.
load_config_path: absolute path that will be used to load the copied
configuration. This can be set to a value different from
`config_path` if the configuration copy will be loaded from
a different environment, e.g. when the configuration is copied
to a container image and loaded using a different absolute path.
This will be reflected in the paths and URLs encoded in the
copied configuration.
Returns:
A new store configuration object that reflects the new configuration
path.
"""
assert isinstance(config, RestZenStoreConfiguration)
assert config.api_token is not None
config = config.copy(exclude={"username", "password"}, deep=True)
# Load the certificate values back into the configuration
config.expand_certificates()
return config
class Config:
"""Pydantic configuration class."""
# Don't validate attributes when assigning them. This is necessary
# because the `verify_ssl` attribute can be expanded to the contents
# of the certificate file.
validate_assignment = False
# Forbid extra attributes set in the class.
extra = "forbid"
Config
Pydantic configuration class.
Source code in zenml/zen_stores/rest_zen_store.py
class Config:
"""Pydantic configuration class."""
# Don't validate attributes when assigning them. This is necessary
# because the `verify_ssl` attribute can be expanded to the contents
# of the certificate file.
validate_assignment = False
# Forbid extra attributes set in the class.
extra = "forbid"
copy_configuration(config, config_path, load_config_path=None)
classmethod
Create a copy of the store config using a different path.
This method is used to create a copy of the store configuration that can be loaded using a different configuration path or in the context of a new environment, such as a container image.
The configuration files accompanying the store configuration are also copied to the new configuration path (e.g. certificates etc.).
Parameters:
Name | Type | Description | Default |
---|---|---|---|
config |
StoreConfiguration |
The store configuration to copy. |
required |
config_path |
str |
new path where the configuration copy will be loaded from. |
required |
load_config_path |
Optional[pathlib.PurePath] |
absolute path that will be used to load the copied
configuration. This can be set to a value different from
|
None |
Returns:
Type | Description |
---|---|
StoreConfiguration |
A new store configuration object that reflects the new configuration path. |
Source code in zenml/zen_stores/rest_zen_store.py
@classmethod
def copy_configuration(
cls,
config: "StoreConfiguration",
config_path: str,
load_config_path: Optional[PurePath] = None,
) -> "StoreConfiguration":
"""Create a copy of the store config using a different path.
This method is used to create a copy of the store configuration that can
be loaded using a different configuration path or in the context of a
new environment, such as a container image.
The configuration files accompanying the store configuration are also
copied to the new configuration path (e.g. certificates etc.).
Args:
config: The store configuration to copy.
config_path: new path where the configuration copy will be loaded
from.
load_config_path: absolute path that will be used to load the copied
configuration. This can be set to a value different from
`config_path` if the configuration copy will be loaded from
a different environment, e.g. when the configuration is copied
to a container image and loaded using a different absolute path.
This will be reflected in the paths and URLs encoded in the
copied configuration.
Returns:
A new store configuration object that reflects the new configuration
path.
"""
assert isinstance(config, RestZenStoreConfiguration)
assert config.api_token is not None
config = config.copy(exclude={"username", "password"}, deep=True)
# Load the certificate values back into the configuration
config.expand_certificates()
return config
expand_certificates(self)
Expands the certificates in the verify_ssl field.
Source code in zenml/zen_stores/rest_zen_store.py
def expand_certificates(self) -> None:
"""Expands the certificates in the verify_ssl field."""
# Load the certificate values back into the configuration
if isinstance(self.verify_ssl, str) and os.path.isfile(
self.verify_ssl
):
with open(self.verify_ssl, "r") as f:
self.verify_ssl = f.read()
supports_url_scheme(url)
classmethod
Check if a URL scheme is supported by this store.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
url |
str |
The URL to check. |
required |
Returns:
Type | Description |
---|---|
bool |
True if the URL scheme is supported, False otherwise. |
Source code in zenml/zen_stores/rest_zen_store.py
@classmethod
def supports_url_scheme(cls, url: str) -> bool:
"""Check if a URL scheme is supported by this store.
Args:
url: The URL to check.
Returns:
True if the URL scheme is supported, False otherwise.
"""
return urlparse(url).scheme in ("http", "https")
validate_credentials(values)
classmethod
Validates the credentials provided in the values dictionary.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
values |
Dict[str, Any] |
A dictionary containing the values to be validated. |
required |
Exceptions:
Type | Description |
---|---|
ValueError |
If neither api_token nor username is set. |
Returns:
Type | Description |
---|---|
Dict[str, Any] |
The values dictionary. |
Source code in zenml/zen_stores/rest_zen_store.py
@root_validator
def validate_credentials(cls, values: Dict[str, Any]) -> Dict[str, Any]:
"""Validates the credentials provided in the values dictionary.
Args:
values: A dictionary containing the values to be validated.
Raises:
ValueError: If neither api_token nor username is set.
Returns:
The values dictionary.
"""
# Check if the values dictionary contains either an api_token or a
# username as non-empty strings.
if values.get("api_token") or values.get("username"):
return values
else:
raise ValueError(
"Neither api_token nor username is set in the store config."
)
validate_url(url)
classmethod
Validates that the URL is a well-formed REST store URL.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
url |
str |
The URL to be validated. |
required |
Returns:
Type | Description |
---|---|
str |
The validated URL without trailing slashes. |
Exceptions:
Type | Description |
---|---|
ValueError |
If the URL is not a well-formed REST store URL. |
Source code in zenml/zen_stores/rest_zen_store.py
@validator("url")
def validate_url(cls, url: str) -> str:
"""Validates that the URL is a well-formed REST store URL.
Args:
url: The URL to be validated.
Returns:
The validated URL without trailing slashes.
Raises:
ValueError: If the URL is not a well-formed REST store URL.
"""
url = url.rstrip("/")
scheme = re.search("^([a-z0-9]+://)", url)
if scheme is None or scheme.group() not in ("https://", "http://"):
raise ValueError(
"Invalid URL for REST store: {url}. Should be in the form "
"https://hostname[:port] or http://hostname[:port]."
)
# When running inside a container, if the URL uses localhost, the
# target service will not be available. We try to replace localhost
# with one of the special Docker or K3D internal hostnames.
url = replace_localhost_with_internal_hostname(url)
return url
validate_verify_ssl(verify_ssl)
classmethod
Validates that the verify_ssl either points to a file or is a bool.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
verify_ssl |
Union[bool, str] |
The verify_ssl value to be validated. |
required |
Returns:
Type | Description |
---|---|
Union[bool, str] |
The validated verify_ssl value. |
Source code in zenml/zen_stores/rest_zen_store.py
@validator("verify_ssl")
def validate_verify_ssl(
cls, verify_ssl: Union[bool, str]
) -> Union[bool, str]:
"""Validates that the verify_ssl either points to a file or is a bool.
Args:
verify_ssl: The verify_ssl value to be validated.
Returns:
The validated verify_ssl value.
"""
secret_folder = Path(
GlobalConfiguration().local_stores_path,
"certificates",
)
if isinstance(verify_ssl, bool) or verify_ssl.startswith(
str(secret_folder)
):
return verify_ssl
if os.path.isfile(verify_ssl):
with open(verify_ssl, "r") as f:
verify_ssl = f.read()
fileio.makedirs(str(secret_folder))
file_path = Path(secret_folder, "ca_bundle.pem")
with open(file_path, "w") as f:
f.write(verify_ssl)
file_path.chmod(0o600)
verify_ssl = str(file_path)
return verify_ssl
create_artifact(self, artifact)
Creates an artifact.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
artifact |
ArtifactRequestModel |
The artifact to create. |
required |
Returns:
Type | Description |
---|---|
ArtifactResponseModel |
The created artifact. |
Source code in zenml/zen_stores/rest_zen_store.py
def create_artifact(
self, artifact: ArtifactRequestModel
) -> ArtifactResponseModel:
"""Creates an artifact.
Args:
artifact: The artifact to create.
Returns:
The created artifact.
"""
return self._create_resource(
resource=artifact,
response_model=ArtifactResponseModel,
route=ARTIFACTS,
)
create_flavor(*args, **kwargs)
Creates a new stack component flavor.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
flavor |
The stack component flavor to create. |
required |
Returns:
Type | Description |
---|---|
Any |
The newly created flavor. |
Source code in zenml/zen_stores/rest_zen_store.py
def inner_func(*args: Any, **kwargs: Any) -> Any:
"""Inner decorator function.
Args:
*args: Arguments to be passed to the function.
**kwargs: Keyword arguments to be passed to the function.
Returns:
Result of the function.
"""
with event_handler(event) as handler:
try:
if len(args) and isinstance(args[0], AnalyticsTrackerMixin):
handler.tracker = args[0]
for obj in list(args) + list(kwargs.values()):
if isinstance(obj, AnalyticsTrackedModelMixin):
handler.metadata = obj.get_analytics_metadata()
break
except Exception as e:
logger.debug(f"Analytics tracking failure for {func}: {e}")
result = func(*args, **kwargs)
try:
if isinstance(result, AnalyticsTrackedModelMixin):
handler.metadata = result.get_analytics_metadata()
except Exception as e:
logger.debug(f"Analytics tracking failure for {func}: {e}")
return result
create_pipeline(*args, **kwargs)
Creates a new pipeline in a project.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
pipeline |
The pipeline to create. |
required |
Returns:
Type | Description |
---|---|
Any |
The newly created pipeline. |
Source code in zenml/zen_stores/rest_zen_store.py
def inner_func(*args: Any, **kwargs: Any) -> Any:
"""Inner decorator function.
Args:
*args: Arguments to be passed to the function.
**kwargs: Keyword arguments to be passed to the function.
Returns:
Result of the function.
"""
with event_handler(event) as handler:
try:
if len(args) and isinstance(args[0], AnalyticsTrackerMixin):
handler.tracker = args[0]
for obj in list(args) + list(kwargs.values()):
if isinstance(obj, AnalyticsTrackedModelMixin):
handler.metadata = obj.get_analytics_metadata()
break
except Exception as e:
logger.debug(f"Analytics tracking failure for {func}: {e}")
result = func(*args, **kwargs)
try:
if isinstance(result, AnalyticsTrackedModelMixin):
handler.metadata = result.get_analytics_metadata()
except Exception as e:
logger.debug(f"Analytics tracking failure for {func}: {e}")
return result
create_project(*args, **kwargs)
Creates a new project.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
project |
The project to create. |
required |
Returns:
Type | Description |
---|---|
Any |
The newly created project. |
Source code in zenml/zen_stores/rest_zen_store.py
def inner_func(*args: Any, **kwargs: Any) -> Any:
"""Inner decorator function.
Args:
*args: Arguments to be passed to the function.
**kwargs: Keyword arguments to be passed to the function.
Returns:
Result of the function.
"""
with event_handler(event) as handler:
try:
if len(args) and isinstance(args[0], AnalyticsTrackerMixin):
handler.tracker = args[0]
for obj in list(args) + list(kwargs.values()):
if isinstance(obj, AnalyticsTrackedModelMixin):
handler.metadata = obj.get_analytics_metadata()
break
except Exception as e:
logger.debug(f"Analytics tracking failure for {func}: {e}")
result = func(*args, **kwargs)
try:
if isinstance(result, AnalyticsTrackedModelMixin):
handler.metadata = result.get_analytics_metadata()
except Exception as e:
logger.debug(f"Analytics tracking failure for {func}: {e}")
return result
create_role(*args, **kwargs)
Creates a new role.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
role |
The role model to create. |
required |
Returns:
Type | Description |
---|---|
Any |
The newly created role. |
Source code in zenml/zen_stores/rest_zen_store.py
def inner_func(*args: Any, **kwargs: Any) -> Any:
"""Inner decorator function.
Args:
*args: Arguments to be passed to the function.
**kwargs: Keyword arguments to be passed to the function.
Returns:
Result of the function.
"""
with event_handler(event) as handler:
try:
if len(args) and isinstance(args[0], AnalyticsTrackerMixin):
handler.tracker = args[0]
for obj in list(args) + list(kwargs.values()):
if isinstance(obj, AnalyticsTrackedModelMixin):
handler.metadata = obj.get_analytics_metadata()
break
except Exception as e:
logger.debug(f"Analytics tracking failure for {func}: {e}")
result = func(*args, **kwargs)
try:
if isinstance(result, AnalyticsTrackedModelMixin):
handler.metadata = result.get_analytics_metadata()
except Exception as e:
logger.debug(f"Analytics tracking failure for {func}: {e}")
return result
create_run(self, pipeline_run)
Creates a pipeline run.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
pipeline_run |
PipelineRunRequestModel |
The pipeline run to create. |
required |
Returns:
Type | Description |
---|---|
PipelineRunResponseModel |
The created pipeline run. |
Source code in zenml/zen_stores/rest_zen_store.py
def create_run(
self, pipeline_run: PipelineRunRequestModel
) -> PipelineRunResponseModel:
"""Creates a pipeline run.
Args:
pipeline_run: The pipeline run to create.
Returns:
The created pipeline run.
"""
return self._create_project_scoped_resource(
resource=pipeline_run,
response_model=PipelineRunResponseModel,
route=RUNS,
)
create_run_step(self, step_run)
Creates a step run.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
step_run |
StepRunRequestModel |
The step run to create. |
required |
Returns:
Type | Description |
---|---|
StepRunResponseModel |
The created step run. |
Source code in zenml/zen_stores/rest_zen_store.py
def create_run_step(
self, step_run: StepRunRequestModel
) -> StepRunResponseModel:
"""Creates a step run.
Args:
step_run: The step run to create.
Returns:
The created step run.
"""
return self._create_resource(
resource=step_run,
response_model=StepRunResponseModel,
route=STEPS,
)
create_schedule(self, schedule)
Creates a new schedule.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
schedule |
ScheduleRequestModel |
The schedule to create. |
required |
Returns:
Type | Description |
---|---|
ScheduleResponseModel |
The newly created schedule. |
Source code in zenml/zen_stores/rest_zen_store.py
def create_schedule(
self, schedule: ScheduleRequestModel
) -> ScheduleResponseModel:
"""Creates a new schedule.
Args:
schedule: The schedule to create.
Returns:
The newly created schedule.
"""
return self._create_project_scoped_resource(
resource=schedule,
route=SCHEDULES,
response_model=ScheduleResponseModel,
)
create_stack(*args, **kwargs)
Register a new stack.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
stack |
The stack to register. |
required |
Returns:
Type | Description |
---|---|
Any |
The registered stack. |
Source code in zenml/zen_stores/rest_zen_store.py
def inner_func(*args: Any, **kwargs: Any) -> Any:
"""Inner decorator function.
Args:
*args: Arguments to be passed to the function.
**kwargs: Keyword arguments to be passed to the function.
Returns:
Result of the function.
"""
with event_handler(event) as handler:
try:
if len(args) and isinstance(args[0], AnalyticsTrackerMixin):
handler.tracker = args[0]
for obj in list(args) + list(kwargs.values()):
if isinstance(obj, AnalyticsTrackedModelMixin):
handler.metadata = obj.get_analytics_metadata()
break
except Exception as e:
logger.debug(f"Analytics tracking failure for {func}: {e}")
result = func(*args, **kwargs)
try:
if isinstance(result, AnalyticsTrackedModelMixin):
handler.metadata = result.get_analytics_metadata()
except Exception as e:
logger.debug(f"Analytics tracking failure for {func}: {e}")
return result
create_stack_component(*args, **kwargs)
Create a stack component.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
component |
The stack component to create. |
required |
Returns:
Type | Description |
---|---|
Any |
The created stack component. |
Source code in zenml/zen_stores/rest_zen_store.py
def inner_func(*args: Any, **kwargs: Any) -> Any:
"""Inner decorator function.
Args:
*args: Arguments to be passed to the function.
**kwargs: Keyword arguments to be passed to the function.
Returns:
Result of the function.
"""
with event_handler(event) as handler:
try:
if len(args) and isinstance(args[0], AnalyticsTrackerMixin):
handler.tracker = args[0]
for obj in list(args) + list(kwargs.values()):
if isinstance(obj, AnalyticsTrackedModelMixin):
handler.metadata = obj.get_analytics_metadata()
break
except Exception as e:
logger.debug(f"Analytics tracking failure for {func}: {e}")
result = func(*args, **kwargs)
try:
if isinstance(result, AnalyticsTrackedModelMixin):
handler.metadata = result.get_analytics_metadata()
except Exception as e:
logger.debug(f"Analytics tracking failure for {func}: {e}")
return result
create_team(*args, **kwargs)
Creates a new team.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
team |
The team model to create. |
required |
Returns:
Type | Description |
---|---|
Any |
The newly created team. |
Source code in zenml/zen_stores/rest_zen_store.py
def inner_func(*args: Any, **kwargs: Any) -> Any:
"""Inner decorator function.
Args:
*args: Arguments to be passed to the function.
**kwargs: Keyword arguments to be passed to the function.
Returns:
Result of the function.
"""
with event_handler(event) as handler:
try:
if len(args) and isinstance(args[0], AnalyticsTrackerMixin):
handler.tracker = args[0]
for obj in list(args) + list(kwargs.values()):
if isinstance(obj, AnalyticsTrackedModelMixin):
handler.metadata = obj.get_analytics_metadata()
break
except Exception as e:
logger.debug(f"Analytics tracking failure for {func}: {e}")
result = func(*args, **kwargs)
try:
if isinstance(result, AnalyticsTrackedModelMixin):
handler.metadata = result.get_analytics_metadata()
except Exception as e:
logger.debug(f"Analytics tracking failure for {func}: {e}")
return result
create_team_role_assignment(self, team_role_assignment)
Creates a new team role assignment.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
team_role_assignment |
TeamRoleAssignmentRequestModel |
The role assignment model to create. |
required |
Returns:
Type | Description |
---|---|
TeamRoleAssignmentResponseModel |
The newly created role assignment. |
Source code in zenml/zen_stores/rest_zen_store.py
def create_team_role_assignment(
self, team_role_assignment: TeamRoleAssignmentRequestModel
) -> TeamRoleAssignmentResponseModel:
"""Creates a new team role assignment.
Args:
team_role_assignment: The role assignment model to create.
Returns:
The newly created role assignment.
"""
return self._create_resource(
resource=team_role_assignment,
route=TEAM_ROLE_ASSIGNMENTS,
response_model=TeamRoleAssignmentResponseModel,
)
create_user(*args, **kwargs)
Creates a new user.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
user |
User to be created. |
required |
Returns:
Type | Description |
---|---|
Any |
The newly created user. |
Source code in zenml/zen_stores/rest_zen_store.py
def inner_func(*args: Any, **kwargs: Any) -> Any:
"""Inner decorator function.
Args:
*args: Arguments to be passed to the function.
**kwargs: Keyword arguments to be passed to the function.
Returns:
Result of the function.
"""
with event_handler(event) as handler:
try:
if len(args) and isinstance(args[0], AnalyticsTrackerMixin):
handler.tracker = args[0]
for obj in list(args) + list(kwargs.values()):
if isinstance(obj, AnalyticsTrackedModelMixin):
handler.metadata = obj.get_analytics_metadata()
break
except Exception as e:
logger.debug(f"Analytics tracking failure for {func}: {e}")
result = func(*args, **kwargs)
try:
if isinstance(result, AnalyticsTrackedModelMixin):
handler.metadata = result.get_analytics_metadata()
except Exception as e:
logger.debug(f"Analytics tracking failure for {func}: {e}")
return result
create_user_role_assignment(self, user_role_assignment)
Creates a new role assignment.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
user_role_assignment |
UserRoleAssignmentRequestModel |
The role assignment to create. |
required |
Returns:
Type | Description |
---|---|
UserRoleAssignmentResponseModel |
The newly created project. |
Source code in zenml/zen_stores/rest_zen_store.py
def create_user_role_assignment(
self, user_role_assignment: UserRoleAssignmentRequestModel
) -> UserRoleAssignmentResponseModel:
"""Creates a new role assignment.
Args:
user_role_assignment: The role assignment to create.
Returns:
The newly created project.
"""
return self._create_resource(
resource=user_role_assignment,
route=USER_ROLE_ASSIGNMENTS,
response_model=UserRoleAssignmentResponseModel,
)
delete(self, path, params=None, **kwargs)
Make a DELETE request to the given endpoint path.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path |
str |
The path to the endpoint. |
required |
params |
Optional[Dict[str, Any]] |
The query parameters to pass to the endpoint. |
None |
kwargs |
Any |
Additional keyword arguments to pass to the request. |
{} |
Returns:
Type | Description |
---|---|
Union[Dict[str, Any], List[Any], str, int, float, bool] |
The response body. |
Source code in zenml/zen_stores/rest_zen_store.py
def delete(
self, path: str, params: Optional[Dict[str, Any]] = None, **kwargs: Any
) -> Json:
"""Make a DELETE request to the given endpoint path.
Args:
path: The path to the endpoint.
params: The query parameters to pass to the endpoint.
kwargs: Additional keyword arguments to pass to the request.
Returns:
The response body.
"""
logger.debug(f"Sending DELETE request to {path}...")
return self._request(
"DELETE",
self.url + API + VERSION_1 + path,
params=params,
**kwargs,
)
delete_artifact(self, artifact_id)
Deletes an artifact.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
artifact_id |
UUID |
The ID of the artifact to delete. |
required |
Source code in zenml/zen_stores/rest_zen_store.py
def delete_artifact(self, artifact_id: UUID) -> None:
"""Deletes an artifact.
Args:
artifact_id: The ID of the artifact to delete.
"""
self._delete_resource(resource_id=artifact_id, route=ARTIFACTS)
delete_flavor(*args, **kwargs)
Delete a stack component flavor.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
flavor_id |
The ID of the stack component flavor to delete. |
required |
Source code in zenml/zen_stores/rest_zen_store.py
def inner_func(*args: Any, **kwargs: Any) -> Any:
"""Inner decorator function.
Args:
*args: Arguments to be passed to the function.
**kwargs: Keyword arguments to be passed to the function.
Returns:
Result of the function.
"""
with event_handler(event) as handler:
try:
if len(args) and isinstance(args[0], AnalyticsTrackerMixin):
handler.tracker = args[0]
for obj in list(args) + list(kwargs.values()):
if isinstance(obj, AnalyticsTrackedModelMixin):
handler.metadata = obj.get_analytics_metadata()
break
except Exception as e:
logger.debug(f"Analytics tracking failure for {func}: {e}")
result = func(*args, **kwargs)
try:
if isinstance(result, AnalyticsTrackedModelMixin):
handler.metadata = result.get_analytics_metadata()
except Exception as e:
logger.debug(f"Analytics tracking failure for {func}: {e}")
return result
delete_pipeline(*args, **kwargs)
Deletes a pipeline.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
pipeline_id |
The ID of the pipeline to delete. |
required |
Source code in zenml/zen_stores/rest_zen_store.py
def inner_func(*args: Any, **kwargs: Any) -> Any:
"""Inner decorator function.
Args:
*args: Arguments to be passed to the function.
**kwargs: Keyword arguments to be passed to the function.
Returns:
Result of the function.
"""
with event_handler(event) as handler:
try:
if len(args) and isinstance(args[0], AnalyticsTrackerMixin):
handler.tracker = args[0]
for obj in list(args) + list(kwargs.values()):
if isinstance(obj, AnalyticsTrackedModelMixin):
handler.metadata = obj.get_analytics_metadata()
break
except Exception as e:
logger.debug(f"Analytics tracking failure for {func}: {e}")
result = func(*args, **kwargs)
try:
if isinstance(result, AnalyticsTrackedModelMixin):
handler.metadata = result.get_analytics_metadata()
except Exception as e:
logger.debug(f"Analytics tracking failure for {func}: {e}")
return result
delete_project(*args, **kwargs)
Deletes a project.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
project_name_or_id |
Name or ID of the project to delete. |
required |
Source code in zenml/zen_stores/rest_zen_store.py
def inner_func(*args: Any, **kwargs: Any) -> Any:
"""Inner decorator function.
Args:
*args: Arguments to be passed to the function.
**kwargs: Keyword arguments to be passed to the function.
Returns:
Result of the function.
"""
with event_handler(event) as handler:
try:
if len(args) and isinstance(args[0], AnalyticsTrackerMixin):
handler.tracker = args[0]
for obj in list(args) + list(kwargs.values()):
if isinstance(obj, AnalyticsTrackedModelMixin):
handler.metadata = obj.get_analytics_metadata()
break
except Exception as e:
logger.debug(f"Analytics tracking failure for {func}: {e}")
result = func(*args, **kwargs)
try:
if isinstance(result, AnalyticsTrackedModelMixin):
handler.metadata = result.get_analytics_metadata()
except Exception as e:
logger.debug(f"Analytics tracking failure for {func}: {e}")
return result
delete_role(*args, **kwargs)
Deletes a role.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
role_name_or_id |
Name or ID of the role to delete. |
required |
Source code in zenml/zen_stores/rest_zen_store.py
def inner_func(*args: Any, **kwargs: Any) -> Any:
"""Inner decorator function.
Args:
*args: Arguments to be passed to the function.
**kwargs: Keyword arguments to be passed to the function.
Returns:
Result of the function.
"""
with event_handler(event) as handler:
try:
if len(args) and isinstance(args[0], AnalyticsTrackerMixin):
handler.tracker = args[0]
for obj in list(args) + list(kwargs.values()):
if isinstance(obj, AnalyticsTrackedModelMixin):
handler.metadata = obj.get_analytics_metadata()
break
except Exception as e:
logger.debug(f"Analytics tracking failure for {func}: {e}")
result = func(*args, **kwargs)
try:
if isinstance(result, AnalyticsTrackedModelMixin):
handler.metadata = result.get_analytics_metadata()
except Exception as e:
logger.debug(f"Analytics tracking failure for {func}: {e}")
return result
delete_run(self, run_id)
Deletes a pipeline run.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
run_id |
UUID |
The ID of the pipeline run to delete. |
required |
Source code in zenml/zen_stores/rest_zen_store.py
def delete_run(self, run_id: UUID) -> None:
"""Deletes a pipeline run.
Args:
run_id: The ID of the pipeline run to delete.
"""
self._delete_resource(
resource_id=run_id,
route=RUNS,
)
delete_schedule(self, schedule_id)
Deletes a schedule.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
schedule_id |
UUID |
The ID of the schedule to delete. |
required |
Source code in zenml/zen_stores/rest_zen_store.py
def delete_schedule(self, schedule_id: UUID) -> None:
"""Deletes a schedule.
Args:
schedule_id: The ID of the schedule to delete.
"""
self._delete_resource(
resource_id=schedule_id,
route=SCHEDULES,
)
delete_stack(*args, **kwargs)
Delete a stack.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
stack_id |
The ID of the stack to delete. |
required |
Source code in zenml/zen_stores/rest_zen_store.py
def inner_func(*args: Any, **kwargs: Any) -> Any:
"""Inner decorator function.
Args:
*args: Arguments to be passed to the function.
**kwargs: Keyword arguments to be passed to the function.
Returns:
Result of the function.
"""
with event_handler(event) as handler:
try:
if len(args) and isinstance(args[0], AnalyticsTrackerMixin):
handler.tracker = args[0]
for obj in list(args) + list(kwargs.values()):
if isinstance(obj, AnalyticsTrackedModelMixin):
handler.metadata = obj.get_analytics_metadata()
break
except Exception as e:
logger.debug(f"Analytics tracking failure for {func}: {e}")
result = func(*args, **kwargs)
try:
if isinstance(result, AnalyticsTrackedModelMixin):
handler.metadata = result.get_analytics_metadata()
except Exception as e:
logger.debug(f"Analytics tracking failure for {func}: {e}")
return result
delete_stack_component(*args, **kwargs)
Delete a stack component.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
component_id |
The ID of the stack component to delete. |
required |
Source code in zenml/zen_stores/rest_zen_store.py
def inner_func(*args: Any, **kwargs: Any) -> Any:
"""Inner decorator function.
Args:
*args: Arguments to be passed to the function.
**kwargs: Keyword arguments to be passed to the function.
Returns:
Result of the function.
"""
with event_handler(event) as handler:
try:
if len(args) and isinstance(args[0], AnalyticsTrackerMixin):
handler.tracker = args[0]
for obj in list(args) + list(kwargs.values()):
if isinstance(obj, AnalyticsTrackedModelMixin):
handler.metadata = obj.get_analytics_metadata()
break
except Exception as e:
logger.debug(f"Analytics tracking failure for {func}: {e}")
result = func(*args, **kwargs)
try:
if isinstance(result, AnalyticsTrackedModelMixin):
handler.metadata = result.get_analytics_metadata()
except Exception as e:
logger.debug(f"Analytics tracking failure for {func}: {e}")
return result
delete_team(*args, **kwargs)
Deletes a team.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
team_name_or_id |
Name or ID of the team to delete. |
required |
Source code in zenml/zen_stores/rest_zen_store.py
def inner_func(*args: Any, **kwargs: Any) -> Any:
"""Inner decorator function.
Args:
*args: Arguments to be passed to the function.
**kwargs: Keyword arguments to be passed to the function.
Returns:
Result of the function.
"""
with event_handler(event) as handler:
try:
if len(args) and isinstance(args[0], AnalyticsTrackerMixin):
handler.tracker = args[0]
for obj in list(args) + list(kwargs.values()):
if isinstance(obj, AnalyticsTrackedModelMixin):
handler.metadata = obj.get_analytics_metadata()
break
except Exception as e:
logger.debug(f"Analytics tracking failure for {func}: {e}")
result = func(*args, **kwargs)
try:
if isinstance(result, AnalyticsTrackedModelMixin):
handler.metadata = result.get_analytics_metadata()
except Exception as e:
logger.debug(f"Analytics tracking failure for {func}: {e}")
return result
delete_team_role_assignment(self, team_role_assignment_id)
Delete a specific role assignment.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
team_role_assignment_id |
UUID |
The ID of the specific role assignment |
required |
Source code in zenml/zen_stores/rest_zen_store.py
def delete_team_role_assignment(
self, team_role_assignment_id: UUID
) -> None:
"""Delete a specific role assignment.
Args:
team_role_assignment_id: The ID of the specific role assignment
"""
self._delete_resource(
resource_id=team_role_assignment_id,
route=TEAM_ROLE_ASSIGNMENTS,
)
delete_user(*args, **kwargs)
Deletes a user.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
user_name_or_id |
The name or ID of the user to delete. |
required |
Source code in zenml/zen_stores/rest_zen_store.py
def inner_func(*args: Any, **kwargs: Any) -> Any:
"""Inner decorator function.
Args:
*args: Arguments to be passed to the function.
**kwargs: Keyword arguments to be passed to the function.
Returns:
Result of the function.
"""
with event_handler(event) as handler:
try:
if len(args) and isinstance(args[0], AnalyticsTrackerMixin):
handler.tracker = args[0]
for obj in list(args) + list(kwargs.values()):
if isinstance(obj, AnalyticsTrackedModelMixin):
handler.metadata = obj.get_analytics_metadata()
break
except Exception as e:
logger.debug(f"Analytics tracking failure for {func}: {e}")
result = func(*args, **kwargs)
try:
if isinstance(result, AnalyticsTrackedModelMixin):
handler.metadata = result.get_analytics_metadata()
except Exception as e:
logger.debug(f"Analytics tracking failure for {func}: {e}")
return result
delete_user_role_assignment(self, user_role_assignment_id)
Delete a specific role assignment.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
user_role_assignment_id |
UUID |
The ID of the specific role assignment |
required |
Source code in zenml/zen_stores/rest_zen_store.py
def delete_user_role_assignment(
self, user_role_assignment_id: UUID
) -> None:
"""Delete a specific role assignment.
Args:
user_role_assignment_id: The ID of the specific role assignment
"""
self._delete_resource(
resource_id=user_role_assignment_id,
route=USER_ROLE_ASSIGNMENTS,
)
get(self, path, params=None, **kwargs)
Make a GET request to the given endpoint path.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path |
str |
The path to the endpoint. |
required |
params |
Optional[Dict[str, Any]] |
The query parameters to pass to the endpoint. |
None |
kwargs |
Any |
Additional keyword arguments to pass to the request. |
{} |
Returns:
Type | Description |
---|---|
Union[Dict[str, Any], List[Any], str, int, float, bool] |
The response body. |
Source code in zenml/zen_stores/rest_zen_store.py
def get(
self, path: str, params: Optional[Dict[str, Any]] = None, **kwargs: Any
) -> Json:
"""Make a GET request to the given endpoint path.
Args:
path: The path to the endpoint.
params: The query parameters to pass to the endpoint.
kwargs: Additional keyword arguments to pass to the request.
Returns:
The response body.
"""
logger.debug(f"Sending GET request to {path}...")
return self._request(
"GET", self.url + API + VERSION_1 + path, params=params, **kwargs
)
get_artifact(self, artifact_id)
Gets an artifact.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
artifact_id |
UUID |
The ID of the artifact to get. |
required |
Returns:
Type | Description |
---|---|
ArtifactResponseModel |
The artifact. |
Source code in zenml/zen_stores/rest_zen_store.py
def get_artifact(self, artifact_id: UUID) -> ArtifactResponseModel:
"""Gets an artifact.
Args:
artifact_id: The ID of the artifact to get.
Returns:
The artifact.
"""
return self._get_resource(
resource_id=artifact_id,
route=ARTIFACTS,
response_model=ArtifactResponseModel,
)
get_auth_user(self, user_name_or_id)
Gets the auth model to a specific user.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
user_name_or_id |
Union[str, uuid.UUID] |
The name or ID of the user to get. |
required |
Exceptions:
Type | Description |
---|---|
NotImplementedError |
This method is only available for the SQLZenStore. |
Source code in zenml/zen_stores/rest_zen_store.py
def get_auth_user(
self, user_name_or_id: Union[str, UUID]
) -> "UserAuthModel":
"""Gets the auth model to a specific user.
Args:
user_name_or_id: The name or ID of the user to get.
Raises:
NotImplementedError: This method is only available for the
SQLZenStore.
"""
raise NotImplementedError(
"This method is only designed for use"
" by the server endpoints. It is not designed"
" to be called from the client side."
)
get_flavor(self, flavor_id)
Get a stack component flavor by ID.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
flavor_id |
UUID |
The ID of the stack component flavor to get. |
required |
Returns:
Type | Description |
---|---|
FlavorResponseModel |
The stack component flavor. |
Source code in zenml/zen_stores/rest_zen_store.py
def get_flavor(self, flavor_id: UUID) -> FlavorResponseModel:
"""Get a stack component flavor by ID.
Args:
flavor_id: The ID of the stack component flavor to get.
Returns:
The stack component flavor.
"""
return self._get_resource(
resource_id=flavor_id,
route=FLAVORS,
response_model=FlavorResponseModel,
)
get_or_create_run(self, pipeline_run)
Gets or creates a pipeline run.
If a run with the same ID or name already exists, it is returned. Otherwise, a new run is created.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
pipeline_run |
PipelineRunRequestModel |
The pipeline run to get or create. |
required |
Returns:
Type | Description |
---|---|
PipelineRunResponseModel |
The pipeline run. |
Source code in zenml/zen_stores/rest_zen_store.py
def get_or_create_run(
self, pipeline_run: PipelineRunRequestModel
) -> PipelineRunResponseModel:
"""Gets or creates a pipeline run.
If a run with the same ID or name already exists, it is returned.
Otherwise, a new run is created.
Args:
pipeline_run: The pipeline run to get or create.
Returns:
The pipeline run.
"""
return self._create_project_scoped_resource(
resource=pipeline_run,
route=RUNS,
response_model=PipelineRunResponseModel,
params={"get_if_exists": True},
)
get_pipeline(self, pipeline_id)
Get a pipeline with a given ID.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
pipeline_id |
UUID |
ID of the pipeline. |
required |
Returns:
Type | Description |
---|---|
PipelineResponseModel |
The pipeline. |
Source code in zenml/zen_stores/rest_zen_store.py
def get_pipeline(self, pipeline_id: UUID) -> PipelineResponseModel:
"""Get a pipeline with a given ID.
Args:
pipeline_id: ID of the pipeline.
Returns:
The pipeline.
"""
return self._get_resource(
resource_id=pipeline_id,
route=PIPELINES,
response_model=PipelineResponseModel,
)
get_project(self, project_name_or_id)
Get an existing project by name or ID.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
project_name_or_id |
Union[uuid.UUID, str] |
Name or ID of the project to get. |
required |
Returns:
Type | Description |
---|---|
ProjectResponseModel |
The requested project. |
Source code in zenml/zen_stores/rest_zen_store.py
def get_project(
self, project_name_or_id: Union[UUID, str]
) -> ProjectResponseModel:
"""Get an existing project by name or ID.
Args:
project_name_or_id: Name or ID of the project to get.
Returns:
The requested project.
"""
return self._get_resource(
resource_id=project_name_or_id,
route=PROJECTS,
response_model=ProjectResponseModel,
)
get_role(self, role_name_or_id)
Gets a specific role.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
role_name_or_id |
Union[str, uuid.UUID] |
Name or ID of the role to get. |
required |
Returns:
Type | Description |
---|---|
RoleResponseModel |
The requested role. |
Source code in zenml/zen_stores/rest_zen_store.py
def get_role(self, role_name_or_id: Union[str, UUID]) -> RoleResponseModel:
"""Gets a specific role.
Args:
role_name_or_id: Name or ID of the role to get.
Returns:
The requested role.
"""
return self._get_resource(
resource_id=role_name_or_id,
route=ROLES,
response_model=RoleResponseModel,
)
get_run(self, run_name_or_id)
Gets a pipeline run.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
run_name_or_id |
Union[uuid.UUID, str] |
The name or ID of the pipeline run to get. |
required |
Returns:
Type | Description |
---|---|
PipelineRunResponseModel |
The pipeline run. |
Source code in zenml/zen_stores/rest_zen_store.py
def get_run(
self, run_name_or_id: Union[UUID, str]
) -> PipelineRunResponseModel:
"""Gets a pipeline run.
Args:
run_name_or_id: The name or ID of the pipeline run to get.
Returns:
The pipeline run.
"""
return self._get_resource(
resource_id=run_name_or_id,
route=RUNS,
response_model=PipelineRunResponseModel,
)
get_run_step(self, step_run_id)
Get a step run by ID.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
step_run_id |
UUID |
The ID of the step run to get. |
required |
Returns:
Type | Description |
---|---|
StepRunResponseModel |
The step run. |
Source code in zenml/zen_stores/rest_zen_store.py
def get_run_step(self, step_run_id: UUID) -> StepRunResponseModel:
"""Get a step run by ID.
Args:
step_run_id: The ID of the step run to get.
Returns:
The step run.
"""
return self._get_resource(
resource_id=step_run_id,
route=STEPS,
response_model=StepRunResponseModel,
)
get_schedule(self, schedule_id)
Get a schedule with a given ID.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
schedule_id |
UUID |
ID of the schedule. |
required |
Returns:
Type | Description |
---|---|
ScheduleResponseModel |
The schedule. |
Source code in zenml/zen_stores/rest_zen_store.py
def get_schedule(self, schedule_id: UUID) -> ScheduleResponseModel:
"""Get a schedule with a given ID.
Args:
schedule_id: ID of the schedule.
Returns:
The schedule.
"""
return self._get_resource(
resource_id=schedule_id,
route=SCHEDULES,
response_model=ScheduleResponseModel,
)
get_stack(self, stack_id)
Get a stack by its unique ID.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
stack_id |
UUID |
The ID of the stack to get. |
required |
Returns:
Type | Description |
---|---|
StackResponseModel |
The stack with the given ID. |
Source code in zenml/zen_stores/rest_zen_store.py
def get_stack(self, stack_id: UUID) -> StackResponseModel:
"""Get a stack by its unique ID.
Args:
stack_id: The ID of the stack to get.
Returns:
The stack with the given ID.
"""
return self._get_resource(
resource_id=stack_id,
route=STACKS,
response_model=StackResponseModel,
)
get_stack_component(self, component_id)
Get a stack component by ID.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
component_id |
UUID |
The ID of the stack component to get. |
required |
Returns:
Type | Description |
---|---|
ComponentResponseModel |
The stack component. |
Source code in zenml/zen_stores/rest_zen_store.py
def get_stack_component(
self, component_id: UUID
) -> ComponentResponseModel:
"""Get a stack component by ID.
Args:
component_id: The ID of the stack component to get.
Returns:
The stack component.
"""
return self._get_resource(
resource_id=component_id,
route=STACK_COMPONENTS,
response_model=ComponentResponseModel,
)
get_store_info(self)
Get information about the server.
Returns:
Type | Description |
---|---|
ServerModel |
Information about the server. |
Source code in zenml/zen_stores/rest_zen_store.py
def get_store_info(self) -> ServerModel:
"""Get information about the server.
Returns:
Information about the server.
"""
body = self.get(INFO)
return ServerModel.parse_obj(body)
get_team(self, team_name_or_id)
Gets a specific team.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
team_name_or_id |
Union[str, uuid.UUID] |
Name or ID of the team to get. |
required |
Returns:
Type | Description |
---|---|
TeamResponseModel |
The requested team. |
Source code in zenml/zen_stores/rest_zen_store.py
def get_team(self, team_name_or_id: Union[str, UUID]) -> TeamResponseModel:
"""Gets a specific team.
Args:
team_name_or_id: Name or ID of the team to get.
Returns:
The requested team.
"""
return self._get_resource(
resource_id=team_name_or_id,
route=TEAMS,
response_model=TeamResponseModel,
)
get_team_role_assignment(self, team_role_assignment_id)
Gets a specific role assignment.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
team_role_assignment_id |
UUID |
ID of the role assignment to get. |
required |
Returns:
Type | Description |
---|---|
TeamRoleAssignmentResponseModel |
The requested role assignment. |
Exceptions:
Type | Description |
---|---|
KeyError |
If no role assignment with the given ID exists. |
Source code in zenml/zen_stores/rest_zen_store.py
def get_team_role_assignment(
self, team_role_assignment_id: UUID
) -> TeamRoleAssignmentResponseModel:
"""Gets a specific role assignment.
Args:
team_role_assignment_id: ID of the role assignment to get.
Returns:
The requested role assignment.
Raises:
KeyError: If no role assignment with the given ID exists.
"""
return self._get_resource(
resource_id=team_role_assignment_id,
route=TEAM_ROLE_ASSIGNMENTS,
response_model=TeamRoleAssignmentResponseModel,
)
get_user(self, user_name_or_id=None, include_private=False)
Gets a specific user, when no id is specified the active user is returned.
The include_private
parameter is ignored here as it is handled
implicitly by the /current-user endpoint that is queried when no
user_name_or_id is set. Raises a KeyError in case a user with that id
does not exist.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
user_name_or_id |
Union[str, uuid.UUID] |
The name or ID of the user to get. |
None |
include_private |
bool |
Whether to include private user information |
False |
Returns:
Type | Description |
---|---|
UserResponseModel |
The requested user, if it was found. |
Source code in zenml/zen_stores/rest_zen_store.py
def get_user(
self,
user_name_or_id: Optional[Union[str, UUID]] = None,
include_private: bool = False,
) -> UserResponseModel:
"""Gets a specific user, when no id is specified the active user is returned.
The `include_private` parameter is ignored here as it is handled
implicitly by the /current-user endpoint that is queried when no
user_name_or_id is set. Raises a KeyError in case a user with that id
does not exist.
Args:
user_name_or_id: The name or ID of the user to get.
include_private: Whether to include private user information
Returns:
The requested user, if it was found.
"""
if user_name_or_id:
return self._get_resource(
resource_id=user_name_or_id,
route=USERS,
response_model=UserResponseModel,
)
else:
body = self.get(CURRENT_USER)
return UserResponseModel.parse_obj(body)
get_user_role_assignment(self, user_role_assignment_id)
Get an existing role assignment by name or ID.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
user_role_assignment_id |
UUID |
Name or ID of the role assignment to get. |
required |
Returns:
Type | Description |
---|---|
UserRoleAssignmentResponseModel |
The requested project. |
Source code in zenml/zen_stores/rest_zen_store.py
def get_user_role_assignment(
self, user_role_assignment_id: UUID
) -> UserRoleAssignmentResponseModel:
"""Get an existing role assignment by name or ID.
Args:
user_role_assignment_id: Name or ID of the role assignment to get.
Returns:
The requested project.
"""
return self._get_resource(
resource_id=user_role_assignment_id,
route=USER_ROLE_ASSIGNMENTS,
response_model=UserRoleAssignmentResponseModel,
)
list_artifacts(self, artifact_filter_model)
List all artifacts matching the given filter criteria.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
artifact_filter_model |
ArtifactFilterModel |
All filter parameters including pagination params. |
required |
Returns:
Type | Description |
---|---|
Page[ArtifactResponseModel] |
A list of all artifacts matching the filter criteria. |
Source code in zenml/zen_stores/rest_zen_store.py
def list_artifacts(
self, artifact_filter_model: ArtifactFilterModel
) -> Page[ArtifactResponseModel]:
"""List all artifacts matching the given filter criteria.
Args:
artifact_filter_model: All filter parameters including pagination
params.
Returns:
A list of all artifacts matching the filter criteria.
"""
return self._list_paginated_resources(
route=ARTIFACTS,
response_model=ArtifactResponseModel,
filter_model=artifact_filter_model,
)
list_flavors(self, flavor_filter_model)
List all stack component flavors matching the given filter criteria.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
flavor_filter_model |
FlavorFilterModel |
All filter parameters including pagination |
required |
Returns:
Type | Description |
---|---|
Page[FlavorResponseModel] |
List of all the stack component flavors matching the given criteria. |
Source code in zenml/zen_stores/rest_zen_store.py
def list_flavors(
self, flavor_filter_model: FlavorFilterModel
) -> Page[FlavorResponseModel]:
"""List all stack component flavors matching the given filter criteria.
Args:
flavor_filter_model: All filter parameters including pagination
params
Returns:
List of all the stack component flavors matching the given criteria.
"""
return self._list_paginated_resources(
route=FLAVORS,
response_model=FlavorResponseModel,
filter_model=flavor_filter_model,
)
list_pipelines(self, pipeline_filter_model)
List all pipelines matching the given filter criteria.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
pipeline_filter_model |
PipelineFilterModel |
All filter parameters including pagination params. |
required |
Returns:
Type | Description |
---|---|
Page[PipelineResponseModel] |
A list of all pipelines matching the filter criteria. |
Source code in zenml/zen_stores/rest_zen_store.py
def list_pipelines(
self, pipeline_filter_model: PipelineFilterModel
) -> Page[PipelineResponseModel]:
"""List all pipelines matching the given filter criteria.
Args:
pipeline_filter_model: All filter parameters including pagination
params.
Returns:
A list of all pipelines matching the filter criteria.
"""
return self._list_paginated_resources(
route=PIPELINES,
response_model=PipelineResponseModel,
filter_model=pipeline_filter_model,
)
list_projects(self, project_filter_model)
List all project matching the given filter criteria.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
project_filter_model |
ProjectFilterModel |
All filter parameters including pagination params. |
required |
Returns:
Type | Description |
---|---|
Page[ProjectResponseModel] |
A list of all project matching the filter criteria. |
Source code in zenml/zen_stores/rest_zen_store.py
def list_projects(
self, project_filter_model: ProjectFilterModel
) -> Page[ProjectResponseModel]:
"""List all project matching the given filter criteria.
Args:
project_filter_model: All filter parameters including pagination
params.
Returns:
A list of all project matching the filter criteria.
"""
return self._list_paginated_resources(
route=PROJECTS,
response_model=ProjectResponseModel,
filter_model=project_filter_model,
)
list_roles(self, role_filter_model)
List all roles matching the given filter criteria.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
role_filter_model |
RoleFilterModel |
All filter parameters including pagination params. |
required |
Returns:
Type | Description |
---|---|
Page[RoleResponseModel] |
A list of all roles matching the filter criteria. |
Source code in zenml/zen_stores/rest_zen_store.py
def list_roles(
self, role_filter_model: RoleFilterModel
) -> Page[RoleResponseModel]:
"""List all roles matching the given filter criteria.
Args:
role_filter_model: All filter parameters including pagination
params.
Returns:
A list of all roles matching the filter criteria.
"""
return self._list_paginated_resources(
route=ROLES,
response_model=RoleResponseModel,
filter_model=role_filter_model,
)
list_run_steps(self, step_run_filter_model)
List all step runs matching the given filter criteria.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
step_run_filter_model |
StepRunFilterModel |
All filter parameters including pagination params. |
required |
Returns:
Type | Description |
---|---|
Page[StepRunResponseModel] |
A list of all step runs matching the filter criteria. |
Source code in zenml/zen_stores/rest_zen_store.py
def list_run_steps(
self, step_run_filter_model: StepRunFilterModel
) -> Page[StepRunResponseModel]:
"""List all step runs matching the given filter criteria.
Args:
step_run_filter_model: All filter parameters including pagination
params.
Returns:
A list of all step runs matching the filter criteria.
"""
return self._list_paginated_resources(
route=STEPS,
response_model=StepRunResponseModel,
filter_model=step_run_filter_model,
)
list_runs(self, runs_filter_model)
List all pipeline runs matching the given filter criteria.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
runs_filter_model |
PipelineRunFilterModel |
All filter parameters including pagination params. |
required |
Returns:
Type | Description |
---|---|
Page[PipelineRunResponseModel] |
A list of all pipeline runs matching the filter criteria. |
Source code in zenml/zen_stores/rest_zen_store.py
def list_runs(
self, runs_filter_model: PipelineRunFilterModel
) -> Page[PipelineRunResponseModel]:
"""List all pipeline runs matching the given filter criteria.
Args:
runs_filter_model: All filter parameters including pagination
params.
Returns:
A list of all pipeline runs matching the filter criteria.
"""
return self._list_paginated_resources(
route=RUNS,
response_model=PipelineRunResponseModel,
filter_model=runs_filter_model,
)
list_schedules(self, schedule_filter_model)
List all schedules in the project.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
schedule_filter_model |
ScheduleFilterModel |
All filter parameters including pagination params |
required |
Returns:
Type | Description |
---|---|
Page[ScheduleResponseModel] |
A list of schedules. |
Source code in zenml/zen_stores/rest_zen_store.py
def list_schedules(
self, schedule_filter_model: ScheduleFilterModel
) -> Page[ScheduleResponseModel]:
"""List all schedules in the project.
Args:
schedule_filter_model: All filter parameters including pagination
params
Returns:
A list of schedules.
"""
return self._list_paginated_resources(
route=SCHEDULES,
response_model=ScheduleResponseModel,
filter_model=schedule_filter_model,
)
list_stack_components(self, component_filter_model)
List all stack components matching the given filter criteria.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
component_filter_model |
ComponentFilterModel |
All filter parameters including pagination params. |
required |
Returns:
Type | Description |
---|---|
Page[ComponentResponseModel] |
A list of all stack components matching the filter criteria. |
Source code in zenml/zen_stores/rest_zen_store.py
def list_stack_components(
self, component_filter_model: ComponentFilterModel
) -> Page[ComponentResponseModel]:
"""List all stack components matching the given filter criteria.
Args:
component_filter_model: All filter parameters including pagination
params.
Returns:
A list of all stack components matching the filter criteria.
"""
return self._list_paginated_resources(
route=STACK_COMPONENTS,
response_model=ComponentResponseModel,
filter_model=component_filter_model,
)
list_stacks(self, stack_filter_model)
List all stacks matching the given filter criteria.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
stack_filter_model |
StackFilterModel |
All filter parameters including pagination params. |
required |
Returns:
Type | Description |
---|---|
Page[StackResponseModel] |
A list of all stacks matching the filter criteria. |
Source code in zenml/zen_stores/rest_zen_store.py
def list_stacks(
self, stack_filter_model: StackFilterModel
) -> Page[StackResponseModel]:
"""List all stacks matching the given filter criteria.
Args:
stack_filter_model: All filter parameters including pagination
params.
Returns:
A list of all stacks matching the filter criteria.
"""
return self._list_paginated_resources(
route=STACKS,
response_model=StackResponseModel,
filter_model=stack_filter_model,
)
list_team_role_assignments(self, team_role_assignment_filter_model)
List all roles assignments matching the given filter criteria.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
team_role_assignment_filter_model |
TeamRoleAssignmentFilterModel |
All filter parameters including pagination params. |
required |
Returns:
Type | Description |
---|---|
Page[TeamRoleAssignmentResponseModel] |
A list of all roles assignments matching the filter criteria. |
Source code in zenml/zen_stores/rest_zen_store.py
def list_team_role_assignments(
self, team_role_assignment_filter_model: TeamRoleAssignmentFilterModel
) -> Page[TeamRoleAssignmentResponseModel]:
"""List all roles assignments matching the given filter criteria.
Args:
team_role_assignment_filter_model: All filter parameters including
pagination params.
Returns:
A list of all roles assignments matching the filter criteria.
"""
return self._list_paginated_resources(
route=TEAM_ROLE_ASSIGNMENTS,
response_model=TeamRoleAssignmentResponseModel,
filter_model=team_role_assignment_filter_model,
)
list_teams(self, team_filter_model)
List all teams matching the given filter criteria.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
team_filter_model |
TeamFilterModel |
All filter parameters including pagination params. |
required |
Returns:
Type | Description |
---|---|
Page[TeamResponseModel] |
A list of all teams matching the filter criteria. |
Source code in zenml/zen_stores/rest_zen_store.py
def list_teams(
self, team_filter_model: TeamFilterModel
) -> Page[TeamResponseModel]:
"""List all teams matching the given filter criteria.
Args:
team_filter_model: All filter parameters including pagination
params.
Returns:
A list of all teams matching the filter criteria.
"""
return self._list_paginated_resources(
route=TEAMS,
response_model=TeamResponseModel,
filter_model=team_filter_model,
)
list_user_role_assignments(self, user_role_assignment_filter_model)
List all roles assignments matching the given filter criteria.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
user_role_assignment_filter_model |
UserRoleAssignmentFilterModel |
All filter parameters including pagination params. |
required |
Returns:
Type | Description |
---|---|
Page[UserRoleAssignmentResponseModel] |
A list of all roles assignments matching the filter criteria. |
Source code in zenml/zen_stores/rest_zen_store.py
def list_user_role_assignments(
self, user_role_assignment_filter_model: UserRoleAssignmentFilterModel
) -> Page[UserRoleAssignmentResponseModel]:
"""List all roles assignments matching the given filter criteria.
Args:
user_role_assignment_filter_model: All filter parameters including
pagination params.
Returns:
A list of all roles assignments matching the filter criteria.
"""
return self._list_paginated_resources(
route=USER_ROLE_ASSIGNMENTS,
response_model=UserRoleAssignmentResponseModel,
filter_model=user_role_assignment_filter_model,
)
list_users(self, user_filter_model)
List all users.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
user_filter_model |
UserFilterModel |
All filter parameters including pagination params. |
required |
Returns:
Type | Description |
---|---|
Page[UserResponseModel] |
A list of all users. |
Source code in zenml/zen_stores/rest_zen_store.py
def list_users(
self, user_filter_model: UserFilterModel
) -> Page[UserResponseModel]:
"""List all users.
Args:
user_filter_model: All filter parameters including pagination
params.
Returns:
A list of all users.
"""
return self._list_paginated_resources(
route=USERS,
response_model=UserResponseModel,
filter_model=user_filter_model,
)
post(self, path, body, params=None, **kwargs)
Make a POST request to the given endpoint path.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path |
str |
The path to the endpoint. |
required |
body |
BaseModel |
The body to send. |
required |
params |
Optional[Dict[str, Any]] |
The query parameters to pass to the endpoint. |
None |
kwargs |
Any |
Additional keyword arguments to pass to the request. |
{} |
Returns:
Type | Description |
---|---|
Union[Dict[str, Any], List[Any], str, int, float, bool] |
The response body. |
Source code in zenml/zen_stores/rest_zen_store.py
def post(
self,
path: str,
body: BaseModel,
params: Optional[Dict[str, Any]] = None,
**kwargs: Any,
) -> Json:
"""Make a POST request to the given endpoint path.
Args:
path: The path to the endpoint.
body: The body to send.
params: The query parameters to pass to the endpoint.
kwargs: Additional keyword arguments to pass to the request.
Returns:
The response body.
"""
logger.debug(f"Sending POST request to {path}...")
return self._request(
"POST",
self.url + API + VERSION_1 + path,
data=body.json(),
params=params,
**kwargs,
)
put(self, path, body, params=None, **kwargs)
Make a PUT request to the given endpoint path.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path |
str |
The path to the endpoint. |
required |
body |
BaseModel |
The body to send. |
required |
params |
Optional[Dict[str, Any]] |
The query parameters to pass to the endpoint. |
None |
kwargs |
Any |
Additional keyword arguments to pass to the request. |
{} |
Returns:
Type | Description |
---|---|
Union[Dict[str, Any], List[Any], str, int, float, bool] |
The response body. |
Source code in zenml/zen_stores/rest_zen_store.py
def put(
self,
path: str,
body: BaseModel,
params: Optional[Dict[str, Any]] = None,
**kwargs: Any,
) -> Json:
"""Make a PUT request to the given endpoint path.
Args:
path: The path to the endpoint.
body: The body to send.
params: The query parameters to pass to the endpoint.
kwargs: Additional keyword arguments to pass to the request.
Returns:
The response body.
"""
logger.debug(f"Sending PUT request to {path}...")
return self._request(
"PUT",
self.url + API + VERSION_1 + path,
data=body.json(exclude_unset=True),
params=params,
**kwargs,
)
update_pipeline(*args, **kwargs)
Updates a pipeline.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
pipeline_id |
The ID of the pipeline to be updated. |
required | |
pipeline_update |
The update to be applied. |
required |
Returns:
Type | Description |
---|---|
Any |
The updated pipeline. |
Source code in zenml/zen_stores/rest_zen_store.py
def inner_func(*args: Any, **kwargs: Any) -> Any:
"""Inner decorator function.
Args:
*args: Arguments to be passed to the function.
**kwargs: Keyword arguments to be passed to the function.
Returns:
Result of the function.
"""
with event_handler(event) as handler:
try:
if len(args) and isinstance(args[0], AnalyticsTrackerMixin):
handler.tracker = args[0]
for obj in list(args) + list(kwargs.values()):
if isinstance(obj, AnalyticsTrackedModelMixin):
handler.metadata = obj.get_analytics_metadata()
break
except Exception as e:
logger.debug(f"Analytics tracking failure for {func}: {e}")
result = func(*args, **kwargs)
try:
if isinstance(result, AnalyticsTrackedModelMixin):
handler.metadata = result.get_analytics_metadata()
except Exception as e:
logger.debug(f"Analytics tracking failure for {func}: {e}")
return result
update_project(*args, **kwargs)
Update an existing project.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
project_id |
The ID of the project to be updated. |
required | |
project_update |
The update to be applied to the project. |
required |
Returns:
Type | Description |
---|---|
Any |
The updated project. |
Source code in zenml/zen_stores/rest_zen_store.py
def inner_func(*args: Any, **kwargs: Any) -> Any:
"""Inner decorator function.
Args:
*args: Arguments to be passed to the function.
**kwargs: Keyword arguments to be passed to the function.
Returns:
Result of the function.
"""
with event_handler(event) as handler:
try:
if len(args) and isinstance(args[0], AnalyticsTrackerMixin):
handler.tracker = args[0]
for obj in list(args) + list(kwargs.values()):
if isinstance(obj, AnalyticsTrackedModelMixin):
handler.metadata = obj.get_analytics_metadata()
break
except Exception as e:
logger.debug(f"Analytics tracking failure for {func}: {e}")
result = func(*args, **kwargs)
try:
if isinstance(result, AnalyticsTrackedModelMixin):
handler.metadata = result.get_analytics_metadata()
except Exception as e:
logger.debug(f"Analytics tracking failure for {func}: {e}")
return result
update_role(*args, **kwargs)
Update an existing role.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
role_id |
The ID of the role to be updated. |
required | |
role_update |
The update to be applied to the role. |
required |
Returns:
Type | Description |
---|---|
Any |
The updated role. |
Source code in zenml/zen_stores/rest_zen_store.py
def inner_func(*args: Any, **kwargs: Any) -> Any:
"""Inner decorator function.
Args:
*args: Arguments to be passed to the function.
**kwargs: Keyword arguments to be passed to the function.
Returns:
Result of the function.
"""
with event_handler(event) as handler:
try:
if len(args) and isinstance(args[0], AnalyticsTrackerMixin):
handler.tracker = args[0]
for obj in list(args) + list(kwargs.values()):
if isinstance(obj, AnalyticsTrackedModelMixin):
handler.metadata = obj.get_analytics_metadata()
break
except Exception as e:
logger.debug(f"Analytics tracking failure for {func}: {e}")
result = func(*args, **kwargs)
try:
if isinstance(result, AnalyticsTrackedModelMixin):
handler.metadata = result.get_analytics_metadata()
except Exception as e:
logger.debug(f"Analytics tracking failure for {func}: {e}")
return result
update_run(self, run_id, run_update)
Updates a pipeline run.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
run_id |
UUID |
The ID of the pipeline run to update. |
required |
run_update |
PipelineRunUpdateModel |
The update to be applied to the pipeline run. |
required |
Returns:
Type | Description |
---|---|
PipelineRunResponseModel |
The updated pipeline run. |
Source code in zenml/zen_stores/rest_zen_store.py
def update_run(
self, run_id: UUID, run_update: PipelineRunUpdateModel
) -> PipelineRunResponseModel:
"""Updates a pipeline run.
Args:
run_id: The ID of the pipeline run to update.
run_update: The update to be applied to the pipeline run.
Returns:
The updated pipeline run.
"""
return self._update_resource(
resource_id=run_id,
resource_update=run_update,
response_model=PipelineRunResponseModel,
route=RUNS,
)
update_run_step(self, step_run_id, step_run_update)
Updates a step run.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
step_run_id |
UUID |
The ID of the step to update. |
required |
step_run_update |
StepRunUpdateModel |
The update to be applied to the step. |
required |
Returns:
Type | Description |
---|---|
StepRunResponseModel |
The updated step run. |
Source code in zenml/zen_stores/rest_zen_store.py
def update_run_step(
self,
step_run_id: UUID,
step_run_update: StepRunUpdateModel,
) -> StepRunResponseModel:
"""Updates a step run.
Args:
step_run_id: The ID of the step to update.
step_run_update: The update to be applied to the step.
Returns:
The updated step run.
"""
return self._update_resource(
resource_id=step_run_id,
resource_update=step_run_update,
response_model=StepRunResponseModel,
route=STEPS,
)
update_schedule(self, schedule_id, schedule_update)
Updates a schedule.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
schedule_id |
UUID |
The ID of the schedule to be updated. |
required |
schedule_update |
ScheduleUpdateModel |
The update to be applied. |
required |
Returns:
Type | Description |
---|---|
ScheduleResponseModel |
The updated schedule. |
Source code in zenml/zen_stores/rest_zen_store.py
def update_schedule(
self,
schedule_id: UUID,
schedule_update: ScheduleUpdateModel,
) -> ScheduleResponseModel:
"""Updates a schedule.
Args:
schedule_id: The ID of the schedule to be updated.
schedule_update: The update to be applied.
Returns:
The updated schedule.
"""
return self._update_resource(
resource_id=schedule_id,
resource_update=schedule_update,
route=SCHEDULES,
response_model=ScheduleResponseModel,
)
update_stack(*args, **kwargs)
Update a stack.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
stack_id |
The ID of the stack update. |
required | |
stack_update |
The update request on the stack. |
required |
Returns:
Type | Description |
---|---|
Any |
The updated stack. |
Source code in zenml/zen_stores/rest_zen_store.py
def inner_func(*args: Any, **kwargs: Any) -> Any:
"""Inner decorator function.
Args:
*args: Arguments to be passed to the function.
**kwargs: Keyword arguments to be passed to the function.
Returns:
Result of the function.
"""
with event_handler(event) as handler:
try:
if len(args) and isinstance(args[0], AnalyticsTrackerMixin):
handler.tracker = args[0]
for obj in list(args) + list(kwargs.values()):
if isinstance(obj, AnalyticsTrackedModelMixin):
handler.metadata = obj.get_analytics_metadata()
break
except Exception as e:
logger.debug(f"Analytics tracking failure for {func}: {e}")
result = func(*args, **kwargs)
try:
if isinstance(result, AnalyticsTrackedModelMixin):
handler.metadata = result.get_analytics_metadata()
except Exception as e:
logger.debug(f"Analytics tracking failure for {func}: {e}")
return result
update_stack_component(*args, **kwargs)
Update an existing stack component.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
component_id |
The ID of the stack component to update. |
required | |
component_update |
The update to be applied to the stack component. |
required |
Returns:
Type | Description |
---|---|
Any |
The updated stack component. |
Source code in zenml/zen_stores/rest_zen_store.py
def inner_func(*args: Any, **kwargs: Any) -> Any:
"""Inner decorator function.
Args:
*args: Arguments to be passed to the function.
**kwargs: Keyword arguments to be passed to the function.
Returns:
Result of the function.
"""
with event_handler(event) as handler:
try:
if len(args) and isinstance(args[0], AnalyticsTrackerMixin):
handler.tracker = args[0]
for obj in list(args) + list(kwargs.values()):
if isinstance(obj, AnalyticsTrackedModelMixin):
handler.metadata = obj.get_analytics_metadata()
break
except Exception as e:
logger.debug(f"Analytics tracking failure for {func}: {e}")
result = func(*args, **kwargs)
try:
if isinstance(result, AnalyticsTrackedModelMixin):
handler.metadata = result.get_analytics_metadata()
except Exception as e:
logger.debug(f"Analytics tracking failure for {func}: {e}")
return result
update_team(*args, **kwargs)
Update an existing team.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
team_id |
The ID of the team to be updated. |
required | |
team_update |
The update to be applied to the team. |
required |
Returns:
Type | Description |
---|---|
Any |
The updated team. |
Source code in zenml/zen_stores/rest_zen_store.py
def inner_func(*args: Any, **kwargs: Any) -> Any:
"""Inner decorator function.
Args:
*args: Arguments to be passed to the function.
**kwargs: Keyword arguments to be passed to the function.
Returns:
Result of the function.
"""
with event_handler(event) as handler:
try:
if len(args) and isinstance(args[0], AnalyticsTrackerMixin):
handler.tracker = args[0]
for obj in list(args) + list(kwargs.values()):
if isinstance(obj, AnalyticsTrackedModelMixin):
handler.metadata = obj.get_analytics_metadata()
break
except Exception as e:
logger.debug(f"Analytics tracking failure for {func}: {e}")
result = func(*args, **kwargs)
try:
if isinstance(result, AnalyticsTrackedModelMixin):
handler.metadata = result.get_analytics_metadata()
except Exception as e:
logger.debug(f"Analytics tracking failure for {func}: {e}")
return result
update_user(*args, **kwargs)
Updates an existing user.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
user_id |
The id of the user to update. |
required | |
user_update |
The update to be applied to the user. |
required |
Returns:
Type | Description |
---|---|
Any |
The updated user. |
Source code in zenml/zen_stores/rest_zen_store.py
def inner_func(*args: Any, **kwargs: Any) -> Any:
"""Inner decorator function.
Args:
*args: Arguments to be passed to the function.
**kwargs: Keyword arguments to be passed to the function.
Returns:
Result of the function.
"""
with event_handler(event) as handler:
try:
if len(args) and isinstance(args[0], AnalyticsTrackerMixin):
handler.tracker = args[0]
for obj in list(args) + list(kwargs.values()):
if isinstance(obj, AnalyticsTrackedModelMixin):
handler.metadata = obj.get_analytics_metadata()
break
except Exception as e:
logger.debug(f"Analytics tracking failure for {func}: {e}")
result = func(*args, **kwargs)
try:
if isinstance(result, AnalyticsTrackedModelMixin):
handler.metadata = result.get_analytics_metadata()
except Exception as e:
logger.debug(f"Analytics tracking failure for {func}: {e}")
return result
RestZenStoreConfiguration (StoreConfiguration)
pydantic-model
REST ZenML store configuration.
Attributes:
Name | Type | Description |
---|---|---|
username |
Optional[str] |
The username to use to connect to the Zen server. |
password |
Optional[str] |
The password to use to connect to the Zen server. |
verify_ssl |
Union[bool, str] |
Either a boolean, in which case it controls whether we verify the server's TLS certificate, or a string, in which case it must be a path to a CA bundle to use or the CA bundle value itself. |
http_timeout |
int |
The timeout to use for all requests. |
Source code in zenml/zen_stores/rest_zen_store.py
class RestZenStoreConfiguration(StoreConfiguration):
"""REST ZenML store configuration.
Attributes:
username: The username to use to connect to the Zen server.
password: The password to use to connect to the Zen server.
verify_ssl: Either a boolean, in which case it controls whether we
verify the server's TLS certificate, or a string, in which case it
must be a path to a CA bundle to use or the CA bundle value itself.
http_timeout: The timeout to use for all requests.
"""
type: StoreType = StoreType.REST
username: Optional[str] = None
password: Optional[str] = None
api_token: Optional[str] = None
verify_ssl: Union[bool, str] = True
http_timeout: int = DEFAULT_HTTP_TIMEOUT
@root_validator
def validate_credentials(cls, values: Dict[str, Any]) -> Dict[str, Any]:
"""Validates the credentials provided in the values dictionary.
Args:
values: A dictionary containing the values to be validated.
Raises:
ValueError: If neither api_token nor username is set.
Returns:
The values dictionary.
"""
# Check if the values dictionary contains either an api_token or a
# username as non-empty strings.
if values.get("api_token") or values.get("username"):
return values
else:
raise ValueError(
"Neither api_token nor username is set in the store config."
)
@validator("url")
def validate_url(cls, url: str) -> str:
"""Validates that the URL is a well-formed REST store URL.
Args:
url: The URL to be validated.
Returns:
The validated URL without trailing slashes.
Raises:
ValueError: If the URL is not a well-formed REST store URL.
"""
url = url.rstrip("/")
scheme = re.search("^([a-z0-9]+://)", url)
if scheme is None or scheme.group() not in ("https://", "http://"):
raise ValueError(
"Invalid URL for REST store: {url}. Should be in the form "
"https://hostname[:port] or http://hostname[:port]."
)
# When running inside a container, if the URL uses localhost, the
# target service will not be available. We try to replace localhost
# with one of the special Docker or K3D internal hostnames.
url = replace_localhost_with_internal_hostname(url)
return url
@validator("verify_ssl")
def validate_verify_ssl(
cls, verify_ssl: Union[bool, str]
) -> Union[bool, str]:
"""Validates that the verify_ssl either points to a file or is a bool.
Args:
verify_ssl: The verify_ssl value to be validated.
Returns:
The validated verify_ssl value.
"""
secret_folder = Path(
GlobalConfiguration().local_stores_path,
"certificates",
)
if isinstance(verify_ssl, bool) or verify_ssl.startswith(
str(secret_folder)
):
return verify_ssl
if os.path.isfile(verify_ssl):
with open(verify_ssl, "r") as f:
verify_ssl = f.read()
fileio.makedirs(str(secret_folder))
file_path = Path(secret_folder, "ca_bundle.pem")
with open(file_path, "w") as f:
f.write(verify_ssl)
file_path.chmod(0o600)
verify_ssl = str(file_path)
return verify_ssl
@classmethod
def supports_url_scheme(cls, url: str) -> bool:
"""Check if a URL scheme is supported by this store.
Args:
url: The URL to check.
Returns:
True if the URL scheme is supported, False otherwise.
"""
return urlparse(url).scheme in ("http", "https")
def expand_certificates(self) -> None:
"""Expands the certificates in the verify_ssl field."""
# Load the certificate values back into the configuration
if isinstance(self.verify_ssl, str) and os.path.isfile(
self.verify_ssl
):
with open(self.verify_ssl, "r") as f:
self.verify_ssl = f.read()
@classmethod
def copy_configuration(
cls,
config: "StoreConfiguration",
config_path: str,
load_config_path: Optional[PurePath] = None,
) -> "StoreConfiguration":
"""Create a copy of the store config using a different path.
This method is used to create a copy of the store configuration that can
be loaded using a different configuration path or in the context of a
new environment, such as a container image.
The configuration files accompanying the store configuration are also
copied to the new configuration path (e.g. certificates etc.).
Args:
config: The store configuration to copy.
config_path: new path where the configuration copy will be loaded
from.
load_config_path: absolute path that will be used to load the copied
configuration. This can be set to a value different from
`config_path` if the configuration copy will be loaded from
a different environment, e.g. when the configuration is copied
to a container image and loaded using a different absolute path.
This will be reflected in the paths and URLs encoded in the
copied configuration.
Returns:
A new store configuration object that reflects the new configuration
path.
"""
assert isinstance(config, RestZenStoreConfiguration)
assert config.api_token is not None
config = config.copy(exclude={"username", "password"}, deep=True)
# Load the certificate values back into the configuration
config.expand_certificates()
return config
class Config:
"""Pydantic configuration class."""
# Don't validate attributes when assigning them. This is necessary
# because the `verify_ssl` attribute can be expanded to the contents
# of the certificate file.
validate_assignment = False
# Forbid extra attributes set in the class.
extra = "forbid"
Config
Pydantic configuration class.
Source code in zenml/zen_stores/rest_zen_store.py
class Config:
"""Pydantic configuration class."""
# Don't validate attributes when assigning them. This is necessary
# because the `verify_ssl` attribute can be expanded to the contents
# of the certificate file.
validate_assignment = False
# Forbid extra attributes set in the class.
extra = "forbid"
copy_configuration(config, config_path, load_config_path=None)
classmethod
Create a copy of the store config using a different path.
This method is used to create a copy of the store configuration that can be loaded using a different configuration path or in the context of a new environment, such as a container image.
The configuration files accompanying the store configuration are also copied to the new configuration path (e.g. certificates etc.).
Parameters:
Name | Type | Description | Default |
---|---|---|---|
config |
StoreConfiguration |
The store configuration to copy. |
required |
config_path |
str |
new path where the configuration copy will be loaded from. |
required |
load_config_path |
Optional[pathlib.PurePath] |
absolute path that will be used to load the copied
configuration. This can be set to a value different from
|
None |
Returns:
Type | Description |
---|---|
StoreConfiguration |
A new store configuration object that reflects the new configuration path. |
Source code in zenml/zen_stores/rest_zen_store.py
@classmethod
def copy_configuration(
cls,
config: "StoreConfiguration",
config_path: str,
load_config_path: Optional[PurePath] = None,
) -> "StoreConfiguration":
"""Create a copy of the store config using a different path.
This method is used to create a copy of the store configuration that can
be loaded using a different configuration path or in the context of a
new environment, such as a container image.
The configuration files accompanying the store configuration are also
copied to the new configuration path (e.g. certificates etc.).
Args:
config: The store configuration to copy.
config_path: new path where the configuration copy will be loaded
from.
load_config_path: absolute path that will be used to load the copied
configuration. This can be set to a value different from
`config_path` if the configuration copy will be loaded from
a different environment, e.g. when the configuration is copied
to a container image and loaded using a different absolute path.
This will be reflected in the paths and URLs encoded in the
copied configuration.
Returns:
A new store configuration object that reflects the new configuration
path.
"""
assert isinstance(config, RestZenStoreConfiguration)
assert config.api_token is not None
config = config.copy(exclude={"username", "password"}, deep=True)
# Load the certificate values back into the configuration
config.expand_certificates()
return config
expand_certificates(self)
Expands the certificates in the verify_ssl field.
Source code in zenml/zen_stores/rest_zen_store.py
def expand_certificates(self) -> None:
"""Expands the certificates in the verify_ssl field."""
# Load the certificate values back into the configuration
if isinstance(self.verify_ssl, str) and os.path.isfile(
self.verify_ssl
):
with open(self.verify_ssl, "r") as f:
self.verify_ssl = f.read()
supports_url_scheme(url)
classmethod
Check if a URL scheme is supported by this store.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
url |
str |
The URL to check. |
required |
Returns:
Type | Description |
---|---|
bool |
True if the URL scheme is supported, False otherwise. |
Source code in zenml/zen_stores/rest_zen_store.py
@classmethod
def supports_url_scheme(cls, url: str) -> bool:
"""Check if a URL scheme is supported by this store.
Args:
url: The URL to check.
Returns:
True if the URL scheme is supported, False otherwise.
"""
return urlparse(url).scheme in ("http", "https")
validate_credentials(values)
classmethod
Validates the credentials provided in the values dictionary.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
values |
Dict[str, Any] |
A dictionary containing the values to be validated. |
required |
Exceptions:
Type | Description |
---|---|
ValueError |
If neither api_token nor username is set. |
Returns:
Type | Description |
---|---|
Dict[str, Any] |
The values dictionary. |
Source code in zenml/zen_stores/rest_zen_store.py
@root_validator
def validate_credentials(cls, values: Dict[str, Any]) -> Dict[str, Any]:
"""Validates the credentials provided in the values dictionary.
Args:
values: A dictionary containing the values to be validated.
Raises:
ValueError: If neither api_token nor username is set.
Returns:
The values dictionary.
"""
# Check if the values dictionary contains either an api_token or a
# username as non-empty strings.
if values.get("api_token") or values.get("username"):
return values
else:
raise ValueError(
"Neither api_token nor username is set in the store config."
)
validate_url(url)
classmethod
Validates that the URL is a well-formed REST store URL.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
url |
str |
The URL to be validated. |
required |
Returns:
Type | Description |
---|---|
str |
The validated URL without trailing slashes. |
Exceptions:
Type | Description |
---|---|
ValueError |
If the URL is not a well-formed REST store URL. |
Source code in zenml/zen_stores/rest_zen_store.py
@validator("url")
def validate_url(cls, url: str) -> str:
"""Validates that the URL is a well-formed REST store URL.
Args:
url: The URL to be validated.
Returns:
The validated URL without trailing slashes.
Raises:
ValueError: If the URL is not a well-formed REST store URL.
"""
url = url.rstrip("/")
scheme = re.search("^([a-z0-9]+://)", url)
if scheme is None or scheme.group() not in ("https://", "http://"):
raise ValueError(
"Invalid URL for REST store: {url}. Should be in the form "
"https://hostname[:port] or http://hostname[:port]."
)
# When running inside a container, if the URL uses localhost, the
# target service will not be available. We try to replace localhost
# with one of the special Docker or K3D internal hostnames.
url = replace_localhost_with_internal_hostname(url)
return url
validate_verify_ssl(verify_ssl)
classmethod
Validates that the verify_ssl either points to a file or is a bool.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
verify_ssl |
Union[bool, str] |
The verify_ssl value to be validated. |
required |
Returns:
Type | Description |
---|---|
Union[bool, str] |
The validated verify_ssl value. |
Source code in zenml/zen_stores/rest_zen_store.py
@validator("verify_ssl")
def validate_verify_ssl(
cls, verify_ssl: Union[bool, str]
) -> Union[bool, str]:
"""Validates that the verify_ssl either points to a file or is a bool.
Args:
verify_ssl: The verify_ssl value to be validated.
Returns:
The validated verify_ssl value.
"""
secret_folder = Path(
GlobalConfiguration().local_stores_path,
"certificates",
)
if isinstance(verify_ssl, bool) or verify_ssl.startswith(
str(secret_folder)
):
return verify_ssl
if os.path.isfile(verify_ssl):
with open(verify_ssl, "r") as f:
verify_ssl = f.read()
fileio.makedirs(str(secret_folder))
file_path = Path(secret_folder, "ca_bundle.pem")
with open(file_path, "w") as f:
f.write(verify_ssl)
file_path.chmod(0o600)
verify_ssl = str(file_path)
return verify_ssl
schemas
special
SQL Model Implementations.
artifact_schemas
SQLModel implementation of artifact tables.
ArtifactSchema (NamedSchema)
pydantic-model
SQL Model for artifacts of steps.
Source code in zenml/zen_stores/schemas/artifact_schemas.py
class ArtifactSchema(NamedSchema, table=True):
"""SQL Model for artifacts of steps."""
__tablename__ = "artifact"
artifact_store_id: Optional[UUID] = build_foreign_key_field(
source=__tablename__,
target=StackComponentSchema.__tablename__,
source_column="artifact_store_id",
target_column="id",
ondelete="SET NULL",
nullable=True,
)
user_id: Optional[UUID] = build_foreign_key_field(
source=__tablename__,
target=UserSchema.__tablename__,
source_column="user_id",
target_column="id",
ondelete="SET NULL",
nullable=True,
)
user: Optional["UserSchema"] = Relationship(back_populates="artifacts")
project_id: UUID = build_foreign_key_field(
source=__tablename__,
target=ProjectSchema.__tablename__,
source_column="project_id",
target_column="id",
ondelete="CASCADE",
nullable=False,
)
project: "ProjectSchema" = Relationship(back_populates="artifacts")
type: ArtifactType
uri: str
materializer: str
data_type: str
input_to_step_runs: List["StepRunInputArtifactSchema"] = Relationship(
back_populates="artifact",
sa_relationship_kwargs={"cascade": "delete"},
)
output_of_step_runs: List["StepRunOutputArtifactSchema"] = Relationship(
back_populates="artifact",
sa_relationship_kwargs={"cascade": "delete"},
)
@classmethod
def from_request(
cls, artifact_request: ArtifactRequestModel
) -> "ArtifactSchema":
"""Convert an `ArtifactRequestModel` to an `ArtifactSchema`.
Args:
artifact_request: The request model to convert.
Returns:
The converted schema.
"""
return cls(
name=artifact_request.name,
artifact_store_id=artifact_request.artifact_store_id,
project_id=artifact_request.project,
user_id=artifact_request.user,
type=artifact_request.type,
uri=artifact_request.uri,
materializer=artifact_request.materializer,
data_type=artifact_request.data_type,
)
def to_model(
self, producer_step_run_id: Optional[UUID]
) -> ArtifactResponseModel:
"""Convert an `ArtifactSchema` to an `ArtifactModel`.
Args:
producer_step_run_id: The ID of the step run that produced this
artifact.
Returns:
The created `ArtifactModel`.
"""
return ArtifactResponseModel(
id=self.id,
name=self.name,
artifact_store_id=self.artifact_store_id,
user=self.user.to_model() if self.user else None,
project=self.project.to_model(),
type=self.type,
uri=self.uri,
materializer=self.materializer,
data_type=self.data_type,
created=self.created,
updated=self.updated,
producer_step_run_id=producer_step_run_id,
)
from_request(artifact_request)
classmethod
Convert an ArtifactRequestModel
to an ArtifactSchema
.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
artifact_request |
ArtifactRequestModel |
The request model to convert. |
required |
Returns:
Type | Description |
---|---|
ArtifactSchema |
The converted schema. |
Source code in zenml/zen_stores/schemas/artifact_schemas.py
@classmethod
def from_request(
cls, artifact_request: ArtifactRequestModel
) -> "ArtifactSchema":
"""Convert an `ArtifactRequestModel` to an `ArtifactSchema`.
Args:
artifact_request: The request model to convert.
Returns:
The converted schema.
"""
return cls(
name=artifact_request.name,
artifact_store_id=artifact_request.artifact_store_id,
project_id=artifact_request.project,
user_id=artifact_request.user,
type=artifact_request.type,
uri=artifact_request.uri,
materializer=artifact_request.materializer,
data_type=artifact_request.data_type,
)
to_model(self, producer_step_run_id)
Convert an ArtifactSchema
to an ArtifactModel
.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
producer_step_run_id |
Optional[uuid.UUID] |
The ID of the step run that produced this artifact. |
required |
Returns:
Type | Description |
---|---|
ArtifactResponseModel |
The created |
Source code in zenml/zen_stores/schemas/artifact_schemas.py
def to_model(
self, producer_step_run_id: Optional[UUID]
) -> ArtifactResponseModel:
"""Convert an `ArtifactSchema` to an `ArtifactModel`.
Args:
producer_step_run_id: The ID of the step run that produced this
artifact.
Returns:
The created `ArtifactModel`.
"""
return ArtifactResponseModel(
id=self.id,
name=self.name,
artifact_store_id=self.artifact_store_id,
user=self.user.to_model() if self.user else None,
project=self.project.to_model(),
type=self.type,
uri=self.uri,
materializer=self.materializer,
data_type=self.data_type,
created=self.created,
updated=self.updated,
producer_step_run_id=producer_step_run_id,
)
base_schemas
Base classes for SQLModel schemas.
BaseSchema (SQLModel)
pydantic-model
Base SQL Model for ZenML entities.
Source code in zenml/zen_stores/schemas/base_schemas.py
class BaseSchema(SQLModel):
"""Base SQL Model for ZenML entities."""
id: UUID = Field(default_factory=uuid4, primary_key=True)
created: datetime = Field(default_factory=datetime.utcnow)
updated: datetime = Field(default_factory=datetime.utcnow)
NamedSchema (BaseSchema)
pydantic-model
Base Named SQL Model.
Source code in zenml/zen_stores/schemas/base_schemas.py
class NamedSchema(BaseSchema):
"""Base Named SQL Model."""
name: str
ShareableSchema (NamedSchema)
pydantic-model
Base shareable SQL Model.
Source code in zenml/zen_stores/schemas/base_schemas.py
class ShareableSchema(NamedSchema):
"""Base shareable SQL Model."""
is_shared: bool
component_schemas
SQL Model Implementations for Stack Components.
StackComponentSchema (ShareableSchema)
pydantic-model
SQL Model for stack components.
Source code in zenml/zen_stores/schemas/component_schemas.py
class StackComponentSchema(ShareableSchema, table=True):
"""SQL Model for stack components."""
__tablename__ = "stack_component"
type: StackComponentType
flavor: str
configuration: bytes
project_id: UUID = build_foreign_key_field(
source=__tablename__,
target=ProjectSchema.__tablename__,
source_column="project_id",
target_column="id",
ondelete="CASCADE",
nullable=False,
)
project: "ProjectSchema" = Relationship(back_populates="components")
user_id: Optional[UUID] = build_foreign_key_field(
source=__tablename__,
target=UserSchema.__tablename__,
source_column="user_id",
target_column="id",
ondelete="SET NULL",
nullable=True,
)
user: Optional["UserSchema"] = Relationship(back_populates="components")
stacks: List["StackSchema"] = Relationship(
back_populates="components", link_model=StackCompositionSchema
)
schedules: List["ScheduleSchema"] = Relationship(
back_populates="orchestrator",
)
def update(
self, component_update: ComponentUpdateModel
) -> "StackComponentSchema":
"""Updates a `StackSchema` from a `ComponentUpdateModel`.
Args:
component_update: The `ComponentUpdateModel` to update from.
Returns:
The updated `StackComponentSchema`.
"""
for field, value in component_update.dict(
exclude_unset=True, exclude={"project", "user"}
).items():
if field == "configuration":
self.configuration = base64.b64encode(
json.dumps(component_update.configuration).encode("utf-8")
)
else:
setattr(self, field, value)
self.updated = datetime.utcnow()
return self
def to_model(
self,
) -> "ComponentResponseModel":
"""Creates a `ComponentModel` from an instance of a `StackSchema`.
Returns:
A `ComponentModel`
"""
return ComponentResponseModel(
id=self.id,
name=self.name,
type=self.type,
flavor=self.flavor,
user=self.user.to_model(True) if self.user else None,
project=self.project.to_model(),
is_shared=self.is_shared,
configuration=json.loads(
base64.b64decode(self.configuration).decode()
),
created=self.created,
updated=self.updated,
)
to_model(self)
Creates a ComponentModel
from an instance of a StackSchema
.
Returns:
Type | Description |
---|---|
ComponentResponseModel |
A |
Source code in zenml/zen_stores/schemas/component_schemas.py
def to_model(
self,
) -> "ComponentResponseModel":
"""Creates a `ComponentModel` from an instance of a `StackSchema`.
Returns:
A `ComponentModel`
"""
return ComponentResponseModel(
id=self.id,
name=self.name,
type=self.type,
flavor=self.flavor,
user=self.user.to_model(True) if self.user else None,
project=self.project.to_model(),
is_shared=self.is_shared,
configuration=json.loads(
base64.b64decode(self.configuration).decode()
),
created=self.created,
updated=self.updated,
)
update(self, component_update)
Updates a StackSchema
from a ComponentUpdateModel
.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
component_update |
ComponentUpdateModel |
The |
required |
Returns:
Type | Description |
---|---|
StackComponentSchema |
The updated |
Source code in zenml/zen_stores/schemas/component_schemas.py
def update(
self, component_update: ComponentUpdateModel
) -> "StackComponentSchema":
"""Updates a `StackSchema` from a `ComponentUpdateModel`.
Args:
component_update: The `ComponentUpdateModel` to update from.
Returns:
The updated `StackComponentSchema`.
"""
for field, value in component_update.dict(
exclude_unset=True, exclude={"project", "user"}
).items():
if field == "configuration":
self.configuration = base64.b64encode(
json.dumps(component_update.configuration).encode("utf-8")
)
else:
setattr(self, field, value)
self.updated = datetime.utcnow()
return self
flavor_schemas
SQL Model Implementations for Flavors.
FlavorSchema (NamedSchema)
pydantic-model
SQL Model for flavors.
Attributes:
Name | Type | Description |
---|---|---|
type |
StackComponentType |
The type of the flavor. |
source |
str |
The source of the flavor. |
config_schema |
str |
The config schema of the flavor. |
integration |
Optional[str] |
The integration associated with the flavor. |
Source code in zenml/zen_stores/schemas/flavor_schemas.py
class FlavorSchema(NamedSchema, table=True):
"""SQL Model for flavors.
Attributes:
type: The type of the flavor.
source: The source of the flavor.
config_schema: The config schema of the flavor.
integration: The integration associated with the flavor.
"""
__tablename__ = "flavor"
type: StackComponentType
source: str
config_schema: str = Field(sa_column=Column(TEXT, nullable=False))
integration: Optional[str] = Field(default="")
project_id: UUID = build_foreign_key_field(
source=__tablename__,
target=ProjectSchema.__tablename__,
source_column="project_id",
target_column="id",
ondelete="CASCADE",
nullable=False,
)
project: "ProjectSchema" = Relationship(back_populates="flavors")
user_id: Optional[UUID] = build_foreign_key_field(
source=__tablename__,
target=UserSchema.__tablename__,
source_column="user_id",
target_column="id",
ondelete="SET NULL",
nullable=True,
)
user: Optional["UserSchema"] = Relationship(back_populates="flavors")
def to_model(self) -> FlavorResponseModel:
"""Converts a flavor schema to a flavor model.
Returns:
The flavor model.
"""
return FlavorResponseModel(
id=self.id,
name=self.name,
type=self.type,
source=self.source,
config_schema=self.config_schema,
integration=self.integration,
user=self.user.to_model() if self.user else None,
project=self.project.to_model(),
created=self.created,
updated=self.updated,
)
to_model(self)
Converts a flavor schema to a flavor model.
Returns:
Type | Description |
---|---|
FlavorResponseModel |
The flavor model. |
Source code in zenml/zen_stores/schemas/flavor_schemas.py
def to_model(self) -> FlavorResponseModel:
"""Converts a flavor schema to a flavor model.
Returns:
The flavor model.
"""
return FlavorResponseModel(
id=self.id,
name=self.name,
type=self.type,
source=self.source,
config_schema=self.config_schema,
integration=self.integration,
user=self.user.to_model() if self.user else None,
project=self.project.to_model(),
created=self.created,
updated=self.updated,
)
identity_schemas
SQLModel implementation for the server information table.
IdentitySchema (SQLModel)
pydantic-model
SQL Model for the client/server identity.
Source code in zenml/zen_stores/schemas/identity_schemas.py
class IdentitySchema(SQLModel, table=True):
"""SQL Model for the client/server identity."""
__tablename__ = "identity"
id: UUID = Field(primary_key=True)
pipeline_run_schemas
SQLModel implementation of pipeline run tables.
PipelineRunSchema (NamedSchema)
pydantic-model
SQL Model for pipeline runs.
Source code in zenml/zen_stores/schemas/pipeline_run_schemas.py
class PipelineRunSchema(NamedSchema, table=True):
"""SQL Model for pipeline runs."""
__tablename__ = "pipeline_run"
stack_id: Optional[UUID] = build_foreign_key_field(
source=__tablename__,
target=StackSchema.__tablename__,
source_column="stack_id",
target_column="id",
ondelete="SET NULL",
nullable=True,
)
stack: "StackSchema" = Relationship(back_populates="runs")
pipeline_id: Optional[UUID] = build_foreign_key_field(
source=__tablename__,
target=PipelineSchema.__tablename__,
source_column="pipeline_id",
target_column="id",
ondelete="SET NULL",
nullable=True,
)
pipeline: "PipelineSchema" = Relationship(back_populates="runs")
schedule_id: Optional[UUID] = build_foreign_key_field(
source=__tablename__,
target=ScheduleSchema.__tablename__,
source_column="schedule_id",
target_column="id",
ondelete="SET NULL",
nullable=True,
)
schedule: ScheduleSchema = Relationship(back_populates="runs")
user_id: Optional[UUID] = build_foreign_key_field(
source=__tablename__,
target=UserSchema.__tablename__,
source_column="user_id",
target_column="id",
ondelete="SET NULL",
nullable=True,
)
user: Optional["UserSchema"] = Relationship(back_populates="runs")
project_id: UUID = build_foreign_key_field(
source=__tablename__,
target=ProjectSchema.__tablename__,
source_column="project_id",
target_column="id",
ondelete="CASCADE",
nullable=False,
)
project: "ProjectSchema" = Relationship(back_populates="runs")
orchestrator_run_id: Optional[str] = Field(nullable=True)
enable_cache: Optional[bool] = Field(nullable=True)
start_time: Optional[datetime] = Field(nullable=True)
end_time: Optional[datetime] = Field(nullable=True)
status: ExecutionStatus
pipeline_configuration: str = Field(sa_column=Column(TEXT, nullable=False))
num_steps: Optional[int]
zenml_version: str
client_environment: Optional[str] = Field(
sa_column=Column(TEXT, nullable=True)
)
orchestrator_environment: Optional[str] = Field(
sa_column=Column(TEXT, nullable=True)
)
git_sha: Optional[str] = Field(nullable=True)
step_runs: List["StepRunSchema"] = Relationship(
back_populates="pipeline_run",
sa_relationship_kwargs={"cascade": "delete"},
)
@classmethod
def from_request(
cls, request: PipelineRunRequestModel
) -> "PipelineRunSchema":
"""Convert a `PipelineRunRequestModel` to a `PipelineRunSchema`.
Args:
request: The request to convert.
Returns:
The created `PipelineRunSchema`.
"""
configuration = json.dumps(request.pipeline_configuration)
client_environment = json.dumps(request.client_environment)
orchestrator_environment = json.dumps(request.orchestrator_environment)
return cls(
id=request.id,
name=request.name,
orchestrator_run_id=request.orchestrator_run_id,
stack_id=request.stack,
project_id=request.project,
user_id=request.user,
pipeline_id=request.pipeline,
schedule_id=request.schedule_id,
enable_cache=request.enable_cache,
start_time=request.start_time,
status=request.status,
pipeline_configuration=configuration,
num_steps=request.num_steps,
git_sha=request.git_sha,
zenml_version=request.zenml_version,
client_environment=client_environment,
orchestrator_environment=orchestrator_environment,
)
def to_model(
self, _block_recursion: bool = False
) -> PipelineRunResponseModel:
"""Convert a `PipelineRunSchema` to a `PipelineRunResponseModel`.
Args:
_block_recursion: If other models should be recursively filled
Returns:
The created `PipelineRunResponseModel`.
"""
client_environment = (
json.loads(self.client_environment)
if self.client_environment
else {}
)
orchestrator_environment = (
json.loads(self.orchestrator_environment)
if self.orchestrator_environment
else {}
)
if _block_recursion:
return PipelineRunResponseModel(
id=self.id,
name=self.name,
project=self.project.to_model(),
user=self.user.to_model(True) if self.user else None,
schedule_id=self.schedule_id,
orchestrator_run_id=self.orchestrator_run_id,
enable_cache=self.enable_cache,
start_time=self.start_time,
end_time=self.end_time,
status=self.status,
pipeline_configuration=json.loads(self.pipeline_configuration),
num_steps=self.num_steps,
git_sha=self.git_sha,
zenml_version=self.zenml_version,
client_environment=client_environment,
orchestrator_environment=orchestrator_environment,
created=self.created,
updated=self.updated,
)
else:
return PipelineRunResponseModel(
id=self.id,
name=self.name,
stack=self.stack.to_model() if self.stack else None,
project=self.project.to_model(),
user=self.user.to_model(True) if self.user else None,
orchestrator_run_id=self.orchestrator_run_id,
enable_cache=self.enable_cache,
start_time=self.start_time,
end_time=self.end_time,
status=self.status,
pipeline=(
self.pipeline.to_model(False) if self.pipeline else None
),
schedule_id=self.schedule_id,
pipeline_configuration=json.loads(self.pipeline_configuration),
num_steps=self.num_steps,
git_sha=self.git_sha,
zenml_version=self.zenml_version,
client_environment=client_environment,
orchestrator_environment=orchestrator_environment,
created=self.created,
updated=self.updated,
)
def update(
self, run_update: "PipelineRunUpdateModel"
) -> "PipelineRunSchema":
"""Update a `PipelineRunSchema` with a `PipelineRunUpdateModel`.
Args:
run_update: The `PipelineRunUpdateModel` to update with.
Returns:
The updated `PipelineRunSchema`.
"""
if run_update.status:
self.status = run_update.status
self.end_time = run_update.end_time
self.updated = datetime.utcnow()
return self
from_request(request)
classmethod
Convert a PipelineRunRequestModel
to a PipelineRunSchema
.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
request |
PipelineRunRequestModel |
The request to convert. |
required |
Returns:
Type | Description |
---|---|
PipelineRunSchema |
The created |
Source code in zenml/zen_stores/schemas/pipeline_run_schemas.py
@classmethod
def from_request(
cls, request: PipelineRunRequestModel
) -> "PipelineRunSchema":
"""Convert a `PipelineRunRequestModel` to a `PipelineRunSchema`.
Args:
request: The request to convert.
Returns:
The created `PipelineRunSchema`.
"""
configuration = json.dumps(request.pipeline_configuration)
client_environment = json.dumps(request.client_environment)
orchestrator_environment = json.dumps(request.orchestrator_environment)
return cls(
id=request.id,
name=request.name,
orchestrator_run_id=request.orchestrator_run_id,
stack_id=request.stack,
project_id=request.project,
user_id=request.user,
pipeline_id=request.pipeline,
schedule_id=request.schedule_id,
enable_cache=request.enable_cache,
start_time=request.start_time,
status=request.status,
pipeline_configuration=configuration,
num_steps=request.num_steps,
git_sha=request.git_sha,
zenml_version=request.zenml_version,
client_environment=client_environment,
orchestrator_environment=orchestrator_environment,
)
to_model(self, _block_recursion=False)
Convert a PipelineRunSchema
to a PipelineRunResponseModel
.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
_block_recursion |
bool |
If other models should be recursively filled |
False |
Returns:
Type | Description |
---|---|
PipelineRunResponseModel |
The created |
Source code in zenml/zen_stores/schemas/pipeline_run_schemas.py
def to_model(
self, _block_recursion: bool = False
) -> PipelineRunResponseModel:
"""Convert a `PipelineRunSchema` to a `PipelineRunResponseModel`.
Args:
_block_recursion: If other models should be recursively filled
Returns:
The created `PipelineRunResponseModel`.
"""
client_environment = (
json.loads(self.client_environment)
if self.client_environment
else {}
)
orchestrator_environment = (
json.loads(self.orchestrator_environment)
if self.orchestrator_environment
else {}
)
if _block_recursion:
return PipelineRunResponseModel(
id=self.id,
name=self.name,
project=self.project.to_model(),
user=self.user.to_model(True) if self.user else None,
schedule_id=self.schedule_id,
orchestrator_run_id=self.orchestrator_run_id,
enable_cache=self.enable_cache,
start_time=self.start_time,
end_time=self.end_time,
status=self.status,
pipeline_configuration=json.loads(self.pipeline_configuration),
num_steps=self.num_steps,
git_sha=self.git_sha,
zenml_version=self.zenml_version,
client_environment=client_environment,
orchestrator_environment=orchestrator_environment,
created=self.created,
updated=self.updated,
)
else:
return PipelineRunResponseModel(
id=self.id,
name=self.name,
stack=self.stack.to_model() if self.stack else None,
project=self.project.to_model(),
user=self.user.to_model(True) if self.user else None,
orchestrator_run_id=self.orchestrator_run_id,
enable_cache=self.enable_cache,
start_time=self.start_time,
end_time=self.end_time,
status=self.status,
pipeline=(
self.pipeline.to_model(False) if self.pipeline else None
),
schedule_id=self.schedule_id,
pipeline_configuration=json.loads(self.pipeline_configuration),
num_steps=self.num_steps,
git_sha=self.git_sha,
zenml_version=self.zenml_version,
client_environment=client_environment,
orchestrator_environment=orchestrator_environment,
created=self.created,
updated=self.updated,
)
update(self, run_update)
Update a PipelineRunSchema
with a PipelineRunUpdateModel
.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
run_update |
PipelineRunUpdateModel |
The |
required |
Returns:
Type | Description |
---|---|
PipelineRunSchema |
The updated |
Source code in zenml/zen_stores/schemas/pipeline_run_schemas.py
def update(
self, run_update: "PipelineRunUpdateModel"
) -> "PipelineRunSchema":
"""Update a `PipelineRunSchema` with a `PipelineRunUpdateModel`.
Args:
run_update: The `PipelineRunUpdateModel` to update with.
Returns:
The updated `PipelineRunSchema`.
"""
if run_update.status:
self.status = run_update.status
self.end_time = run_update.end_time
self.updated = datetime.utcnow()
return self
pipeline_schemas
SQL Model Implementations for Pipelines and Pipeline Runs.
PipelineSchema (NamedSchema)
pydantic-model
SQL Model for pipelines.
Source code in zenml/zen_stores/schemas/pipeline_schemas.py
class PipelineSchema(NamedSchema, table=True):
"""SQL Model for pipelines."""
__tablename__ = "pipeline"
docstring: Optional[str] = Field(sa_column=Column(TEXT, nullable=True))
spec: str = Field(sa_column=Column(TEXT, nullable=False))
project_id: UUID = build_foreign_key_field(
source=__tablename__,
target=ProjectSchema.__tablename__,
source_column="project_id",
target_column="id",
ondelete="CASCADE",
nullable=False,
)
project: "ProjectSchema" = Relationship(back_populates="pipelines")
user_id: Optional[UUID] = build_foreign_key_field(
source=__tablename__,
target=UserSchema.__tablename__,
source_column="user_id",
target_column="id",
ondelete="SET NULL",
nullable=True,
)
user: Optional["UserSchema"] = Relationship(back_populates="pipelines")
schedules: List["ScheduleSchema"] = Relationship(
back_populates="pipeline",
)
runs: List["PipelineRunSchema"] = Relationship(
back_populates="pipeline", sa_relationship_kwargs={"cascade": "delete"}
)
def to_model(
self,
_block_recursion: bool = False,
last_x_runs: int = 3,
) -> "PipelineResponseModel":
"""Convert a `PipelineSchema` to a `PipelineModel`.
Args:
_block_recursion: Don't recursively fill attributes
last_x_runs: How many runs to use for the execution status
Returns:
The created PipelineModel.
"""
x_runs = self.runs[:last_x_runs]
status_last_x_runs = []
for run in x_runs:
status_last_x_runs.append(run.status)
if _block_recursion:
return PipelineResponseModel(
id=self.id,
name=self.name,
project=self.project.to_model(),
user=self.user.to_model(True) if self.user else None,
docstring=self.docstring,
spec=PipelineSpec.parse_raw(self.spec),
created=self.created,
updated=self.updated,
)
else:
return PipelineResponseModel(
id=self.id,
name=self.name,
project=self.project.to_model(),
user=self.user.to_model(True) if self.user else None,
runs=[r.to_model(_block_recursion=True) for r in x_runs],
docstring=self.docstring,
spec=PipelineSpec.parse_raw(self.spec),
created=self.created,
updated=self.updated,
status=status_last_x_runs,
)
def update(
self, pipeline_update: "PipelineUpdateModel"
) -> "PipelineSchema":
"""Update a `PipelineSchema` with a `PipelineUpdateModel`.
Args:
pipeline_update: The update model.
Returns:
The updated `PipelineSchema`.
"""
if pipeline_update.name:
self.name = pipeline_update.name
if pipeline_update.docstring:
self.docstring = pipeline_update.docstring
if pipeline_update.spec:
self.spec = pipeline_update.spec.json(sort_keys=True)
self.updated = datetime.utcnow()
return self
to_model(self, _block_recursion=False, last_x_runs=3)
Convert a PipelineSchema
to a PipelineModel
.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
_block_recursion |
bool |
Don't recursively fill attributes |
False |
last_x_runs |
int |
How many runs to use for the execution status |
3 |
Returns:
Type | Description |
---|---|
PipelineResponseModel |
The created PipelineModel. |
Source code in zenml/zen_stores/schemas/pipeline_schemas.py
def to_model(
self,
_block_recursion: bool = False,
last_x_runs: int = 3,
) -> "PipelineResponseModel":
"""Convert a `PipelineSchema` to a `PipelineModel`.
Args:
_block_recursion: Don't recursively fill attributes
last_x_runs: How many runs to use for the execution status
Returns:
The created PipelineModel.
"""
x_runs = self.runs[:last_x_runs]
status_last_x_runs = []
for run in x_runs:
status_last_x_runs.append(run.status)
if _block_recursion:
return PipelineResponseModel(
id=self.id,
name=self.name,
project=self.project.to_model(),
user=self.user.to_model(True) if self.user else None,
docstring=self.docstring,
spec=PipelineSpec.parse_raw(self.spec),
created=self.created,
updated=self.updated,
)
else:
return PipelineResponseModel(
id=self.id,
name=self.name,
project=self.project.to_model(),
user=self.user.to_model(True) if self.user else None,
runs=[r.to_model(_block_recursion=True) for r in x_runs],
docstring=self.docstring,
spec=PipelineSpec.parse_raw(self.spec),
created=self.created,
updated=self.updated,
status=status_last_x_runs,
)
update(self, pipeline_update)
Update a PipelineSchema
with a PipelineUpdateModel
.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
pipeline_update |
PipelineUpdateModel |
The update model. |
required |
Returns:
Type | Description |
---|---|
PipelineSchema |
The updated |
Source code in zenml/zen_stores/schemas/pipeline_schemas.py
def update(
self, pipeline_update: "PipelineUpdateModel"
) -> "PipelineSchema":
"""Update a `PipelineSchema` with a `PipelineUpdateModel`.
Args:
pipeline_update: The update model.
Returns:
The updated `PipelineSchema`.
"""
if pipeline_update.name:
self.name = pipeline_update.name
if pipeline_update.docstring:
self.docstring = pipeline_update.docstring
if pipeline_update.spec:
self.spec = pipeline_update.spec.json(sort_keys=True)
self.updated = datetime.utcnow()
return self
project_schemas
SQL Model Implementations for Projects.
ProjectSchema (NamedSchema)
pydantic-model
SQL Model for projects.
Source code in zenml/zen_stores/schemas/project_schemas.py
class ProjectSchema(NamedSchema, table=True):
"""SQL Model for projects."""
__tablename__ = "workspace"
description: str
user_role_assignments: List["UserRoleAssignmentSchema"] = Relationship(
back_populates="project", sa_relationship_kwargs={"cascade": "delete"}
)
team_role_assignments: List["TeamRoleAssignmentSchema"] = Relationship(
back_populates="project",
sa_relationship_kwargs={"cascade": "all, delete"},
)
stacks: List["StackSchema"] = Relationship(
back_populates="project", sa_relationship_kwargs={"cascade": "delete"}
)
components: List["StackComponentSchema"] = Relationship(
back_populates="project", sa_relationship_kwargs={"cascade": "delete"}
)
flavors: List["FlavorSchema"] = Relationship(
back_populates="project", sa_relationship_kwargs={"cascade": "delete"}
)
pipelines: List["PipelineSchema"] = Relationship(
back_populates="project", sa_relationship_kwargs={"cascade": "delete"}
)
schedules: List["ScheduleSchema"] = Relationship(
back_populates="project", sa_relationship_kwargs={"cascade": "delete"}
)
runs: List["PipelineRunSchema"] = Relationship(
back_populates="project", sa_relationship_kwargs={"cascade": "delete"}
)
step_runs: List["StepRunSchema"] = Relationship(
back_populates="project", sa_relationship_kwargs={"cascade": "delete"}
)
artifacts: List["ArtifactSchema"] = Relationship(
back_populates="project", sa_relationship_kwargs={"cascade": "delete"}
)
@classmethod
def from_request(cls, project: ProjectRequestModel) -> "ProjectSchema":
"""Create a `ProjectSchema` from a `ProjectResponseModel`.
Args:
project: The `ProjectResponseModel` from which to create the schema.
Returns:
The created `ProjectSchema`.
"""
return cls(name=project.name, description=project.description)
def update(self, project_update: ProjectUpdateModel) -> "ProjectSchema":
"""Update a `ProjectSchema` from a `ProjectUpdateModel`.
Args:
project_update: The `ProjectUpdateModel` from which to update the
schema.
Returns:
The updated `ProjectSchema`.
"""
for field, value in project_update.dict(exclude_unset=True).items():
setattr(self, field, value)
self.updated = datetime.utcnow()
return self
def to_model(self) -> ProjectResponseModel:
"""Convert a `ProjectSchema` to a `ProjectResponseModel`.
Returns:
The converted `ProjectResponseModel`.
"""
return ProjectResponseModel(
id=self.id,
name=self.name,
description=self.description,
created=self.created,
updated=self.updated,
)
from_request(project)
classmethod
Create a ProjectSchema
from a ProjectResponseModel
.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
project |
ProjectRequestModel |
The |
required |
Returns:
Type | Description |
---|---|
ProjectSchema |
The created |
Source code in zenml/zen_stores/schemas/project_schemas.py
@classmethod
def from_request(cls, project: ProjectRequestModel) -> "ProjectSchema":
"""Create a `ProjectSchema` from a `ProjectResponseModel`.
Args:
project: The `ProjectResponseModel` from which to create the schema.
Returns:
The created `ProjectSchema`.
"""
return cls(name=project.name, description=project.description)
to_model(self)
Convert a ProjectSchema
to a ProjectResponseModel
.
Returns:
Type | Description |
---|---|
ProjectResponseModel |
The converted |
Source code in zenml/zen_stores/schemas/project_schemas.py
def to_model(self) -> ProjectResponseModel:
"""Convert a `ProjectSchema` to a `ProjectResponseModel`.
Returns:
The converted `ProjectResponseModel`.
"""
return ProjectResponseModel(
id=self.id,
name=self.name,
description=self.description,
created=self.created,
updated=self.updated,
)
update(self, project_update)
Update a ProjectSchema
from a ProjectUpdateModel
.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
project_update |
ProjectUpdateModel |
The |
required |
Returns:
Type | Description |
---|---|
ProjectSchema |
The updated |
Source code in zenml/zen_stores/schemas/project_schemas.py
def update(self, project_update: ProjectUpdateModel) -> "ProjectSchema":
"""Update a `ProjectSchema` from a `ProjectUpdateModel`.
Args:
project_update: The `ProjectUpdateModel` from which to update the
schema.
Returns:
The updated `ProjectSchema`.
"""
for field, value in project_update.dict(exclude_unset=True).items():
setattr(self, field, value)
self.updated = datetime.utcnow()
return self
role_schemas
SQLModel implementation of roles that can be assigned to users or teams.
RolePermissionSchema (SQLModel)
pydantic-model
SQL Model for team assignments.
Source code in zenml/zen_stores/schemas/role_schemas.py
class RolePermissionSchema(SQLModel, table=True):
"""SQL Model for team assignments."""
__tablename__ = "role_permission"
name: PermissionType = Field(primary_key=True)
role_id: UUID = build_foreign_key_field(
source=__tablename__,
target=RoleSchema.__tablename__,
source_column="role_id",
target_column="id",
ondelete="CASCADE",
nullable=False,
primary_key=True,
)
roles: List["RoleSchema"] = Relationship(back_populates="permissions")
RoleSchema (NamedSchema)
pydantic-model
SQL Model for roles.
Source code in zenml/zen_stores/schemas/role_schemas.py
class RoleSchema(NamedSchema, table=True):
"""SQL Model for roles."""
__tablename__ = "role"
permissions: