Zen Stores
zenml.zen_stores
special
ZenStores define ways to store ZenML relevant data locally or remotely.
base_zen_store
Base Zen Store implementation.
BaseZenStore (BaseModel, ZenStoreInterface, AnalyticsTrackerMixin)
pydantic-model
Base class for accessing and persisting ZenML core objects.
Attributes:
Name | Type | Description |
---|---|---|
config |
StoreConfiguration |
The configuration of the store. |
track_analytics |
bool |
Only send analytics if set to `True`. |
Source code in zenml/zen_stores/base_zen_store.py
class BaseZenStore(BaseModel, ZenStoreInterface, AnalyticsTrackerMixin):
"""Base class for accessing and persisting ZenML core objects.
Attributes:
config: The configuration of the store.
track_analytics: Only send analytics if set to `True`.
"""
config: StoreConfiguration
track_analytics: bool = True
_active_user: Optional[UserModel] = None
TYPE: ClassVar[StoreType]
CONFIG_TYPE: ClassVar[Type[StoreConfiguration]]
# ---------------------------------
# Initialization and configuration
# ---------------------------------
def __init__(
    self,
    skip_default_registrations: bool = False,
    **kwargs: Any,
) -> None:
    """Build the store and optionally register its default resources.

    Args:
        skip_default_registrations: If `True`, the database initialization
            step (creation of the default resources) is skipped.
        **kwargs: Additional keyword arguments forwarded to the Pydantic
            constructor.

    Raises:
        RuntimeError: If the store-specific initialization fails.
    """
    super().__init__(**kwargs)

    # Any failure of the store-specific setup is reported as a
    # RuntimeError that names the store type and URL.
    try:
        self._initialize()
    except Exception as e:
        raise RuntimeError(
            f"Error initializing {self.type.value} store with URL "
            f"'{self.url}': {str(e)}"
        ) from e

    if skip_default_registrations:
        logger.debug("Skipping database initialization")
        return

    logger.debug("Initializing database")
    self._initialize_database()
@staticmethod
def get_store_class(store_type: StoreType) -> Type["BaseZenStore"]:
    """Look up the store implementation class for a store type.

    Args:
        store_type: The type of the store to get the class for.

    Returns:
        The store class implementing the given store type.

    Raises:
        TypeError: If the store type is unsupported.
    """
    # Each implementation module is imported only when its type is requested.
    if store_type == StoreType.SQL:
        from zenml.zen_stores.sql_zen_store import SqlZenStore

        return SqlZenStore

    if store_type == StoreType.REST:
        from zenml.zen_stores.rest_zen_store import RestZenStore

        return RestZenStore

    raise TypeError(
        f"No store implementation found for store type "
        f"`{store_type.value}`."
    )
@staticmethod
def get_store_config_class(
    store_type: StoreType,
) -> Type["StoreConfiguration"]:
    """Look up the configuration class used by a store type.

    Args:
        store_type: The type of the store to get the config class for.

    Returns:
        The `CONFIG_TYPE` declared by the store class of the given type.
    """
    return BaseZenStore.get_store_class(store_type).CONFIG_TYPE
@staticmethod
def get_store_type(url: str) -> StoreType:
    """Determine which store type handles the scheme of a URL.

    Args:
        url: The store URL.

    Returns:
        The store type whose configuration class supports the URL scheme.

    Raises:
        TypeError: If no store type was found to support the supplied URL.
    """
    from zenml.zen_stores.rest_zen_store import RestZenStoreConfiguration
    from zenml.zen_stores.sql_zen_store import SqlZenStoreConfiguration

    # Candidates are probed in order: the SQL configuration is asked first.
    candidates = (
        (SqlZenStoreConfiguration, StoreType.SQL),
        (RestZenStoreConfiguration, StoreType.REST),
    )
    for config_class, store_type in candidates:
        if config_class.supports_url_scheme(url):
            return store_type

    raise TypeError(f"No store implementation found for URL: {url}.")
@staticmethod
def create_store(
    config: StoreConfiguration,
    skip_default_registrations: bool = False,
    **kwargs: Any,
) -> "BaseZenStore":
    """Instantiate the store class matching a store configuration.

    Args:
        config: The store configuration to use.
        skip_default_registrations: If `True`, the creation of the default
            stack and user in the store will be skipped.
        **kwargs: Additional keyword arguments to pass to the store class.

    Returns:
        The initialized store.
    """
    logger.debug(f"Creating store with config '{config}'...")

    # The concrete implementation is selected by the configured store type.
    implementation = BaseZenStore.get_store_class(config.type)
    return implementation(
        config=config,
        skip_default_registrations=skip_default_registrations,
        **kwargs,
    )
@staticmethod
def get_default_store_config(path: str) -> StoreConfiguration:
    """Build the configuration of the default store.

    The default store is a SQLite store that saves the DB contents on the
    local filesystem.

    Args:
        path: The local path where the store DB will be stored.

    Returns:
        The default store configuration.
    """
    from zenml.zen_stores.sql_zen_store import SqlZenStoreConfiguration

    local_url = SqlZenStoreConfiguration.get_local_url(path)
    return SqlZenStoreConfiguration(type=StoreType.SQL, url=local_url)
def _initialize_database(self) -> None:
    """Initialize the database on first use.

    Ensures that the default project, the admin and guest roles, the
    default user and the default stack of that user in the default project
    exist, creating whichever of them is missing. A lookup that raises
    `KeyError` is treated as "missing".
    """
    try:
        default_project = self._default_project
    except KeyError:
        default_project = self._create_default_project()

    # The role lookups are plain attribute reads rather than `assert`
    # statements: asserts are removed when Python runs with `-O`, which
    # would skip the lookup entirely and leave missing roles uncreated.
    try:
        _ = self._admin_role
    except KeyError:
        self._create_admin_role()

    try:
        _ = self._guest_role
    except KeyError:
        self._create_guest_role()

    try:
        default_user = self._default_user
    except KeyError:
        default_user = self._create_default_user()

    try:
        self._get_default_stack(
            project_name_or_id=default_project.id,
            user_name_or_id=default_user.id,
        )
    except KeyError:
        self._create_default_stack(
            project_name_or_id=default_project.id,
            user_name_or_id=default_user.id,
        )
@property
def url(self) -> str:
    """URL this store is configured with.

    Returns:
        The `url` value of the store configuration.
    """
    store_config = self.config
    return store_config.url
@property
def type(self) -> StoreType:
    """Store type declared by the store class.

    Returns:
        The value of the `TYPE` class variable.
    """
    store_type = self.TYPE
    return store_type
def validate_active_config(
    self,
    active_project_name_or_id: Optional[Union[str, UUID]] = None,
    active_stack_id: Optional[UUID] = None,
    config_name: str = "",
) -> Tuple[ProjectModel, StackModel]:
    """Resolve the active project and stack, falling back to defaults.

    The supplied project and stack are used if they are set and still
    valid. Otherwise the fallback project and the active user's default
    stack in the resolved project are returned in their stead. The default
    stack is created if it does not exist yet.

    Args:
        active_project_name_or_id: The name or ID of the active project.
        active_stack_id: The ID of the active stack.
        config_name: The name of the configuration to validate (used in the
            displayed logs/messages).

    Returns:
        A tuple containing the active project and active stack.
    """
    # Pick the fallback project:
    #   1. the default project, if it is configured;
    #   2. otherwise the first project in the store;
    #   3. otherwise a newly created default project.
    try:
        fallback_project = self._default_project
    except KeyError:
        existing_projects = self.list_projects()
        if len(existing_projects) > 0:
            fallback_project = existing_projects[0]
        else:
            self._create_default_project()
            fallback_project = self._default_project

    # Keep the configured project only if it can still be fetched.
    active_project: ProjectModel = fallback_project
    if not active_project_name_or_id:
        logger.info(
            "Setting the %s active project to '%s'.",
            config_name,
            fallback_project.name,
        )
    else:
        try:
            active_project = self.get_project(active_project_name_or_id)
        except KeyError:
            logger.warning(
                "The current %s active project is no longer available. "
                "Resetting the active project to '%s'.",
                config_name,
                fallback_project.name,
            )

    # Make sure the active user has a default stack in the active project.
    try:
        default_stack = self._get_default_stack(
            project_name_or_id=active_project.id,
            user_name_or_id=self.active_user.id,
        )
    except KeyError:
        default_stack = self._create_default_stack(
            project_name_or_id=active_project.id,
            user_name_or_id=self.active_user.id,
        )

    # Every rejection below falls back to the default stack.
    if not active_stack_id:
        logger.warning(
            "Setting the %s active stack to default.",
            config_name,
        )
        return active_project, default_stack

    active_stack: StackModel
    try:
        active_stack = self.get_stack(stack_id=active_stack_id)
    except KeyError:
        logger.warning(
            "The current %s active stack is no longer available. "
            "Resetting the active stack to default.",
            config_name,
        )
        return active_project, default_stack

    if active_stack.project != active_project.id:
        logger.warning(
            "The current %s active stack is not part of the active "
            "project. Resetting the active stack to default.",
            config_name,
        )
        return active_project, default_stack

    if not active_stack.is_shared and active_stack.user != self.active_user.id:
        logger.warning(
            "The current %s active stack is not shared and not "
            "owned by the active user. "
            "Resetting the active stack to default.",
            config_name,
        )
        return active_project, default_stack

    return active_project, active_stack
def get_store_info(self) -> ServerModel:
    """Describe the store as a server model.

    Returns:
        Information about the store: the global configuration's user ID,
        the ZenML version, the deployment type read from the environment
        (`OTHER` if unset) and the database type `OTHER`.
    """
    server_id = GlobalConfiguration().user_id
    deployment_type = os.environ.get(
        ENV_ZENML_SERVER_DEPLOYMENT_TYPE, ServerDeploymentType.OTHER
    )
    return ServerModel(
        id=server_id,
        version=zenml.__version__,
        deployment_type=deployment_type,
        database_type=ServerDatabaseType.OTHER,
    )
def is_local_store(self) -> bool:
    """Check if the store is a local store or connected to a locally deployed ZenML server.

    Returns:
        True if the store is local, False otherwise.
    """
    store_info = self.get_store_info()
    return store_info.is_local()
# ------
# Stacks
# ------
@track(AnalyticsEvent.REGISTERED_DEFAULT_STACK)
def _create_default_stack(
    self,
    project_name_or_id: Union[str, UUID],
    user_name_or_id: Union[str, UUID],
) -> StackModel:
    """Register the default stack and its components for a user.

    The default stack consists of an orchestrator and an artifact store,
    both of the "local" flavor.

    Args:
        project_name_or_id: Name or ID of the project to which the stack
            belongs.
        user_name_or_id: The name or ID of the user that owns the stack.

    Returns:
        The model of the created default stack.

    Raises:
        StackExistsError: If a default stack already exists for the
            user in the supplied project.
    """
    project = self.get_project(project_name_or_id=project_name_or_id)
    user = self.get_user(user_name_or_id=user_name_or_id)

    # A failed lookup (KeyError) is the expected case here.
    already_registered = True
    try:
        self._get_default_stack(
            project_name_or_id=project_name_or_id,
            user_name_or_id=user_name_or_id,
        )
    except KeyError:
        already_registered = False

    if already_registered:
        raise StackExistsError(
            f"Default stack already exists for user "
            f"{user.name} in project {project.name}"
        )

    logger.info(
        f"Creating default stack for user '{user.name}' in project "
        f"{project.name}..."
    )

    # Register the default orchestrator first, then the default artifact
    # store, and map each component type to the ID of its component.
    components = {}
    for component_type in (
        StackComponentType.ORCHESTRATOR,
        StackComponentType.ARTIFACT_STORE,
    ):
        registered = self.create_stack_component(
            component=ComponentModel(
                user=user.id,
                project=project.id,
                name=DEFAULT_STACK_COMPONENT_NAME,
                type=component_type,
                flavor="local",
                configuration={},
            ),
        )
        components[registered.type] = [registered.id]

    default_stack = StackModel(
        name=DEFAULT_STACK_NAME,
        components=components,
        is_shared=False,
        project=project.id,
        user=user.id,
    )
    return self.create_stack(stack=default_stack)
def _get_default_stack(
    self,
    project_name_or_id: Union[str, UUID],
    user_name_or_id: Union[str, UUID],
) -> StackModel:
    """Fetch the default stack of a user in a project.

    Args:
        project_name_or_id: Name or ID of the project.
        user_name_or_id: Name or ID of the user.

    Returns:
        The first stack carrying the default stack name that is listed for
        the supplied project and user.

    Raises:
        KeyError: if the project or default stack doesn't exist.
    """
    matches = self.list_stacks(
        project_name_or_id=project_name_or_id,
        user_name_or_id=user_name_or_id,
        name=DEFAULT_STACK_NAME,
    )
    if len(matches) > 0:
        return matches[0]

    raise KeyError(
        f"No default stack found for user {str(user_name_or_id)} in "
        f"project {str(project_name_or_id)}"
    )
# -----
# Roles
# -----
@property
def _admin_role(self) -> RoleModel:
    """Role registered under the admin role name.

    Returns:
        The default admin role.
    """
    admin_role = self.get_role(ADMIN_ROLE)
    return admin_role
@track(AnalyticsEvent.CREATED_DEFAULT_ROLES)
def _create_admin_role(self) -> RoleModel:
    """Register the admin role with read, write and "me" permissions.

    Returns:
        The admin role
    """
    logger.info(f"Creating '{ADMIN_ROLE}' role ...")

    granted = (
        PermissionType.READ,
        PermissionType.WRITE,
        PermissionType.ME,
    )
    admin_role = RoleModel(
        name=ADMIN_ROLE,
        permissions=[permission.value for permission in granted],
    )
    return self.create_role(admin_role)
@property
def _guest_role(self) -> RoleModel:
    """Role registered under the guest role name.

    Returns:
        The guest role.
    """
    guest_role = self.get_role(GUEST_ROLE)
    return guest_role
@track(AnalyticsEvent.CREATED_DEFAULT_ROLES)
def _create_guest_role(self) -> RoleModel:
    """Register the guest role with read and "me" permissions.

    Returns:
        The guest role
    """
    logger.info(f"Creating '{GUEST_ROLE}' role ...")

    granted = (PermissionType.READ, PermissionType.ME)
    guest_role = RoleModel(
        name=GUEST_ROLE,
        permissions=[permission.value for permission in granted],
    )
    return self.create_role(guest_role)
# -----
# Users
# -----
@property
def active_user(self) -> UserModel:
    """User the store is currently acting as.

    The user is fetched by the active user name on first access and kept
    in `_active_user` for subsequent accesses.

    Returns:
        The active user.
    """
    if self._active_user is not None:
        return self._active_user

    self._active_user = self.get_user(self.active_user_name)
    return self._active_user
@property
def users(self) -> List[UserModel]:
    """Shortcut for listing the users of the store.

    Returns:
        A list of all existing users.
    """
    all_users = self.list_users()
    return all_users
@property
def _default_user_name(self) -> str:
    """Name of the default user.

    Returns:
        The default user name, taken from the environment if it is set
        there and from the built-in default otherwise.
    """
    return os.environ.get(ENV_ZENML_DEFAULT_USER_NAME, DEFAULT_USERNAME)
@property
def _default_user(self) -> UserModel:
    """User registered under the default user name.

    Returns:
        The default user.

    Raises:
        KeyError: If the default user doesn't exist.
    """
    user_name = self._default_user_name
    try:
        default_user = self.get_user(user_name)
    except KeyError:
        # Re-raised with a message that names the missing default user.
        raise KeyError(f"The default user '{user_name}' is not configured")
    return default_user
@track(AnalyticsEvent.CREATED_DEFAULT_USER)
def _create_default_user(self) -> UserModel:
    """Creates a default user with the admin role.

    The user name comes from `_default_user_name`, the same source the
    `_default_user` lookup uses, so the created user is the one that lookup
    finds afterwards. The password is read from the environment, with a
    built-in default as fallback.

    Returns:
        The default user.
    """
    user_name = self._default_user_name
    user_password = os.getenv(
        ENV_ZENML_DEFAULT_USER_PASSWORD, DEFAULT_PASSWORD
    )

    logger.info(f"Creating default user '{user_name}' ...")
    new_user = self.create_user(
        UserModel(
            name=user_name,
            active=True,
            password=user_password,
        )
    )

    # The default user is granted the admin role.
    self.assign_role(
        role_name_or_id=self._admin_role.id,
        user_or_team_name_or_id=new_user.id,
        is_user=True,
    )
    return new_user
# -----
# Teams
# -----
@property
def teams(self) -> List[TeamModel]:
    """Shortcut for listing the teams of the store.

    Returns:
        A list of all teams.
    """
    all_teams = self.list_teams()
    return all_teams
# -----
# Roles
# -----
@property
def roles(self) -> List[RoleModel]:
    """Shortcut for listing the roles of the store.

    Returns:
        A list of all existing roles.
    """
    all_roles = self.list_roles()
    return all_roles
@property
def role_assignments(self) -> List[RoleAssignmentModel]:
    """Role assignments listed for the active user.

    Returns:
        The role assignments returned by the store when filtering by the
        active user name.
    """
    active_name = self.active_user_name
    return self.list_role_assignments(user_name_or_id=active_name)
# --------
# Projects
# --------
@property
def _default_project_name(self) -> str:
    """Name of the default project.

    Returns:
        The default project name, taken from the environment if it is set
        there and from the built-in default otherwise.
    """
    return os.environ.get(ENV_ZENML_DEFAULT_PROJECT_NAME, DEFAULT_PROJECT_NAME)
@property
def _default_project(self) -> ProjectModel:
    """Project registered under the default project name.

    Returns:
        The default project.

    Raises:
        KeyError: if the default project doesn't exist.
    """
    project_name = self._default_project_name
    try:
        default_project = self.get_project(project_name)
    except KeyError:
        # Re-raised with a message that names the missing default project.
        raise KeyError(
            f"The default project '{project_name}' is not configured"
        )
    return default_project
@track(AnalyticsEvent.CREATED_DEFAULT_PROJECT)
def _create_default_project(self) -> ProjectModel:
    """Register a project under the default project name.

    Returns:
        The default project.
    """
    project_name = self._default_project_name
    logger.info(f"Creating default project '{project_name}' ...")

    default_project = ProjectModel(name=project_name)
    return self.create_project(default_project)
# ------------
# Repositories
# ------------
# ---------
# Pipelines
# ---------
def get_pipeline_in_project(
    self, pipeline_name: str, project_name_or_id: Union[str, UUID]
) -> PipelineModel:
    """Fetch the pipeline with a given name from a project.

    Args:
        pipeline_name: Name of the pipeline.
        project_name_or_id: Name or ID of the project.

    Returns:
        The first pipeline listed for that name in the project.

    Raises:
        KeyError: if the pipeline does not exist.
    """
    matches = self.list_pipelines(
        project_name_or_id=project_name_or_id, name=pipeline_name
    )
    if len(matches) > 0:
        return matches[0]

    raise KeyError(
        f"No pipeline found with name {pipeline_name} in project "
        f"{project_name_or_id}"
    )
# -------------
# Pipeline runs
# -------------
# ------------------
# Pipeline run steps
# ------------------
# ---------
# Analytics
# ---------
def track_event(
    self,
    event: Union[str, AnalyticsEvent],
    metadata: Optional[Dict[str, Any]] = None,
) -> None:
    """Forward an analytics event unless tracking is disabled.

    Args:
        event: The event to track.
        metadata: Additional metadata to track with the event.
    """
    if not self.track_analytics:
        return

    # Server information is always tracked, if available.
    track_event(event, metadata)
class Config:
    """Pydantic configuration class."""

    # Attributes with a leading underscore are private: they stay mutable
    # and are left out of serialization.
    underscore_attrs_are_private = True

    # Extra attributes, e.g. from configs written by previous ZenML
    # versions, are ignored.
    extra = "ignore"

    # Assignments are validated. This is needed to have a mix of mutable
    # and immutable attributes.
    validate_assignment = True
active_user: UserModel
property
readonly
The active user.
Returns:
Type | Description |
---|---|
UserModel |
The active user. |
role_assignments: List[zenml.models.user_management_models.RoleAssignmentModel]
property
readonly
All role assignments.
Returns:
Type | Description |
---|---|
List[zenml.models.user_management_models.RoleAssignmentModel] |
A list of all role assignments. |
roles: List[zenml.models.user_management_models.RoleModel]
property
readonly
All existing roles.
Returns:
Type | Description |
---|---|
List[zenml.models.user_management_models.RoleModel] |
A list of all existing roles. |
teams: List[zenml.models.user_management_models.TeamModel]
property
readonly
List all teams.
Returns:
Type | Description |
---|---|
List[zenml.models.user_management_models.TeamModel] |
A list of all teams. |
type: StoreType
property
readonly
The type of the store.
Returns:
Type | Description |
---|---|
StoreType |
The type of the store. |
url: str
property
readonly
The URL of the store.
Returns:
Type | Description |
---|---|
str |
The URL of the store. |
users: List[zenml.models.user_management_models.UserModel]
property
readonly
All existing users.
Returns:
Type | Description |
---|---|
List[zenml.models.user_management_models.UserModel] |
A list of all existing users. |
Config
Pydantic configuration class.
Source code in zenml/zen_stores/base_zen_store.py
class Config:
    """Pydantic configuration class."""

    # Attributes with a leading underscore are private: they stay mutable
    # and are left out of serialization.
    underscore_attrs_are_private = True

    # Extra attributes, e.g. from configs written by previous ZenML
    # versions, are ignored.
    extra = "ignore"

    # Assignments are validated. This is needed to have a mix of mutable
    # and immutable attributes.
    validate_assignment = True
__init__(self, skip_default_registrations=False, **kwargs)
special
Create and initialize a store.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
skip_default_registrations |
bool |
If `True`, the creation of the default stack and user in the store will be skipped. |
False |
**kwargs |
Any |
Additional keyword arguments to pass to the Pydantic constructor. |
{} |
Exceptions:
Type | Description |
---|---|
RuntimeError |
If the store cannot be initialized. |
Source code in zenml/zen_stores/base_zen_store.py
def __init__(
    self,
    skip_default_registrations: bool = False,
    **kwargs: Any,
) -> None:
    """Build the store and optionally register its default resources.

    Args:
        skip_default_registrations: If `True`, the database initialization
            step (creation of the default resources) is skipped.
        **kwargs: Additional keyword arguments forwarded to the Pydantic
            constructor.

    Raises:
        RuntimeError: If the store-specific initialization fails.
    """
    super().__init__(**kwargs)

    # Any failure of the store-specific setup is reported as a
    # RuntimeError that names the store type and URL.
    try:
        self._initialize()
    except Exception as e:
        raise RuntimeError(
            f"Error initializing {self.type.value} store with URL "
            f"'{self.url}': {str(e)}"
        ) from e

    if skip_default_registrations:
        logger.debug("Skipping database initialization")
        return

    logger.debug("Initializing database")
    self._initialize_database()
create_store(config, skip_default_registrations=False, **kwargs)
staticmethod
Create and initialize a store from a store configuration.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
config |
StoreConfiguration |
The store configuration to use. |
required |
skip_default_registrations |
bool |
If `True`, the creation of the default stack and user in the store will be skipped. |
False |
**kwargs |
Any |
Additional keyword arguments to pass to the store class |
{} |
Returns:
Type | Description |
---|---|
BaseZenStore |
The initialized store. |
Source code in zenml/zen_stores/base_zen_store.py
@staticmethod
def create_store(
    config: StoreConfiguration,
    skip_default_registrations: bool = False,
    **kwargs: Any,
) -> "BaseZenStore":
    """Instantiate the store class matching a store configuration.

    Args:
        config: The store configuration to use.
        skip_default_registrations: If `True`, the creation of the default
            stack and user in the store will be skipped.
        **kwargs: Additional keyword arguments to pass to the store class.

    Returns:
        The initialized store.
    """
    logger.debug(f"Creating store with config '{config}'...")

    # The concrete implementation is selected by the configured store type.
    implementation = BaseZenStore.get_store_class(config.type)
    return implementation(
        config=config,
        skip_default_registrations=skip_default_registrations,
        **kwargs,
    )
get_default_store_config(path)
staticmethod
Get the default store configuration.
The default store is a SQLite store that saves the DB contents on the local filesystem.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path |
str |
The local path where the store DB will be stored. |
required |
Returns:
Type | Description |
---|---|
StoreConfiguration |
The default store configuration. |
Source code in zenml/zen_stores/base_zen_store.py
@staticmethod
def get_default_store_config(path: str) -> StoreConfiguration:
    """Build the configuration of the default store.

    The default store is a SQLite store that saves the DB contents on the
    local filesystem.

    Args:
        path: The local path where the store DB will be stored.

    Returns:
        The default store configuration.
    """
    from zenml.zen_stores.sql_zen_store import SqlZenStoreConfiguration

    local_url = SqlZenStoreConfiguration.get_local_url(path)
    return SqlZenStoreConfiguration(type=StoreType.SQL, url=local_url)
get_pipeline_in_project(self, pipeline_name, project_name_or_id)
Get a pipeline with a given name in a project.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
pipeline_name |
str |
Name of the pipeline. |
required |
project_name_or_id |
Union[str, uuid.UUID] |
Name or ID of the project. |
required |
Returns:
Type | Description |
---|---|
PipelineModel |
The pipeline. |
Exceptions:
Type | Description |
---|---|
KeyError |
if the pipeline does not exist. |
Source code in zenml/zen_stores/base_zen_store.py
def get_pipeline_in_project(
    self, pipeline_name: str, project_name_or_id: Union[str, UUID]
) -> PipelineModel:
    """Fetch the pipeline with a given name from a project.

    Args:
        pipeline_name: Name of the pipeline.
        project_name_or_id: Name or ID of the project.

    Returns:
        The first pipeline listed for that name in the project.

    Raises:
        KeyError: if the pipeline does not exist.
    """
    matches = self.list_pipelines(
        project_name_or_id=project_name_or_id, name=pipeline_name
    )
    if len(matches) > 0:
        return matches[0]

    raise KeyError(
        f"No pipeline found with name {pipeline_name} in project "
        f"{project_name_or_id}"
    )
get_store_class(store_type)
staticmethod
Returns the class of the given store type.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
store_type |
StoreType |
The type of the store to get the class for. |
required |
Returns:
Type | Description |
---|---|
Type[BaseZenStore] |
The class of the given store type. |
Exceptions:
Type | Description |
---|---|
TypeError |
If the store type is unsupported. |
Source code in zenml/zen_stores/base_zen_store.py
@staticmethod
def get_store_class(store_type: StoreType) -> Type["BaseZenStore"]:
    """Look up the store implementation class for a store type.

    Args:
        store_type: The type of the store to get the class for.

    Returns:
        The store class implementing the given store type.

    Raises:
        TypeError: If the store type is unsupported.
    """
    # Each implementation module is imported only when its type is requested.
    if store_type == StoreType.SQL:
        from zenml.zen_stores.sql_zen_store import SqlZenStore

        return SqlZenStore

    if store_type == StoreType.REST:
        from zenml.zen_stores.rest_zen_store import RestZenStore

        return RestZenStore

    raise TypeError(
        f"No store implementation found for store type "
        f"`{store_type.value}`."
    )
get_store_config_class(store_type)
staticmethod
Returns the store config class of the given store type.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
store_type |
StoreType |
The type of the store to get the class for. |
required |
Returns:
Type | Description |
---|---|
Type[StoreConfiguration] |
The config class of the given store type. |
Source code in zenml/zen_stores/base_zen_store.py
@staticmethod
def get_store_config_class(
    store_type: StoreType,
) -> Type["StoreConfiguration"]:
    """Look up the configuration class used by a store type.

    Args:
        store_type: The type of the store to get the config class for.

    Returns:
        The `CONFIG_TYPE` declared by the store class of the given type.
    """
    return BaseZenStore.get_store_class(store_type).CONFIG_TYPE
get_store_info(self)
Get information about the store.
Returns:
Type | Description |
---|---|
ServerModel |
Information about the store. |
Source code in zenml/zen_stores/base_zen_store.py
def get_store_info(self) -> ServerModel:
    """Describe the store as a server model.

    Returns:
        Information about the store: the global configuration's user ID,
        the ZenML version, the deployment type read from the environment
        (`OTHER` if unset) and the database type `OTHER`.
    """
    server_id = GlobalConfiguration().user_id
    deployment_type = os.environ.get(
        ENV_ZENML_SERVER_DEPLOYMENT_TYPE, ServerDeploymentType.OTHER
    )
    return ServerModel(
        id=server_id,
        version=zenml.__version__,
        deployment_type=deployment_type,
        database_type=ServerDatabaseType.OTHER,
    )
get_store_type(url)
staticmethod
Returns the store type associated with a URL scheme.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
url |
str |
The store URL. |
required |
Returns:
Type | Description |
---|---|
StoreType |
The store type associated with the supplied URL schema. |
Exceptions:
Type | Description |
---|---|
TypeError |
If no store type was found to support the supplied URL. |
Source code in zenml/zen_stores/base_zen_store.py
@staticmethod
def get_store_type(url: str) -> StoreType:
    """Determine which store type handles the scheme of a URL.

    Args:
        url: The store URL.

    Returns:
        The store type whose configuration class supports the URL scheme.

    Raises:
        TypeError: If no store type was found to support the supplied URL.
    """
    from zenml.zen_stores.rest_zen_store import RestZenStoreConfiguration
    from zenml.zen_stores.sql_zen_store import SqlZenStoreConfiguration

    # Candidates are probed in order: the SQL configuration is asked first.
    candidates = (
        (SqlZenStoreConfiguration, StoreType.SQL),
        (RestZenStoreConfiguration, StoreType.REST),
    )
    for config_class, store_type in candidates:
        if config_class.supports_url_scheme(url):
            return store_type

    raise TypeError(f"No store implementation found for URL: {url}.")
is_local_store(self)
Check if the store is a local store or connected to a locally deployed ZenML server.
Returns:
Type | Description |
---|---|
bool |
True if the store is local, False otherwise. |
Source code in zenml/zen_stores/base_zen_store.py
def is_local_store(self) -> bool:
    """Check if the store is a local store or connected to a locally deployed ZenML server.

    Returns:
        True if the store is local, False otherwise.
    """
    store_info = self.get_store_info()
    return store_info.is_local()
track_event(self, event, metadata=None)
Track an analytics event.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
event |
Union[str, zenml.utils.analytics_utils.AnalyticsEvent] |
The event to track. |
required |
metadata |
Optional[Dict[str, Any]] |
Additional metadata to track with the event. |
None |
Source code in zenml/zen_stores/base_zen_store.py
def track_event(
    self,
    event: Union[str, AnalyticsEvent],
    metadata: Optional[Dict[str, Any]] = None,
) -> None:
    """Forward an analytics event unless tracking is disabled.

    Args:
        event: The event to track.
        metadata: Additional metadata to track with the event.
    """
    if not self.track_analytics:
        return

    # Server information is always tracked, if available.
    track_event(event, metadata)
validate_active_config(self, active_project_name_or_id=None, active_stack_id=None, config_name='')
Validate the active configuration.
Call this method to validate the supplied active project and active stack values.
This method is guaranteed to return valid project ID and stack ID values. If the supplied project and stack are not set or are not valid (e.g. they do not exist or are not accessible), the default project and default project stack will be returned in their stead.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
active_project_name_or_id |
Union[str, uuid.UUID] |
The name or ID of the active project. |
None |
active_stack_id |
Optional[uuid.UUID] |
The ID of the active stack. |
None |
config_name |
str |
The name of the configuration to validate (used in the displayed logs/messages). |
'' |
Returns:
Type | Description |
---|---|
Tuple[zenml.models.project_models.ProjectModel, zenml.models.stack_models.StackModel] |
A tuple containing the active project and active stack. |
Source code in zenml/zen_stores/base_zen_store.py
def validate_active_config(
    self,
    active_project_name_or_id: Optional[Union[str, UUID]] = None,
    active_stack_id: Optional[UUID] = None,
    config_name: str = "",
) -> Tuple[ProjectModel, StackModel]:
    """Resolve the active project and stack, falling back to defaults.

    The supplied project and stack are used if they are set and still
    valid. Otherwise the fallback project and the active user's default
    stack in the resolved project are returned in their stead. The default
    stack is created if it does not exist yet.

    Args:
        active_project_name_or_id: The name or ID of the active project.
        active_stack_id: The ID of the active stack.
        config_name: The name of the configuration to validate (used in the
            displayed logs/messages).

    Returns:
        A tuple containing the active project and active stack.
    """
    # Pick the fallback project:
    #   1. the default project, if it is configured;
    #   2. otherwise the first project in the store;
    #   3. otherwise a newly created default project.
    try:
        fallback_project = self._default_project
    except KeyError:
        existing_projects = self.list_projects()
        if len(existing_projects) > 0:
            fallback_project = existing_projects[0]
        else:
            self._create_default_project()
            fallback_project = self._default_project

    # Keep the configured project only if it can still be fetched.
    active_project: ProjectModel = fallback_project
    if not active_project_name_or_id:
        logger.info(
            "Setting the %s active project to '%s'.",
            config_name,
            fallback_project.name,
        )
    else:
        try:
            active_project = self.get_project(active_project_name_or_id)
        except KeyError:
            logger.warning(
                "The current %s active project is no longer available. "
                "Resetting the active project to '%s'.",
                config_name,
                fallback_project.name,
            )

    # Make sure the active user has a default stack in the active project.
    try:
        default_stack = self._get_default_stack(
            project_name_or_id=active_project.id,
            user_name_or_id=self.active_user.id,
        )
    except KeyError:
        default_stack = self._create_default_stack(
            project_name_or_id=active_project.id,
            user_name_or_id=self.active_user.id,
        )

    # Every rejection below falls back to the default stack.
    if not active_stack_id:
        logger.warning(
            "Setting the %s active stack to default.",
            config_name,
        )
        return active_project, default_stack

    active_stack: StackModel
    try:
        active_stack = self.get_stack(stack_id=active_stack_id)
    except KeyError:
        logger.warning(
            "The current %s active stack is no longer available. "
            "Resetting the active stack to default.",
            config_name,
        )
        return active_project, default_stack

    if active_stack.project != active_project.id:
        logger.warning(
            "The current %s active stack is not part of the active "
            "project. Resetting the active stack to default.",
            config_name,
        )
        return active_project, default_stack

    if not active_stack.is_shared and active_stack.user != self.active_user.id:
        logger.warning(
            "The current %s active stack is not shared and not "
            "owned by the active user. "
            "Resetting the active stack to default.",
            config_name,
        )
        return active_project, default_stack

    return active_project, active_stack
metadata_store
Base implementation of a metadata store.
MLMDArtifactModel (BaseModel)
pydantic-model
Class that models an artifact response from the metadata store.
Source code in zenml/zen_stores/metadata_store.py
class MLMDArtifactModel(BaseModel):
    """Class that models an artifact response from the metadata store.

    Instances are assembled by
    `MetadataStore._get_artifact_model_from_proto` from an MLMD artifact
    proto plus the ID of the step through which the artifact was looked up.
    """

    # ID of the artifact proto in MLMD.
    mlmd_id: int
    # Name of the MLMD artifact type registered under the proto's `type_id`.
    type: ArtifactType
    # URI stored on the artifact proto.
    uri: str
    # String value of the artifact's materializer property.
    materializer: str
    # String value of the artifact's data type property.
    data_type: str
    # ID of the step the artifact was fetched for.
    mlmd_parent_step_id: int
    # ID of the step returned by `get_producer_step_from_artifact`.
    mlmd_producer_step_id: int
    # True when the parent step differs from the producer step.
    is_cached: bool
MLMDPipelineRunModel (BaseModel)
pydantic-model
Class that models a pipeline run response from the metadata store.
Source code in zenml/zen_stores/metadata_store.py
class MLMDPipelineRunModel(BaseModel):
    """Class that models a pipeline run response from the metadata store.

    Instances are assembled by
    `MetadataStore._get_pipeline_run_model_from_context` from an MLMD
    context and the ZenML context properties of one of its executions.
    """

    # ID of the MLMD context that represents the run.
    mlmd_id: int
    # Name of the MLMD context.
    name: str
    # The four IDs below are read from the JSON-encoded model IDs context
    # property; they stay `None` when that property is not available.
    project: Optional[UUID]
    user: Optional[UUID]
    pipeline_id: Optional[UUID]
    stack_id: Optional[UUID]
    # Decoded pipeline configuration; an empty dict when none was stored.
    pipeline_configuration: Dict[str, Any]
    # Number of steps stored in the context properties, if any.
    num_steps: Optional[int]
MLMDStepRunModel (BaseModel)
pydantic-model
Class that models a step run response from the metadata store.
Source code in zenml/zen_stores/metadata_store.py
class MLMDStepRunModel(BaseModel):
    """Class that models a step run response from the metadata store.

    Instances are assembled by
    `MetadataStore._get_step_model_from_execution` from an MLMD execution.
    """

    # ID of the MLMD execution.
    mlmd_id: int
    # IDs of the executions that produced this step's input artifacts.
    mlmd_parent_step_ids: List[int]
    # Last dot-separated component of the execution type name.
    entrypoint_name: str
    # Step name decoded from the execution's custom properties.
    name: str
    # Raw JSON strings of the non-internal custom properties.
    parameters: Dict[str, str]
    # Decoded step configuration; an empty dict when none was stored.
    step_configuration: Dict[str, Any]
    # Taken from `step_configuration["config"]["docstring"]` when present.
    docstring: Optional[str]
    # Number of outputs stored in the context properties, if any.
    num_outputs: Optional[int]
MetadataStore
ZenML MLMD metadata store.
Source code in zenml/zen_stores/metadata_store.py
class MetadataStore:
    """ZenML MLMD metadata store."""

    # Forwarded to the MLMD client as `enable_upgrade_migration`.
    upgrade_migration_enabled: bool = True
    store: metadata_store.MetadataStore

    def __init__(self, config: metadata_store_pb2.ConnectionConfig) -> None:
        """Initializes the metadata store.

        Args:
            config: The connection configuration for the metadata store.
        """
        # Use the class-level switch instead of a hard-coded `True` so that
        # overriding `upgrade_migration_enabled` actually takes effect. The
        # default value keeps the previous behavior.
        self.store = metadata_store.MetadataStore(
            config,
            enable_upgrade_migration=self.upgrade_migration_enabled,
        )
@property
def step_type_mapping(self) -> Dict[int, str]:
    """Lookup table from execution type IDs to execution type names.

    The table is rebuilt from the store on every access.

    Returns:
        Dict[int, str]: a mapping from type_ids to step names.
    """
    names_by_type_id: Dict[int, str] = {}
    for execution_type in self.store.get_execution_types():
        names_by_type_id[execution_type.id] = execution_type.name
    return names_by_type_id
def _check_if_executions_belong_to_pipeline(
    self,
    executions: List[proto.Execution],
    pipeline_id: int,
) -> bool:
    """Tells whether any execution is linked to the given pipeline context.

    Args:
        executions: List of executions.
        pipeline_id: The ID of the pipeline to check.

    Returns:
        `True` as soon as one execution has an associated context whose ID
        equals `pipeline_id`, `False` if none has.
    """
    # `any` stops at the first match, so no further executions are queried.
    return any(
        context.id == pipeline_id  # noqa
        for execution in executions
        for context in self.store.get_contexts_by_execution(execution.id)
    )
def _get_zenml_execution_context_properties(
    self, execution: proto.Execution
) -> Any:
    """Fetch the custom properties of an execution's ZenML context.

    Args:
        execution: The MLMD execution whose contexts are searched.

    Returns:
        The `custom_properties` of the first associated context whose
        context type is named `ZENML_CONTEXT_TYPE_NAME`.

    Raises:
        RuntimeError: If none of the associated contexts has that type.
    """
    for candidate in self.store.get_contexts_by_execution(execution.id):
        type_protos = self.store.get_context_types_by_id(
            [candidate.type_id]
        )
        if type_protos[0].name == ZENML_CONTEXT_TYPE_NAME:
            return candidate.custom_properties

    raise RuntimeError(
        f"Could not find 'zenml' context for execution {execution.name}."
    )
def _get_step_model_from_execution(
    self, execution: proto.Execution
) -> MLMDStepRunModel:
    """Get the original step from an execution.

    Args:
        execution: proto.Execution object from mlmd store.

    Returns:
        Model of the original step derived from the proto.Execution.

    Raises:
        KeyError: If the execution is not associated with a step.
        RuntimeError: If the execution has no associated ZenML context.
    """
    impl_name = self.step_type_mapping[execution.type_id].split(".")[-1]

    step_name_property = execution.custom_properties.get(
        INTERNAL_EXECUTION_PARAMETER_PREFIX + PARAM_PIPELINE_PARAMETER_NAME,
        None,
    )
    if not step_name_property:
        raise KeyError(
            f"Step name missing for execution with ID {execution.id}. "
            f"This error probably occurs because you're using ZenML "
            f"version 0.5.4 or newer but your metadata store contains "
            f"data from previous versions."
        )
    step_name = json.loads(step_name_property.string_value)

    step_parameters = {}
    for k, v in execution.custom_properties.items():
        if not k.startswith(INTERNAL_EXECUTION_PARAMETER_PREFIX):
            try:
                # Only validates that the value is JSON; the raw string is
                # what gets stored on the model.
                json.loads(v.string_value)
                step_parameters[k] = v.string_value
            except JSONDecodeError:
                # this means there is a property in there that is neither
                # an internal one or one created by zenml. Therefore, we can
                # ignore it
                pass

    step_context_properties = self._get_zenml_execution_context_properties(
        execution=execution,
    )

    if MLMD_CONTEXT_STEP_CONFIG_PROPERTY_NAME in step_context_properties:
        step_configuration = json.loads(
            step_context_properties.get(
                MLMD_CONTEXT_STEP_CONFIG_PROPERTY_NAME
            ).string_value
        )
    else:
        step_configuration = {}

    # Extract docstring.
    docstring = None
    if "config" in step_configuration:
        step_configuration_config = step_configuration["config"]
        if "docstring" in step_configuration_config:
            docstring = step_configuration_config["docstring"]

    # Get number of outputs.
    if MLMD_CONTEXT_NUM_OUTPUTS_PROPERTY_NAME in step_context_properties:
        num_outputs = int(
            step_context_properties.get(
                MLMD_CONTEXT_NUM_OUTPUTS_PROPERTY_NAME
            ).string_value
        )
    else:
        num_outputs = None

    # TODO [ENG-222]: This is a lot of querying to the metadata store. We
    #  should refactor and make it nicer. Probably it makes more sense
    #  to first get `executions_ids_for_current_run` and then filter on
    #  `event.execution_id in execution_ids_for_current_run`.

    # Core logic here is that we get the event of this particular execution
    # id that gives us the artifacts of this execution. We then go through
    # all `input` artifacts of this execution and get all events related to
    # that artifact. This in turn gives us other events for which this
    # artifact was an `output` artifact. Then we simply need to sort by
    # time to get the most recent execution (i.e. step) that produced that
    # particular artifact.
    events_for_execution = self.store.get_events_by_execution_ids(
        [execution.id]
    )

    parents_step_ids = set()
    for current_event in events_for_execution:
        if current_event.type != current_event.INPUT:
            continue

        # this means the artifact is an input artifact
        events_for_input_artifact = [
            e
            for e in self.store.get_events_by_artifact_ids(
                [current_event.artifact_id]
            )
            # should be output type and should NOT be the same id as
            # the execution we are querying and it should be BEFORE
            # the time of the current event.
            if e.type == e.OUTPUT
            and e.execution_id != current_event.execution_id
            and e.milliseconds_since_epoch
            < current_event.milliseconds_since_epoch
        ]
        if not events_for_input_artifact:
            # No earlier producer event is recorded for this input, so
            # there is no parent to add. Indexing the empty list with
            # `[-1]` used to raise an `IndexError` here.
            continue

        # sort by time
        events_for_input_artifact.sort(
            key=lambda x: x.milliseconds_since_epoch  # type: ignore[no-any-return] # noqa
        )
        # take the latest one and add execution to the parents.
        parents_step_ids.add(events_for_input_artifact[-1].execution_id)

    return MLMDStepRunModel(
        mlmd_id=execution.id,
        mlmd_parent_step_ids=list(parents_step_ids),
        entrypoint_name=impl_name,
        name=step_name,
        parameters=step_parameters or {},
        step_configuration=step_configuration or {},
        docstring=docstring,
        num_outputs=num_outputs,
    )
def _get_pipeline_run_model_from_context(
    self, context: proto.Context
) -> MLMDPipelineRunModel:
    """Build a pipeline run model from an MLMD context.

    The model IDs, pipeline configuration and step count are read from
    the ZenML context properties of the last execution returned for the
    context. Values that are not stored keep their defaults.

    Args:
        context: The MLMD context that represents the pipeline run.

    Returns:
        The pipeline run model for the given context.
    """
    project = user = pipeline_id = stack_id = None
    pipeline_configuration = {}
    num_steps = None

    executions = self.store.get_executions_by_context(context_id=context.id)
    if executions:
        properties = self._get_zenml_execution_context_properties(
            executions[-1]
        )

        if MLMD_CONTEXT_MODEL_IDS_PROPERTY_NAME in properties:
            raw_model_ids = properties.get(
                MLMD_CONTEXT_MODEL_IDS_PROPERTY_NAME
            )
            model_ids = json.loads(raw_model_ids.string_value)
            project = model_ids.get("project_id")
            user = model_ids.get("user_id")
            pipeline_id = model_ids.get("pipeline_id")
            stack_id = model_ids.get("stack_id")

        if MLMD_CONTEXT_PIPELINE_CONFIG_PROPERTY_NAME in properties:
            raw_configuration = properties.get(
                MLMD_CONTEXT_PIPELINE_CONFIG_PROPERTY_NAME
            )
            pipeline_configuration = json.loads(
                raw_configuration.string_value
            )

        if MLMD_CONTEXT_NUM_STEPS_PROPERTY_NAME in properties:
            raw_num_steps = properties.get(
                MLMD_CONTEXT_NUM_STEPS_PROPERTY_NAME
            )
            num_steps = int(raw_num_steps.string_value)

    return MLMDPipelineRunModel(
        mlmd_id=context.id,
        name=context.name,
        project=project,
        user=user,
        pipeline_id=pipeline_id,
        stack_id=stack_id,
        pipeline_configuration=pipeline_configuration or {},
        num_steps=num_steps,
    )
def get_all_runs(
    self, ignored_ids: Optional[List[int]] = None
) -> List[MLMDPipelineRunModel]:
    """Gets models of all pipeline runs registered in MLMD.

    Args:
        ignored_ids: A list of run IDs to ignore.

    Returns:
        One model per pipeline run context registered in MLMD, excluding
        the runs whose ID is listed in `ignored_ids`.
    """
    # A set keeps the per-run membership check constant-time even when the
    # list of ignored IDs is long.
    ignored = set(ignored_ids or ())
    run_contexts = self.store.get_contexts_by_type(
        PIPELINE_RUN_CONTEXT_TYPE_NAME
    )
    return [
        self._get_pipeline_run_model_from_context(run_context)
        for run_context in run_contexts
        if run_context.id not in ignored
    ]
def get_pipeline_run_steps(
    self, run_id: int
) -> Dict[str, MLMDStepRunModel]:
    """Gets all steps for the given pipeline run.

    Args:
        run_id: The ID of the pipeline run to get the steps for.

    Returns:
        An ordered dictionary mapping step names to step run models.
    """
    # reverse the executions as they get returned in reverse chronological
    # order from the metadata store
    step_models = (
        self._get_step_model_from_execution(execution)
        for execution in reversed(  # noqa
            self.store.get_executions_by_context(run_id)
        )
    )
    steps: Dict[str, MLMDStepRunModel] = OrderedDict(
        (step_model.name, step_model) for step_model in step_models
    )
    logger.debug(f"Fetched {len(steps)} steps for pipeline run '{run_id}'.")
    return steps
def get_step_by_id(self, step_id: int) -> MLMDStepRunModel:
    """Gets a step by its ID.

    Args:
        step_id: The ID of the step to get.

    Returns:
        A model of the step with the given ID.
    """
    matching_executions = self.store.get_executions_by_id([step_id])
    return self._get_step_model_from_execution(matching_executions[0])
def get_step_status(self, step_id: int) -> ExecutionStatus:
    """Gets the execution status of a single step.

    Args:
        step_id: The ID of the step to get the status for.

    Returns:
        ExecutionStatus: The status of the step. Any state other than
        complete, running or cached is reported as failed.
    """
    execution = self.store.get_executions_by_id([step_id])[0]  # noqa
    last_state = execution.last_known_state

    if last_state == execution.COMPLETE:
        return ExecutionStatus.COMPLETED
    if last_state == execution.RUNNING:
        return ExecutionStatus.RUNNING
    if last_state == execution.CACHED:
        return ExecutionStatus.CACHED
    return ExecutionStatus.FAILED
def _get_artifact_model_from_proto(
    self, artifact_proto: Artifact, parent_step_id: int
) -> MLMDArtifactModel:
    """Gets a model of an artifact from its proto.

    Args:
        artifact_proto: The proto of the artifact to get the model for.
        parent_step_id: The ID of the parent step.

    Returns:
        A model of the artifact. It is flagged as cached when its producer
        step differs from `parent_step_id`.
    """
    # maps artifact types to their string representation
    type_names_by_id = {}
    for registered_type in self.store.get_artifact_types():
        type_names_by_id[registered_type.id] = registered_type.name
    artifact_type = type_names_by_id[artifact_proto.type_id]

    materializer = artifact_proto.properties[
        MATERIALIZER_PROPERTY_KEY
    ].string_value
    data_type = artifact_proto.properties[
        DATATYPE_PROPERTY_KEY
    ].string_value

    producer_step_id = self.get_producer_step_from_artifact(
        artifact_proto.id
    ).mlmd_id

    return MLMDArtifactModel(
        mlmd_id=artifact_proto.id,
        type=artifact_type,
        uri=artifact_proto.uri,
        materializer=materializer,
        data_type=data_type,
        mlmd_parent_step_id=parent_step_id,
        mlmd_producer_step_id=producer_step_id,
        is_cached=parent_step_id != producer_step_id,
    )
def get_step_input_artifacts(
    self,
    step_id: int,
    step_parent_step_ids: List[int],
) -> Dict[str, MLMDArtifactModel]:
    """Returns input artifacts for the given step.

    Args:
        step_id: The ID of the step to get the input artifacts for.
        step_parent_step_ids: The IDs of the parent steps of the given step.

    Returns:
        A dict mapping input names to input artifacts.

    Raises:
        AssertionError: If events and artifacts cannot be paired up, or if
            no parent step lists an input artifact among its outputs.
    """
    events = [
        event
        for event in self.store.get_events_by_execution_ids(  # noqa
            [step_id]
        )
        if event.type == event.INPUT
    ]
    input_artifact_ids = [event.artifact_id for event in events]
    artifacts = self.store.get_artifacts_by_id(input_artifact_ids)
    events.sort(key=lambda x: x.artifact_id)
    artifacts.sort(key=lambda x: x.id)

    # Resolve once which parent step lists which artifact among its
    # outputs. This used to be recomputed for every input artifact,
    # together with a `get_step_by_id` call whose result was discarded.
    # Later parents overwrite earlier ones, matching the previous
    # "last matching parent wins" behavior.
    parent_id_by_artifact_id: Dict[int, int] = {}
    if events:
        for parent_id in step_parent_step_ids:
            parent_outputs = self.get_step_output_artifacts(
                step_id=parent_id,
            )
            for parent_output in parent_outputs.values():
                parent_id_by_artifact_id[parent_output.mlmd_id] = parent_id

    inputs: Dict[str, MLMDArtifactModel] = {}
    for event_proto, artifact_proto in zip(events, artifacts):
        assert event_proto.artifact_id == artifact_proto.id
        artifact_name = event_proto.path.steps[0].key

        # In the case that this is an input event, we actually need
        # to resolve the parent step ID via its parents outputs.
        parent_step_id = parent_id_by_artifact_id.get(artifact_proto.id)
        assert parent_step_id is not None

        inputs[artifact_name] = self._get_artifact_model_from_proto(
            artifact_proto, parent_step_id=parent_step_id
        )

    logger.debug("Fetched %d inputs for step '%d'.", len(inputs), step_id)
    return inputs
def get_step_output_artifacts(
    self, step_id: int
) -> Dict[str, MLMDArtifactModel]:
    """Returns the output artifacts for the given step.

    Args:
        step_id: The ID of the step to get the output artifacts for.

    Returns:
        A dict mapping output names to output artifacts.
    """
    output_events = [
        event
        for event in self.store.get_events_by_execution_ids(  # noqa
            [step_id]
        )
        if event.type == event.OUTPUT
    ]
    artifact_protos = self.store.get_artifacts_by_id(
        [event.artifact_id for event in output_events]
    )

    # Order both sequences by artifact ID so that they can be paired up.
    output_events = sorted(output_events, key=lambda x: x.artifact_id)
    artifact_protos = sorted(artifact_protos, key=lambda x: x.id)

    outputs: Dict[str, MLMDArtifactModel] = {}
    for event_proto, artifact_proto in zip(output_events, artifact_protos):
        assert event_proto.artifact_id == artifact_proto.id
        output_name = event_proto.path.steps[0].key
        outputs[output_name] = self._get_artifact_model_from_proto(
            artifact_proto, parent_step_id=step_id
        )

    logger.debug("Fetched %d outputs for step '%d'.", len(outputs), step_id)
    return outputs
def get_producer_step_from_artifact(
    self, artifact_id: int
) -> MLMDStepRunModel:
    """Find the original step that created an artifact.

    When several executions emitted an output event for the artifact, the
    execution with the earliest output event is treated as the producer.

    Args:
        artifact_id: ID of the artifact for which to get the producer step.

    Returns:
        Original step that produced the artifact.
    """
    # Order the output events by time (execution ID as tie-breaker) so the
    # selected producer does not depend on set iteration order or on the
    # order in which the store returns executions.
    output_events = sorted(
        (
            event
            for event in self.store.get_events_by_artifact_ids([artifact_id])
            if event.type == event.OUTPUT
        ),
        key=lambda event: (
            event.milliseconds_since_epoch,
            event.execution_id,
        ),
    )
    # Slicing keeps the empty case on the same path as before: the store is
    # queried with no IDs and the `[0]` lookup fails as it always did.
    producer_execution_ids = [
        event.execution_id for event in output_events[:1]
    ]
    execution = self.store.get_executions_by_id(producer_execution_ids)[0]
    return self._get_step_model_from_execution(execution)
class Config:
    """Pydantic configuration class."""

    # NOTE(review): the indentation of this listing is lost, so it is not
    # clear which class these options belong to. `MetadataStore` itself is
    # declared without a pydantic base class - confirm where this applies.

    # public attributes are immutable
    allow_mutation = False
    # all attributes with leading underscore are private and therefore
    # are mutable and not included in serialization
    underscore_attrs_are_private = True
    # prevent extra attributes during model initialization
    extra = Extra.forbid
step_type_mapping: Dict[int, str]
property
readonly
Maps type_ids to step names.
Returns:
Type | Description |
---|---|
Dict[int, str] |
a mapping from type_ids to step names. |
Config
Pydantic configuration class.
Source code in zenml/zen_stores/metadata_store.py
class Config:
    """Pydantic configuration class."""

    # NOTE(review): the indentation of this listing is lost, so it is not
    # clear which class these options belong to. `MetadataStore` itself is
    # declared without a pydantic base class - confirm where this applies.

    # public attributes are immutable
    allow_mutation = False
    # all attributes with leading underscore are private and therefore
    # are mutable and not included in serialization
    underscore_attrs_are_private = True
    # prevent extra attributes during model initialization
    extra = Extra.forbid
__init__(self, config)
special
Initializes the metadata store.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
config |
ConnectionConfig |
The connection configuration for the metadata store. |
required |
Source code in zenml/zen_stores/metadata_store.py
def __init__(self, config: metadata_store_pb2.ConnectionConfig) -> None:
    """Initializes the metadata store.

    Args:
        config: The connection configuration for the metadata store.
    """
    # Use the class-level switch instead of a hard-coded `True` so that
    # overriding `upgrade_migration_enabled` actually takes effect. The
    # default value keeps the previous behavior.
    self.store = metadata_store.MetadataStore(
        config,
        enable_upgrade_migration=self.upgrade_migration_enabled,
    )
get_all_runs(self, ignored_ids=None)
Gets models of all pipeline runs registered in MLMD.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
ignored_ids |
Optional[List[int]] |
A list of run IDs to ignore. |
None |
Returns:
Type | Description |
---|---|
List[zenml.zen_stores.metadata_store.MLMDPipelineRunModel] |
A list with one model per pipeline run registered in MLMD, excluding the ignored run IDs. |
Source code in zenml/zen_stores/metadata_store.py
def get_all_runs(
    self, ignored_ids: Optional[List[int]] = None
) -> List[MLMDPipelineRunModel]:
    """Gets models of all pipeline runs registered in MLMD.

    Args:
        ignored_ids: A list of run IDs to ignore.

    Returns:
        One model per pipeline run context registered in MLMD, excluding
        the runs whose ID is listed in `ignored_ids`.
    """
    # A set keeps the per-run membership check constant-time even when the
    # list of ignored IDs is long.
    ignored = set(ignored_ids or ())
    run_contexts = self.store.get_contexts_by_type(
        PIPELINE_RUN_CONTEXT_TYPE_NAME
    )
    return [
        self._get_pipeline_run_model_from_context(run_context)
        for run_context in run_contexts
        if run_context.id not in ignored
    ]
get_pipeline_run_steps(self, run_id)
Gets all steps for the given pipeline run.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
run_id |
int |
The ID of the pipeline run to get the steps for. |
required |
Returns:
Type | Description |
---|---|
Dict[str, zenml.zen_stores.metadata_store.MLMDStepRunModel] |
A dictionary mapping step names to step run models. |
Source code in zenml/zen_stores/metadata_store.py
def get_pipeline_run_steps(
    self, run_id: int
) -> Dict[str, MLMDStepRunModel]:
    """Gets all steps for the given pipeline run.

    Args:
        run_id: The ID of the pipeline run to get the steps for.

    Returns:
        An ordered dictionary mapping step names to step run models.
    """
    # reverse the executions as they get returned in reverse chronological
    # order from the metadata store
    step_models = (
        self._get_step_model_from_execution(execution)
        for execution in reversed(  # noqa
            self.store.get_executions_by_context(run_id)
        )
    )
    steps: Dict[str, MLMDStepRunModel] = OrderedDict(
        (step_model.name, step_model) for step_model in step_models
    )
    logger.debug(f"Fetched {len(steps)} steps for pipeline run '{run_id}'.")
    return steps
get_producer_step_from_artifact(self, artifact_id)
Find the original step that created an artifact.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
artifact_id |
int |
ID of the artifact for which to get the producer step. |
required |
Returns:
Type | Description |
---|---|
MLMDStepRunModel |
Original step that produced the artifact. |
Source code in zenml/zen_stores/metadata_store.py
def get_producer_step_from_artifact(
    self, artifact_id: int
) -> MLMDStepRunModel:
    """Find the original step that created an artifact.

    When several executions emitted an output event for the artifact, the
    execution with the earliest output event is treated as the producer.

    Args:
        artifact_id: ID of the artifact for which to get the producer step.

    Returns:
        Original step that produced the artifact.
    """
    # Order the output events by time (execution ID as tie-breaker) so the
    # selected producer does not depend on set iteration order or on the
    # order in which the store returns executions.
    output_events = sorted(
        (
            event
            for event in self.store.get_events_by_artifact_ids([artifact_id])
            if event.type == event.OUTPUT
        ),
        key=lambda event: (
            event.milliseconds_since_epoch,
            event.execution_id,
        ),
    )
    # Slicing keeps the empty case on the same path as before: the store is
    # queried with no IDs and the `[0]` lookup fails as it always did.
    producer_execution_ids = [
        event.execution_id for event in output_events[:1]
    ]
    execution = self.store.get_executions_by_id(producer_execution_ids)[0]
    return self._get_step_model_from_execution(execution)
get_step_by_id(self, step_id)
Gets a step by its ID.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
step_id |
int |
The ID of the step to get. |
required |
Returns:
Type | Description |
---|---|
MLMDStepRunModel |
A model of the step with the given ID. |
Source code in zenml/zen_stores/metadata_store.py
def get_step_by_id(self, step_id: int) -> MLMDStepRunModel:
    """Gets a step by its ID.

    Args:
        step_id: The ID of the step to get.

    Returns:
        A model of the step with the given ID.
    """
    matching_executions = self.store.get_executions_by_id([step_id])
    return self._get_step_model_from_execution(matching_executions[0])
get_step_input_artifacts(self, step_id, step_parent_step_ids)
Returns input artifacts for the given step.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
step_id |
int |
The ID of the step to get the input artifacts for. |
required |
step_parent_step_ids |
List[int] |
The IDs of the parent steps of the given step. |
required |
Returns:
Type | Description |
---|---|
Dict[str, zenml.zen_stores.metadata_store.MLMDArtifactModel] |
A dict mapping input names to input artifacts. |
Source code in zenml/zen_stores/metadata_store.py
def get_step_input_artifacts(
    self,
    step_id: int,
    step_parent_step_ids: List[int],
) -> Dict[str, MLMDArtifactModel]:
    """Returns input artifacts for the given step.

    Args:
        step_id: The ID of the step to get the input artifacts for.
        step_parent_step_ids: The IDs of the parent steps of the given step.

    Returns:
        A dict mapping input names to input artifacts.

    Raises:
        AssertionError: If events and artifacts cannot be paired up, or if
            no parent step lists an input artifact among its outputs.
    """
    events = [
        event
        for event in self.store.get_events_by_execution_ids(  # noqa
            [step_id]
        )
        if event.type == event.INPUT
    ]
    input_artifact_ids = [event.artifact_id for event in events]
    artifacts = self.store.get_artifacts_by_id(input_artifact_ids)
    events.sort(key=lambda x: x.artifact_id)
    artifacts.sort(key=lambda x: x.id)

    # Resolve once which parent step lists which artifact among its
    # outputs. This used to be recomputed for every input artifact,
    # together with a `get_step_by_id` call whose result was discarded.
    # Later parents overwrite earlier ones, matching the previous
    # "last matching parent wins" behavior.
    parent_id_by_artifact_id: Dict[int, int] = {}
    if events:
        for parent_id in step_parent_step_ids:
            parent_outputs = self.get_step_output_artifacts(
                step_id=parent_id,
            )
            for parent_output in parent_outputs.values():
                parent_id_by_artifact_id[parent_output.mlmd_id] = parent_id

    inputs: Dict[str, MLMDArtifactModel] = {}
    for event_proto, artifact_proto in zip(events, artifacts):
        assert event_proto.artifact_id == artifact_proto.id
        artifact_name = event_proto.path.steps[0].key

        # In the case that this is an input event, we actually need
        # to resolve the parent step ID via its parents outputs.
        parent_step_id = parent_id_by_artifact_id.get(artifact_proto.id)
        assert parent_step_id is not None

        inputs[artifact_name] = self._get_artifact_model_from_proto(
            artifact_proto, parent_step_id=parent_step_id
        )

    logger.debug("Fetched %d inputs for step '%d'.", len(inputs), step_id)
    return inputs
get_step_output_artifacts(self, step_id)
Returns the output artifacts for the given step.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
step_id |
int |
The ID of the step to get the output artifacts for. |
required |
Returns:
Type | Description |
---|---|
Dict[str, zenml.zen_stores.metadata_store.MLMDArtifactModel] |
A dict mapping output names to output artifacts. |
Source code in zenml/zen_stores/metadata_store.py
def get_step_output_artifacts(
    self, step_id: int
) -> Dict[str, MLMDArtifactModel]:
    """Returns the output artifacts for the given step.

    Args:
        step_id: The ID of the step to get the output artifacts for.

    Returns:
        A dict mapping output names to output artifacts.
    """
    output_events = [
        event
        for event in self.store.get_events_by_execution_ids(  # noqa
            [step_id]
        )
        if event.type == event.OUTPUT
    ]
    artifact_protos = self.store.get_artifacts_by_id(
        [event.artifact_id for event in output_events]
    )

    # Order both sequences by artifact ID so that they can be paired up.
    output_events = sorted(output_events, key=lambda x: x.artifact_id)
    artifact_protos = sorted(artifact_protos, key=lambda x: x.id)

    outputs: Dict[str, MLMDArtifactModel] = {}
    for event_proto, artifact_proto in zip(output_events, artifact_protos):
        assert event_proto.artifact_id == artifact_proto.id
        output_name = event_proto.path.steps[0].key
        outputs[output_name] = self._get_artifact_model_from_proto(
            artifact_proto, parent_step_id=step_id
        )

    logger.debug("Fetched %d outputs for step '%d'.", len(outputs), step_id)
    return outputs
get_step_status(self, step_id)
Gets the execution status of a single step.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
step_id |
int |
The ID of the step to get the status for. |
required |
Returns:
Type | Description |
---|---|
ExecutionStatus |
The status of the step. |
Source code in zenml/zen_stores/metadata_store.py
def get_step_status(self, step_id: int) -> ExecutionStatus:
    """Gets the execution status of a single step.

    Args:
        step_id: The ID of the step to get the status for.

    Returns:
        ExecutionStatus: The status of the step. Any state other than
        complete, running or cached is reported as failed.
    """
    execution = self.store.get_executions_by_id([step_id])[0]  # noqa
    last_state = execution.last_known_state

    if last_state == execution.COMPLETE:
        return ExecutionStatus.COMPLETED
    if last_state == execution.RUNNING:
        return ExecutionStatus.RUNNING
    if last_state == execution.CACHED:
        return ExecutionStatus.CACHED
    return ExecutionStatus.FAILED
migrations
special
Alembic database migration utilities.
alembic
Alembic utilities wrapper.
The Alembic class defined here acts as a wrapper around the Alembic library that automatically configures Alembic to use the ZenML SQL store database connection.
Alembic
Alembic environment and migration API.
This class provides a wrapper around the Alembic library that automatically configures Alembic to use the ZenML SQL store database connection.
Source code in zenml/zen_stores/migrations/alembic.py
class Alembic:
    """Alembic environment and migration API.

    This class provides a wrapper around the Alembic library that automatically
    configures Alembic to use the ZenML SQL store database connection.
    """

    def __init__(
        self,
        engine: Engine,
        metadata: MetaData = SQLModel.metadata,
        context: Optional[EnvironmentContext] = None,
        **kwargs: Any,
    ) -> None:
        """Initialize the Alembic wrapper.

        Args:
            engine: The SQLAlchemy engine to use.
            metadata: The SQLAlchemy metadata to use.
            context: The Alembic environment context to use. If not set, a new
                context is created pointing to the ZenML migrations directory.
            **kwargs: Additional keyword arguments to pass to the Alembic
                environment context.
        """
        self.engine = engine
        self.metadata = metadata
        self.context_kwargs = kwargs

        # Both the migration scripts and their versions live next to this
        # module.
        migrations_dir = Path(__file__).parent
        self.config = Config()
        self.config.set_main_option("script_location", str(migrations_dir))
        self.config.set_main_option(
            "version_locations", str(migrations_dir / "versions")
        )
        self.script_directory = ScriptDirectory.from_config(self.config)

        if context is not None:
            self.environment_context = context
        else:
            self.environment_context = EnvironmentContext(
                self.config, self.script_directory
            )
def db_is_empty(self) -> bool:
    """Check if the database is empty.

    The database counts as empty when the stack table does not exist.

    Returns:
        True if the database is empty, False otherwise.
    """
    # Check the existence of any of the SQLModel tables. The connection is
    # opened in a `with` block so that it is released once the check is
    # done; it used to be opened and never closed.
    with self.engine.connect() as connection:
        return not self.engine.dialect.has_table(
            connection, schemas.StackSchema.__tablename__
        )
def run_migrations(
    self,
    fn: Optional[Callable[[_RevIdType, MigrationContext], List[Any]]],
) -> None:
    """Run an online migration function in the current migration context.

    Args:
        fn: Migration function to run. If not set, the function configured
            externally by the Alembic CLI command is used.
    """
    # Only pass `fn` along when one was given, so that an externally
    # configured function is not replaced by `None`.
    migration_fn_kwargs: Dict[Any, Any] = {} if fn is None else {"fn": fn}

    with self.engine.connect() as connection:
        self.environment_context.configure(
            connection=connection,
            target_metadata=self.metadata,
            include_object=include_object,
            compare_type=True,
            render_as_batch=True,
            **migration_fn_kwargs,
            **self.context_kwargs,
        )
        with self.environment_context.begin_transaction():
            self.environment_context.run_migrations()
def current_revisions(self) -> List[str]:
    """Get the current database revisions.

    Returns:
        List of the revisions the database is currently at.
    """
    revisions: List[str] = []

    def do_get_current_rev(rev: _RevIdType, context: Any) -> List[Any]:
        # Collect the revision IDs as a side effect; an empty list is
        # returned to the migration context.
        revisions.extend(
            script.revision
            for script in self.script_directory.get_all_current(
                rev  # type:ignore [arg-type]
            )
            if script is not None
        )
        return []

    self.run_migrations(do_get_current_rev)
    return revisions
def stamp(self, revision: str) -> None:
    """Stamp the revision table with the given revision without running any migrations.

    Args:
        revision: String revision target.
    """

    def _stamp_steps(rev: _RevIdType, context: Any) -> List[Any]:
        # Delegates to the script directory; `context` is not used.
        steps = self.script_directory._stamp_revs(revision, rev)
        return steps

    self.run_migrations(_stamp_steps)
def upgrade(self, revision: str = "heads") -> None:
    """Upgrade the database to a later version.

    Args:
        revision: String revision target.
    """

    def _upgrade_steps(rev: _RevIdType, context: Any) -> List[Any]:
        # Delegates to the script directory; `context` is not used.
        steps = self.script_directory._upgrade_revs(
            revision, rev  # type:ignore [arg-type]
        )
        return steps

    self.run_migrations(_upgrade_steps)
def downgrade(self, revision: str) -> None:
    """Revert the database to a previous version.

    Args:
        revision: String revision target.
    """

    def _downgrade_steps(rev: _RevIdType, context: Any) -> List[Any]:
        # Delegates to the script directory; `context` is not used.
        steps = self.script_directory._downgrade_revs(
            revision, rev  # type:ignore [arg-type]
        )
        return steps

    self.run_migrations(_downgrade_steps)
__init__(self, engine, metadata=MetaData(), context=None, **kwargs)
special
Initialize the Alembic wrapper.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
engine |
Engine |
The SQLAlchemy engine to use. |
required |
metadata |
MetaData |
The SQLAlchemy metadata to use. |
MetaData() |
context |
Optional[alembic.runtime.environment.EnvironmentContext] |
The Alembic environment context to use. If not set, a new context is created pointing to the ZenML migrations directory. |
None |
**kwargs |
Any |
Additional keyword arguments to pass to the Alembic environment context. |
{} |
Source code in zenml/zen_stores/migrations/alembic.py
def __init__(
    self,
    engine: Engine,
    metadata: MetaData = SQLModel.metadata,
    context: Optional[EnvironmentContext] = None,
    **kwargs: Any,
) -> None:
    """Initialize the Alembic wrapper.

    Args:
        engine: The SQLAlchemy engine to use.
        metadata: The SQLAlchemy metadata to use.
        context: The Alembic environment context to use. If not set, a new
            context is created pointing to the ZenML migrations directory.
        **kwargs: Additional keyword arguments to pass to the Alembic
            environment context.
    """
    self.engine = engine
    self.metadata = metadata
    self.context_kwargs = kwargs

    # Both the migration scripts and their versions live next to this
    # module.
    migrations_dir = Path(__file__).parent
    self.config = Config()
    self.config.set_main_option("script_location", str(migrations_dir))
    self.config.set_main_option(
        "version_locations", str(migrations_dir / "versions")
    )
    self.script_directory = ScriptDirectory.from_config(self.config)

    if context is not None:
        self.environment_context = context
    else:
        self.environment_context = EnvironmentContext(
            self.config, self.script_directory
        )
current_revisions(self)
Get the current database revisions.
Returns:
Type | Description |
---|---|
List[str] |
List of the revisions the database is currently at. |
Source code in zenml/zen_stores/migrations/alembic.py
def current_revisions(self) -> List[str]:
    """Get the current database revisions.

    Returns:
        List of the revisions the database is currently at.
    """
    revisions: List[str] = []

    def do_get_current_rev(rev: _RevIdType, context: Any) -> List[Any]:
        # Collect the revision IDs as a side effect; an empty list is
        # returned to the migration context.
        revisions.extend(
            script.revision
            for script in self.script_directory.get_all_current(
                rev  # type:ignore [arg-type]
            )
            if script is not None
        )
        return []

    self.run_migrations(do_get_current_rev)
    return revisions
db_is_empty(self)
Check if the database is empty.
Returns:
Type | Description |
---|---|
bool |
True if the database is empty, False otherwise. |
Source code in zenml/zen_stores/migrations/alembic.py
def db_is_empty(self) -> bool:
    """Check if the database is empty.

    The database counts as empty when the stack table does not exist.

    Returns:
        True if the database is empty, False otherwise.
    """
    # Check the existence of any of the SQLModel tables. The connection is
    # opened in a `with` block so that it is released once the check is
    # done; it used to be opened and never closed.
    with self.engine.connect() as connection:
        return not self.engine.dialect.has_table(
            connection, schemas.StackSchema.__tablename__
        )
downgrade(self, revision)
Revert the database to a previous version.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
revision |
str |
String revision target. |
required |
Source code in zenml/zen_stores/migrations/alembic.py
def downgrade(self, revision: str) -> None:
    """Revert the database to a previous version.

    Args:
        revision: String revision target.
    """

    def _downgrade_steps(rev: _RevIdType, context: Any) -> List[Any]:
        # Delegates to the script directory; `context` is not used.
        steps = self.script_directory._downgrade_revs(
            revision, rev  # type:ignore [arg-type]
        )
        return steps

    self.run_migrations(_downgrade_steps)
run_migrations(self, fn)
Run an online migration function in the current migration context.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
fn |
Optional[Callable[[Union[str, Sequence[str]], alembic.runtime.migration.MigrationContext], List[Any]]] |
Migration function to run. If not set, the function configured externally by the Alembic CLI command is used. |
required |
Source code in zenml/zen_stores/migrations/alembic.py
def run_migrations(
    self,
    fn: Optional[Callable[[_RevIdType, MigrationContext], List[Any]]],
) -> None:
    """Run an online migration function in the current migration context.

    Args:
        fn: Migration function to run. If not set, the function configured
            externally by the Alembic CLI command is used.
    """
    # Only pass `fn` along when one was given, so that an externally
    # configured function is not replaced by `None`.
    migration_fn_kwargs: Dict[Any, Any] = {} if fn is None else {"fn": fn}

    with self.engine.connect() as connection:
        self.environment_context.configure(
            connection=connection,
            target_metadata=self.metadata,
            include_object=include_object,
            compare_type=True,
            render_as_batch=True,
            **migration_fn_kwargs,
            **self.context_kwargs,
        )
        with self.environment_context.begin_transaction():
            self.environment_context.run_migrations()
stamp(self, revision)
Stamp the revision table with the given revision without running any migrations.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
revision |
str |
String revision target. |
required |
Source code in zenml/zen_stores/migrations/alembic.py
def stamp(self, revision: str) -> None:
    """Record the given revision in the revision table without migrating.

    Args:
        revision: String revision target.
    """

    def _plan_stamp(current: _RevIdType, _context: Any) -> List[Any]:
        # Handed to `run_migrations` as the migration function; the second
        # positional argument is not needed to compute the steps.
        steps = self.script_directory._stamp_revs(revision, current)
        return steps

    self.run_migrations(_plan_stamp)
upgrade(self, revision='heads')
Upgrade the database to a later version.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
revision |
str |
String revision target. |
'heads' |
Source code in zenml/zen_stores/migrations/alembic.py
def upgrade(self, revision: str = "heads") -> None:
    """Move the database forward to a later revision.

    Args:
        revision: String revision target.
    """

    def _plan_upgrade(current: _RevIdType, _context: Any) -> List[Any]:
        # Handed to `run_migrations` as the migration function; the second
        # positional argument is not needed to compute the steps.
        steps = self.script_directory._upgrade_revs(
            revision, current  # type:ignore [arg-type]
        )
        return steps

    self.run_migrations(_plan_upgrade)
AlembicVersion (Base)
Alembic version table.
Source code in zenml/zen_stores/migrations/alembic.py
class AlembicVersion(Base):  # type: ignore[valid-type,misc]
    """Declarative mapping of the `alembic_version` table."""

    __tablename__ = "alembic_version"

    # Single string column that doubles as the table's primary key.
    version_num = Column(String, primary_key=True, nullable=False)
include_object(object, name, type_, *args, **kwargs)
Function used to exclude tables from the migration scripts.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
object |
Any |
The schema item object to check. |
required |
name |
str |
The name of the object to check. |
required |
type_ |
str |
The type of the object to check. |
required |
*args |
Any |
Additional arguments. |
() |
**kwargs |
Any |
Additional keyword arguments. |
{} |
Returns:
Type | Description |
---|---|
bool |
True if the object should be included, False otherwise. |
Source code in zenml/zen_stores/migrations/alembic.py
def include_object(
    object: Any, name: str, type_: str, *args: Any, **kwargs: Any
) -> bool:
    """Decide whether a schema item takes part in the migration scripts.

    Only tables can be excluded: anything that is not a table is always
    included, and a table is included unless its name is listed in
    `exclude_tables`.

    Args:
        object: The schema item object to check.
        name: The name of the object to check.
        type_: The type of the object to check.
        *args: Additional arguments.
        **kwargs: Additional keyword arguments.

    Returns:
        True if the object should be included, False otherwise.
    """
    if type_ != "table":
        return True
    return name not in exclude_tables
versions
special
Alembic database migration scripts.
5330ba58bf20_rename_tables_and_foreign_keys
Rename tables and foreign keys [5330ba58bf20].
Revision ID: 5330ba58bf20 Revises: 7280c14811d6 Create Date: 2022-11-03 16:33:15.220179
downgrade()
Downgrade database schema and/or data back to the previous revision.
Source code in zenml/zen_stores/migrations/versions/5330ba58bf20_rename_tables_and_foreign_keys.py
def downgrade() -> None:
    """Downgrade database schema and/or data back to the previous revision."""
    (
        tables_before,
        tables_after,
        project_required_tables,
        legacy_fks,
        renamed_fks,
    ) = _get_changes()

    # 1. Remove the foreign keys that were created for the renamed tables.
    for src_table, dst_table, src_column, _, _ in renamed_fks:
        _drop_fk_constraint(
            src_table, _fk_constraint_name(src_table, dst_table, src_column)
        )

    # 2. Make `project_id` nullable again where the upgrade made it required.
    for project_table in project_required_tables:
        with op.batch_alter_table(project_table, schema=None) as table_ops:
            table_ops.alter_column(
                "project_id", nullable=True, existing_type=sa.CHAR(length=32)
            )

    # 3. Give every table its previous name back.
    for previous_name, current_name in zip(tables_before, tables_after):
        op.rename_table(current_name, previous_name)

    # 4. Restore the foreign keys as they were before the upgrade.
    for (
        src_table,
        dst_table,
        src_column,
        dst_column,
        on_delete,
    ) in legacy_fks:
        _create_fk_constraint(
            src_table, dst_table, src_column, dst_column, on_delete
        )
upgrade()
Upgrade database schema and/or data, creating a new revision.
Exceptions:
Type | Description |
---|---|
NotImplementedError |
If the database engine is not SQLite or MySQL. |
Source code in zenml/zen_stores/migrations/versions/5330ba58bf20_rename_tables_and_foreign_keys.py
def upgrade() -> None:
    """Upgrade database schema and/or data, creating a new revision.

    Raises:
        NotImplementedError: If the database engine is not SQLite or MySQL.
    """
    (
        tables_before,
        tables_after,
        project_required_tables,
        legacy_fks,
        renamed_fks,
    ) = _get_changes()
    dialect = op.get_bind().engine.name

    # MySQL's default foreign key names contain the foreign key number, so
    # the old constraints must be visited ordered by source table and source
    # column for the running counter below to line up with those names.
    if dialect == "mysql":
        legacy_fks.sort(key=lambda fk: (fk[0], fk[2]))
    fks_seen_per_table: Dict[str, int] = defaultdict(int)

    # 1. Drop the old foreign key constraints.
    for src_table, dst_table, src_column, _, _ in legacy_fks:
        if dialect == "mysql":
            fks_seen_per_table[src_table] += 1
            ordinal = fks_seen_per_table[src_table]
            constraint_name = f"{src_table}_ibfk_{ordinal}"
        elif dialect == "sqlite":
            constraint_name = _fk_constraint_name(
                src_table, dst_table, src_column
            )
        else:
            raise NotImplementedError(f"Unsupported engine: {dialect}")
        _drop_fk_constraint(src_table, constraint_name)

    # 2. Rename the tables.
    for previous_name, next_name in zip(tables_before, tables_after):
        op.rename_table(previous_name, next_name)

    # 3. Make `project_id` required where appropriate.
    for project_table in project_required_tables:
        with op.batch_alter_table(project_table, schema=None) as table_ops:
            table_ops.alter_column(
                "project_id", nullable=False, existing_type=sa.CHAR(length=32)
            )

    # 4. Create the new foreign key constraints.
    for (
        src_table,
        dst_table,
        src_column,
        dst_column,
        on_delete,
    ) in renamed_fks:
        _create_fk_constraint(
            src_table, dst_table, src_column, dst_column, on_delete
        )
5994f9ad0489_introduce_role_permissions
Introduce role permissions [5994f9ad0489].
Revision ID: 5994f9ad0489 Revises: 0.21.1 Create Date: 2022-10-25 23:52:25.935344
downgrade()
Downgrade database schema and/or data back to the previous revision.
Source code in zenml/zen_stores/migrations/versions/5994f9ad0489_introduce_role_permissions.py
def downgrade() -> None:
    """Downgrade database schema and/or data back to the previous revision."""
    # Only the table creation is reverted here; the role and assignment
    # rows written by the upgrade are left in place.
    permissions_table = "rolepermissionschema"
    op.drop_table(permissions_table)
upgrade()
Upgrade database schema and/or data, creating a new revision.
Source code in zenml/zen_stores/migrations/versions/5994f9ad0489_introduce_role_permissions.py
def upgrade() -> None:
    """Upgrade database schema and/or data, creating a new revision.

    Steps performed:

    1. Create the `rolepermissionschema` table that tracks which permissions
       a given role grants.
    2. Delete any pre-existing `admin`/`guest` roles (and their user/team
       assignments) so that role names stay unique.
    3. Insert fresh `admin` and `guest` roles together with their
       permissions.
    4. Assign the `admin` role to every existing user so that existing
       permissions are not broken.
    """
    op.create_table(
        "rolepermissionschema",
        sa.Column("name", sqlmodel.sql.sqltypes.AutoString(), nullable=False),
        sa.Column("role_id", sqlmodel.sql.sqltypes.GUID(), nullable=False),
        sa.ForeignKeyConstraint(
            ["role_id"],
            ["roleschema.id"],
        ),
        sa.PrimaryKeyConstraint("name", "role_id"),
    )

    # Get metadata from the current connection. Only the tables needed below
    # are reflected, otherwise the whole database would get reflected.
    conn = op.get_bind()
    meta = sa.MetaData(bind=conn)
    meta.reflect(
        only=(
            "rolepermissionschema",
            "roleschema",
            "userroleassignmentschema",
            "teamroleassignmentschema",
            "userschema",
        )
    )
    roleschema = sa.Table("roleschema", meta)
    rolepermissionschema = sa.Table("rolepermissionschema", meta)
    userroleassignmentschema = sa.Table("userroleassignmentschema", meta)
    teamroleassignmentschema = sa.Table("teamroleassignmentschema", meta)
    userschema = sa.Table("userschema", meta)

    # In order to ensure unique names on roles, delete potential admin/guest
    # roles that might have been created previous to this alembic version.
    default_role_names = ["admin", "guest"]
    stale_rows = conn.execute(
        select(roleschema.c.id).where(
            roleschema.c.name.in_(default_role_names)
        )
    ).fetchall()
    stale_role_ids = [row[0] for row in stale_rows]
    if stale_role_ids:
        # Assignments reference the roles, so they have to go first.
        conn.execute(
            userroleassignmentschema.delete().where(
                userroleassignmentschema.c.role_id.in_(stale_role_ids)
            )
        )
        conn.execute(
            teamroleassignmentschema.delete().where(
                teamroleassignmentschema.c.role_id.in_(stale_role_ids)
            )
        )
        conn.execute(
            roleschema.delete().where(
                roleschema.c.name.in_(default_role_names)
            )
        )

    # The three standard permissions also defined in
    # zenml.enums.PermissionType
    read = "read"
    write = "write"
    me = "me"

    admin_id = uuid.uuid4().hex
    guest_id = uuid.uuid4().hex

    # One timestamp for the whole migration so that `created` and `updated`
    # of every inserted row are identical.
    now = datetime.datetime.now()

    # Prefill the roles table with the admin and guest role.
    op.bulk_insert(
        roleschema,
        [
            {"id": admin_id, "name": "admin", "created": now, "updated": now},
            {"id": guest_id, "name": "guest", "created": now, "updated": now},
        ],
    )

    # Give the admin read, write and me permissions,
    # give the guest read and me permissions.
    op.bulk_insert(
        rolepermissionschema,
        [
            {"role_id": admin_id, "name": read},
            {"role_id": admin_id, "name": write},
            {"role_id": admin_id, "name": me},
            {"role_id": guest_id, "name": read},
            {"role_id": guest_id, "name": me},
        ],
    )

    # In order to not break permissions for existing users, all existing
    # users are assigned the admin role. The rows are inserted in a single
    # batch instead of one statement per user.
    user_rows = conn.execute(select(userschema.c.id)).fetchall()
    admin_assignments = [
        {
            "id": uuid.uuid4().hex,
            "role_id": admin_id,
            "user_id": row[0],
            "created": now,
            "updated": now,
        }
        for row in user_rows
    ]
    if admin_assignments:
        op.bulk_insert(userroleassignmentschema, admin_assignments)
7280c14811d6_use_text_type
Use Text type [7280c14811d6].
Revision ID: 7280c14811d6 Revises: 5994f9ad0489 Create Date: 2022-11-09 16:29:37.025589
downgrade()
Downgrade database schema and/or data back to the previous revision.
Source code in zenml/zen_stores/migrations/versions/7280c14811d6_use_text_type.py
def downgrade() -> None:
    """Downgrade database schema and/or data back to the previous revision."""
    # Each entry is (table, ((column, existing_nullable), ...)), listed in
    # the order in which the columns are converted back to VARCHAR(4096).
    text_columns = (
        (
            "steprunschema",
            (
                ("docstring", True),
                ("step_configuration", False),
                ("parameters", False),
            ),
        ),
        (
            "pipelineschema",
            (
                ("spec", False),
                ("docstring", True),
            ),
        ),
        (
            "pipelinerunschema",
            (("pipeline_configuration", False),),
        ),
    )
    for table_name, columns in text_columns:
        with op.batch_alter_table(table_name, schema=None) as table_ops:
            for column_name, is_nullable in columns:
                table_ops.alter_column(
                    column_name,
                    existing_type=sa.TEXT(),
                    type_=sa.VARCHAR(length=4096),
                    existing_nullable=is_nullable,
                )
upgrade()
Upgrade database schema and/or data, creating a new revision.
Source code in zenml/zen_stores/migrations/versions/7280c14811d6_use_text_type.py
def upgrade() -> None:
    """Upgrade database schema and/or data, creating a new revision."""
    # Each entry is (table, ((column, existing_nullable), ...)), listed in
    # the order in which the columns are converted from VARCHAR(4096) to
    # TEXT.
    varchar_columns = (
        (
            "pipelinerunschema",
            (("pipeline_configuration", False),),
        ),
        (
            "pipelineschema",
            (
                ("docstring", True),
                ("spec", False),
            ),
        ),
        (
            "steprunschema",
            (
                ("parameters", False),
                ("step_configuration", False),
                ("docstring", True),
            ),
        ),
    )
    for table_name, columns in varchar_columns:
        with op.batch_alter_table(table_name, schema=None) as table_ops:
            for column_name, is_nullable in columns:
                table_ops.alter_column(
                    column_name,
                    existing_type=sa.VARCHAR(length=4096),
                    type_=sa.TEXT(),
                    existing_nullable=is_nullable,
                )
8a64fbfecda0_add_num_outputs_to_run_step
Add num_outputs to run step [8a64fbfecda0].
Revision ID: 8a64fbfecda0 Revises: 5330ba58bf20 Create Date: 2022-11-08 16:20:35.241562
downgrade()
Downgrade database schema and/or data back to the previous revision.
Source code in zenml/zen_stores/migrations/versions/8a64fbfecda0_add_num_outputs_to_run_step.py
def downgrade() -> None:
    """Downgrade database schema and/or data back to the previous revision."""
    # Remove the column that the upgrade added to `step_run`.
    with op.batch_alter_table("step_run", schema=None) as step_run_ops:
        step_run_ops.drop_column("num_outputs")

    # Make `pipeline_run.num_steps` required again.
    with op.batch_alter_table("pipeline_run", schema=None) as pipeline_run_ops:
        pipeline_run_ops.alter_column(
            "num_steps", nullable=False, existing_type=sa.Integer()
        )
upgrade()
Upgrade database schema and/or data, creating a new revision.
Source code in zenml/zen_stores/migrations/versions/8a64fbfecda0_add_num_outputs_to_run_step.py
def upgrade() -> None:
    """Upgrade database schema and/or data, creating a new revision."""
    # Allow `pipeline_run.num_steps` to be NULL.
    with op.batch_alter_table("pipeline_run", schema=None) as pipeline_run_ops:
        pipeline_run_ops.alter_column(
            "num_steps", nullable=True, existing_type=sa.Integer()
        )

    # Add the new, nullable `num_outputs` column to `step_run`.
    with op.batch_alter_table("step_run", schema=None) as step_run_ops:
        num_outputs = sa.Column("num_outputs", sa.Integer(), nullable=True)
        step_run_ops.add_column(num_outputs)
alembic_start
Initialize db with first revision.
Revision ID: alembic_start Revises: Create Date: 2022-10-19 11:17:54.753102
downgrade()
Downgrade database schema and/or data back to the previous revision.
Source code in zenml/zen_stores/migrations/versions/alembic_start.py
def downgrade() -> None:
    """Downgrade database schema and/or data back to the previous revision."""
    # Tables are dropped one by one in exactly this order.
    tables_in_drop_order = (
        "stepinputartifactschema",
        "steprunorderschema",
        "artifactschema",
        "steprunschema",
        "stackcompositionschema",
        "pipelinerunschema",
        "userroleassignmentschema",
        "teamroleassignmentschema",
        "teamassignmentschema",
        "stackschema",
        "stackcomponentschema",
        "pipelineschema",
        "flavorschema",
        "userschema",
        "teamschema",
        "roleschema",
        "projectschema",
    )
    for table_name in tables_in_drop_order:
        op.drop_table(table_name)
upgrade()
Upgrade database schema and/or data, creating a new revision.
Source code in zenml/zen_stores/migrations/versions/alembic_start.py
def upgrade() -> None:
    """Upgrade database schema and/or data, creating a new revision.

    Creates the initial set of tables. Tables are created in an order in
    which every foreign key target already exists when it is referenced.
    """
    guid = sqlmodel.sql.sqltypes.GUID
    auto_string = sqlmodel.sql.sqltypes.AutoString

    def required(name: str, column_type: sa.types.TypeEngine) -> sa.Column:
        """Build a NOT NULL column."""
        return sa.Column(name, column_type, nullable=False)

    def optional(name: str, column_type: sa.types.TypeEngine) -> sa.Column:
        """Build a nullable column."""
        return sa.Column(name, column_type, nullable=True)

    def references(
        column: str, target: str, **options: str
    ) -> sa.ForeignKeyConstraint:
        """Build a single-column foreign key constraint."""
        return sa.ForeignKeyConstraint([column], [target], **options)

    # --- Standalone tables -------------------------------------------------
    op.create_table(
        "projectschema",
        required("id", guid()),
        required("name", auto_string()),
        required("description", auto_string()),
        required("created", sa.DateTime()),
        required("updated", sa.DateTime()),
        sa.PrimaryKeyConstraint("id"),
    )
    op.create_table(
        "roleschema",
        required("id", guid()),
        required("name", auto_string()),
        required("created", sa.DateTime()),
        required("updated", sa.DateTime()),
        sa.PrimaryKeyConstraint("id"),
    )
    op.create_table(
        "teamschema",
        required("id", guid()),
        required("name", auto_string()),
        required("created", sa.DateTime()),
        required("updated", sa.DateTime()),
        sa.PrimaryKeyConstraint("id"),
    )
    op.create_table(
        "userschema",
        required("id", guid()),
        required("name", auto_string()),
        required("full_name", auto_string()),
        optional("email", auto_string()),
        required("active", sa.Boolean()),
        optional("password", auto_string()),
        optional("activation_token", auto_string()),
        required("created", sa.DateTime()),
        required("updated", sa.DateTime()),
        optional("email_opted_in", sa.Boolean()),
        sa.PrimaryKeyConstraint("id"),
    )

    # --- Tables owned by a project and a user ------------------------------
    op.create_table(
        "flavorschema",
        optional("project_id", guid()),
        optional("user_id", guid()),
        required("id", guid()),
        required("type", auto_string()),
        required("source", auto_string()),
        required("name", auto_string()),
        optional("integration", auto_string()),
        required("config_schema", auto_string()),
        required("created", sa.DateTime()),
        required("updated", sa.DateTime()),
        references("project_id", "projectschema.id", ondelete="CASCADE"),
        references("user_id", "userschema.id", ondelete="SET NULL"),
        sa.PrimaryKeyConstraint("id"),
    )
    op.create_table(
        "pipelineschema",
        optional("project_id", guid()),
        optional("user_id", guid()),
        required("id", guid()),
        required("name", auto_string()),
        optional("docstring", auto_string(length=4096)),
        required("spec", auto_string(length=4096)),
        required("created", sa.DateTime()),
        required("updated", sa.DateTime()),
        references("project_id", "projectschema.id", ondelete="CASCADE"),
        references("user_id", "userschema.id", ondelete="SET NULL"),
        sa.PrimaryKeyConstraint("id"),
    )
    op.create_table(
        "stackcomponentschema",
        optional("project_id", guid()),
        optional("user_id", guid()),
        required("id", guid()),
        required("name", auto_string()),
        required("is_shared", sa.Boolean()),
        required("type", auto_string()),
        required("flavor", auto_string()),
        required("configuration", sa.LargeBinary()),
        required("created", sa.DateTime()),
        required("updated", sa.DateTime()),
        references("project_id", "projectschema.id", ondelete="CASCADE"),
        references("user_id", "userschema.id", ondelete="SET NULL"),
        sa.PrimaryKeyConstraint("id"),
    )
    op.create_table(
        "stackschema",
        optional("project_id", guid()),
        optional("user_id", guid()),
        required("id", guid()),
        required("created", sa.DateTime()),
        required("updated", sa.DateTime()),
        required("name", auto_string()),
        required("is_shared", sa.Boolean()),
        references("project_id", "projectschema.id", ondelete="CASCADE"),
        references("user_id", "userschema.id", ondelete="SET NULL"),
        sa.PrimaryKeyConstraint("id"),
    )

    # --- Team membership and role assignments ------------------------------
    op.create_table(
        "teamassignmentschema",
        required("user_id", guid()),
        required("team_id", guid()),
        references("team_id", "teamschema.id"),
        references("user_id", "userschema.id"),
        sa.PrimaryKeyConstraint("user_id", "team_id"),
    )
    op.create_table(
        "teamroleassignmentschema",
        required("id", guid()),
        required("role_id", guid()),
        required("team_id", guid()),
        optional("project_id", guid()),
        required("created", sa.DateTime()),
        required("updated", sa.DateTime()),
        references("project_id", "projectschema.id"),
        references("role_id", "roleschema.id"),
        references("team_id", "teamschema.id"),
        sa.PrimaryKeyConstraint("id"),
    )
    op.create_table(
        "userroleassignmentschema",
        required("id", guid()),
        required("role_id", guid()),
        required("user_id", guid()),
        optional("project_id", guid()),
        required("created", sa.DateTime()),
        required("updated", sa.DateTime()),
        references("project_id", "projectschema.id"),
        references("role_id", "roleschema.id"),
        references("user_id", "userschema.id"),
        sa.PrimaryKeyConstraint("id"),
    )

    # --- Pipeline runs and stack composition -------------------------------
    op.create_table(
        "pipelinerunschema",
        optional("project_id", guid()),
        optional("user_id", guid()),
        optional("stack_id", guid()),
        optional("pipeline_id", guid()),
        required("id", guid()),
        required("name", auto_string()),
        required("pipeline_configuration", auto_string(length=4096)),
        required("num_steps", sa.Integer()),
        required("zenml_version", auto_string()),
        optional("git_sha", auto_string()),
        required("created", sa.DateTime()),
        required("updated", sa.DateTime()),
        optional("mlmd_id", sa.Integer()),
        references("pipeline_id", "pipelineschema.id", ondelete="SET NULL"),
        references("project_id", "projectschema.id", ondelete="CASCADE"),
        references("stack_id", "stackschema.id", ondelete="SET NULL"),
        references("user_id", "userschema.id", ondelete="CASCADE"),
        sa.PrimaryKeyConstraint("id"),
    )
    op.create_table(
        "stackcompositionschema",
        required("stack_id", guid()),
        required("component_id", guid()),
        references("component_id", "stackcomponentschema.id"),
        references("stack_id", "stackschema.id"),
        sa.PrimaryKeyConstraint("stack_id", "component_id"),
    )

    # --- Step runs, artifacts and their relations --------------------------
    op.create_table(
        "steprunschema",
        required("id", guid()),
        required("name", auto_string()),
        required("pipeline_run_id", guid()),
        required("entrypoint_name", auto_string()),
        required("parameters", auto_string(length=4096)),
        required("step_configuration", auto_string(length=4096)),
        optional("docstring", auto_string(length=4096)),
        optional("mlmd_id", sa.Integer()),
        required("created", sa.DateTime()),
        required("updated", sa.DateTime()),
        references("pipeline_run_id", "pipelinerunschema.id"),
        sa.PrimaryKeyConstraint("id"),
    )
    op.create_table(
        "artifactschema",
        required("id", guid()),
        required("name", auto_string()),
        required("parent_step_id", guid()),
        required("producer_step_id", guid()),
        required("type", auto_string()),
        required("uri", auto_string()),
        required("materializer", auto_string()),
        required("data_type", auto_string()),
        required("is_cached", sa.Boolean()),
        optional("mlmd_id", sa.Integer()),
        optional("mlmd_parent_step_id", sa.Integer()),
        optional("mlmd_producer_step_id", sa.Integer()),
        required("created", sa.DateTime()),
        required("updated", sa.DateTime()),
        references("parent_step_id", "steprunschema.id"),
        references("producer_step_id", "steprunschema.id"),
        sa.PrimaryKeyConstraint("id"),
    )
    op.create_table(
        "steprunorderschema",
        required("parent_id", guid()),
        required("child_id", guid()),
        references("child_id", "steprunschema.id"),
        references("parent_id", "steprunschema.id"),
        sa.PrimaryKeyConstraint("parent_id", "child_id"),
    )
    op.create_table(
        "stepinputartifactschema",
        required("step_id", guid()),
        required("artifact_id", guid()),
        required("name", auto_string()),
        references("artifact_id", "artifactschema.id"),
        references("step_id", "steprunschema.id"),
        sa.PrimaryKeyConstraint("step_id", "artifact_id"),
    )
c1b18cec3a48_increase_length_on_flavor_config_schema
Increase length on flavor config schema [c1b18cec3a48].
Revision ID: c1b18cec3a48 Revises: alembic_start Create Date: 2022-10-19 17:12:19.481776
downgrade()
Downgrade database schema and/or data back to the previous revision.
Source code in zenml/zen_stores/migrations/versions/c1b18cec3a48_increase_length_on_flavor_config_schema.py
def downgrade() -> None:
    """Downgrade database schema and/or data back to the previous revision."""
    # Switch `config_schema` back to an unbounded string and make it
    # required again.
    with op.batch_alter_table("flavorschema", schema=None) as flavor_ops:
        flavor_ops.alter_column(
            "config_schema",
            nullable=False,
            type_=sqlmodel.sql.sqltypes.AutoString(),
            existing_type=sa.VARCHAR(),
        )
upgrade()
Upgrade database schema and/or data, creating a new revision.
Source code in zenml/zen_stores/migrations/versions/c1b18cec3a48_increase_length_on_flavor_config_schema.py
def upgrade() -> None:
    """Upgrade database schema and/or data, creating a new revision."""
    # Switch `config_schema` to a 4096-character string. Note that the
    # column is also marked as nullable by this change.
    with op.batch_alter_table("flavorschema", schema=None) as flavor_ops:
        flavor_ops.alter_column(
            "config_schema",
            nullable=True,
            type_=sqlmodel.sql.sqltypes.AutoString(4096),
            existing_type=sa.VARCHAR(),
        )
ccd68b7825ae_add_status_to_pipeline_and_step_run
Add status to pipeline and step run [ccd68b7825ae].
Revision ID: ccd68b7825ae Revises: c1b18cec3a48 Create Date: 2022-10-24 16:49:37.007641
downgrade()
Downgrade database schema and/or data back to the previous revision.
Source code in zenml/zen_stores/migrations/versions/ccd68b7825ae_add_status_to_pipeline_and_step_run.py
def downgrade() -> None:
    """Downgrade database schema and/or data back to the previous revision."""
    # Remove the `status` column from both run tables, step runs first.
    for table_name in ("steprunschema", "pipelinerunschema"):
        with op.batch_alter_table(table_name, schema=None) as table_ops:
            table_ops.drop_column("status")
upgrade()
Upgrade database schema and/or data, creating a new revision.
Source code in zenml/zen_stores/migrations/versions/ccd68b7825ae_add_status_to_pipeline_and_step_run.py
def upgrade() -> None:
    """Upgrade database schema and/or data, creating a new revision."""
    # (table, statement that backfills the new column for existing rows)
    backfills = (
        (
            "pipelinerunschema",
            "UPDATE pipelinerunschema SET status = 'running'",
        ),
        (
            "steprunschema",
            "UPDATE steprunschema SET status = 'running'",
        ),
    )
    for table_name, backfill_sql in backfills:
        # Add the column as nullable first so that existing rows stay valid.
        with op.batch_alter_table(table_name, schema=None) as table_ops:
            table_ops.add_column(
                sa.Column(
                    "status", sqlmodel.sql.sqltypes.AutoString(), nullable=True
                ),
            )

        # Give every existing row a value ...
        op.execute(backfill_sql)

        # ... and only then make the column required.
        with op.batch_alter_table(table_name, schema=None) as table_ops:
            table_ops.alter_column(
                "status",
                existing_type=sqlmodel.sql.sqltypes.AutoString(),
                nullable=False,
            )
d02b3d3464cf_add_orchestrator_run_id_column
Add orchestrator_run_id column [d02b3d3464cf].
Revision ID: d02b3d3464cf Revises: ccd68b7825ae Create Date: 2022-10-26 16:50:44.965578
downgrade()
Downgrade database schema and/or data back to the previous revision.
Source code in zenml/zen_stores/migrations/versions/d02b3d3464cf_add_orchestrator_run_id_column.py
def downgrade() -> None:
    """Downgrade database schema and/or data back to the previous revision."""
    # Remove the column that the upgrade added to the pipeline run table.
    with op.batch_alter_table("pipelinerunschema", schema=None) as run_ops:
        run_ops.drop_column("orchestrator_run_id")
upgrade()
Upgrade database schema and/or data, creating a new revision.
Source code in zenml/zen_stores/migrations/versions/d02b3d3464cf_add_orchestrator_run_id_column.py
def upgrade() -> None:
    """Upgrade database schema and/or data, creating a new revision."""
    # Add a nullable string column to the pipeline run table.
    with op.batch_alter_table("pipelinerunschema", schema=None) as run_ops:
        orchestrator_run_id = sa.Column(
            "orchestrator_run_id",
            sqlmodel.sql.sqltypes.AutoString(),
            nullable=True,
        )
        run_ops.add_column(orchestrator_run_id)
rest_zen_store
REST Zen Store implementation.
RestZenStore (BaseZenStore)
pydantic-model
Store implementation for accessing data from a REST API.
Source code in zenml/zen_stores/rest_zen_store.py
class RestZenStore(BaseZenStore):
"""Store implementation for accessing data from a REST API."""
config: RestZenStoreConfiguration
TYPE: ClassVar[StoreType] = StoreType.REST
CONFIG_TYPE: ClassVar[Type[StoreConfiguration]] = RestZenStoreConfiguration
_api_token: Optional[str] = None
_session: Optional[requests.Session] = None
def _initialize_database(self) -> None:
    """Initialize the database.

    Intentionally does nothing for a REST store.
    """
    return None
# ====================================
# ZenML Store interface implementation
# ====================================
# --------------------------------
# Initialization and configuration
# --------------------------------
def _initialize(self) -> None:
    """Initialize the REST store.

    Fetches the server information and logs a warning when the server
    version differs from the client version, unless that warning has been
    disabled.
    """
    client_version = zenml.__version__
    server_version = self.get_store_info().version

    if DISABLE_CLIENT_SERVER_MISMATCH_WARNING:
        return
    if server_version == client_version:
        return

    logger.warning(
        "Your ZenML client version (%s) does not match the server "
        "version (%s). This version mismatch might lead to errors or "
        "unexpected behavior. \nTo disable this warning message, set "
        "the environment variable `%s=True`",
        client_version,
        server_version,
        ENV_ZENML_DISABLE_CLIENT_SERVER_MISMATCH_WARNING,
    )
def get_store_info(self) -> ServerModel:
    """Fetch information about the server.

    Returns:
        Information about the server.
    """
    # Query the info endpoint and parse the response body into the model.
    return ServerModel.parse_obj(self.get(INFO))
# ------------
# TFX Metadata
# ------------
def get_metadata_config(
    self, expand_certs: bool = False
) -> Union["ConnectionConfig", "MetadataStoreClientConfig"]:
    """Get the TFX metadata config of this ZenStore.

    The config is fetched from the `METADATA_CONFIG` endpoint and parsed
    first as a `ConnectionConfig`, falling back to a
    `MetadataStoreClientConfig` if that fails.

    Args:
        expand_certs: Whether to expand the certificate paths in the
            connection config to their value. When `False`, MySQL SSL
            option values that are not paths of existing local files are
            written to `.pem` files and replaced by those file paths.

    Raises:
        ValueError: if the server response is invalid.

    Returns:
        The TFX metadata config of this ZenStore.
    """
    from google.protobuf.json_format import Parse, ParseError
    from ml_metadata.proto.metadata_store_pb2 import (
        ConnectionConfig,
        MetadataStoreClientConfig,
    )

    from zenml.zen_stores.sql_zen_store import SqlZenStoreConfiguration

    body = self.get(f"{METADATA_CONFIG}")
    if not isinstance(body, str):
        raise ValueError(
            f"Invalid response from server: {body}. Expected string."
        )

    # First try to parse the response as a ConnectionConfig, then as a
    # MetadataStoreClientConfig.
    try:
        metadata_config_pb = Parse(body, ConnectionConfig())
    except ParseError:
        return Parse(body, MetadataStoreClientConfig())

    # if the server returns a SQLite connection config, but the file is not
    # available locally, we need to replace the path with the local path of
    # the default local SQLite database
    if metadata_config_pb.HasField("sqlite") and not os.path.isfile(
        metadata_config_pb.sqlite.filename_uri
    ):
        message = (
            f"The ZenML server is using a SQLite database at "
            f"{metadata_config_pb.sqlite.filename_uri} that is not "
            f"available locally. Using the default local SQLite "
            f"database instead."
        )
        if not self.is_local_store():
            logger.warning(message)
        else:
            logger.debug(message)
        default_store_cfg = GlobalConfiguration().get_default_store()
        assert isinstance(default_store_cfg, SqlZenStoreConfiguration)
        return default_store_cfg.get_metadata_config()

    if metadata_config_pb.HasField("mysql"):
        # If the server returns a MySQL connection config with a hostname
        # that is a Docker or K3D internal hostname that cannot be resolved
        # locally, we need to replace it with localhost. We're assuming
        # that we're running on the host machine and the MySQL server can
        # be accessed via localhost.
        metadata_config_pb.mysql.host = (
            replace_internal_hostname_with_localhost(
                metadata_config_pb.mysql.host
            )
        )

        if not expand_certs and metadata_config_pb.mysql.HasField(
            "ssl_options"
        ):
            ssl_options = metadata_config_pb.mysql.ssl_options
            # Save the certificates in a secure location on disk
            secret_folder = Path(
                GlobalConfiguration().local_stores_path,
                "certificates",
            )
            # Name the SSL option fields explicitly instead of deriving
            # them with `str.lstrip("ssl_")`, which strips a *set of
            # characters* and only yields the right names by accident.
            for field_name in ("key", "ca", "cert"):
                if not ssl_options.HasField(field_name):
                    continue
                content = getattr(ssl_options, field_name)
                if content and not os.path.isfile(content):
                    fileio.makedirs(str(secret_folder))
                    file_path = Path(secret_folder, f"ssl_{field_name}.pem")
                    # Create the file with owner-only permissions from the
                    # start so the secret is never readable by other users
                    # between the write and the chmod.
                    fd = os.open(
                        file_path,
                        os.O_WRONLY | os.O_CREAT | os.O_TRUNC,
                        0o600,
                    )
                    with os.fdopen(fd, "w") as f:
                        f.write(content)
                    # The mode passed to os.open only applies to newly
                    # created files; tighten pre-existing ones as well.
                    file_path.chmod(0o600)
                    setattr(ssl_options, field_name, str(file_path))

    return metadata_config_pb
# ------
# Stacks
# ------
@track(AnalyticsEvent.REGISTERED_STACK)
def create_stack(self, stack: StackModel) -> StackModel:
    """Register a new stack.

    The stack is sent through `_create_project_scoped_resource` on the
    `STACKS` route, using `CreateStackRequest` as the request model.

    Args:
        stack: The stack to register.

    Returns:
        The registered stack.
    """
    registered_stack = self._create_project_scoped_resource(
        route=STACKS,
        resource=stack,
        request_model=CreateStackRequest,
    )
    return registered_stack
def get_stack(self, stack_id: UUID) -> StackModel:
    """Get a stack by its unique ID.

    Args:
        stack_id: The ID of the stack to get.

    Returns:
        The stack with the given ID.
    """
    stack = self._get_resource(
        route=STACKS,
        resource_id=stack_id,
        resource_model=StackModel,
    )
    return stack
def list_stacks(
    self,
    project_name_or_id: Optional[Union[str, UUID]] = None,
    user_name_or_id: Optional[Union[str, UUID]] = None,
    component_id: Optional[UUID] = None,
    name: Optional[str] = None,
    is_shared: Optional[bool] = None,
    hydrated: bool = False,
) -> Union[List[StackModel], List[HydratedStackModel]]:
    """List all stacks matching the given filter criteria.

    All arguments, including `hydrated`, are forwarded as filters to
    `_list_resources` on the `STACKS` route.

    Args:
        project_name_or_id: Id or name of the Project containing the stack
        user_name_or_id: Optionally filter stacks by their owner
        component_id: Optionally filter for stacks that contain the
            component
        name: Optionally filter stacks by their name
        is_shared: Optionally filter out stacks by whether they are shared
            or not
        hydrated: Flag to decide whether to return hydrated models.

    Returns:
        A list of all stacks matching the filter criteria.
    """
    # Spelled out explicitly, in signature order, instead of snapshotting
    # the local namespace.
    filters = {
        "project_name_or_id": project_name_or_id,
        "user_name_or_id": user_name_or_id,
        "component_id": component_id,
        "name": name,
        "is_shared": is_shared,
        "hydrated": hydrated,
    }
    resource_model = HydratedStackModel if hydrated else StackModel
    return self._list_resources(
        route=STACKS,
        resource_model=resource_model,
        **filters,
    )
@track(AnalyticsEvent.UPDATED_STACK)
def update_stack(self, stack: StackModel) -> StackModel:
    """Update a stack.

    The stack is sent through `_update_resource` on the `STACKS` route,
    using `UpdateStackRequest` as the request model.

    Args:
        stack: The stack to use for the update.

    Returns:
        The updated stack.
    """
    updated_stack = self._update_resource(
        route=STACKS,
        resource=stack,
        request_model=UpdateStackRequest,
    )
    return updated_stack
@track(AnalyticsEvent.DELETED_STACK)
def delete_stack(self, stack_id: UUID) -> None:
    """Delete a stack.

    Args:
        stack_id: The ID of the stack to delete.
    """
    self._delete_resource(route=STACKS, resource_id=stack_id)
# ----------------
# Stack components
# ----------------
@track(AnalyticsEvent.REGISTERED_STACK_COMPONENT)
def create_stack_component(
    self, component: ComponentModel
) -> ComponentModel:
    """Create a stack component.

    The component is sent through `_create_project_scoped_resource` on
    the `STACK_COMPONENTS` route; no request model is passed.

    Args:
        component: The stack component to create.

    Returns:
        The created stack component.
    """
    # TODO[Stefan]: pass request_model=CreateStackComponentRequest once the
    #   request model is ready
    created_component = self._create_project_scoped_resource(
        route=STACK_COMPONENTS,
        resource=component,
    )
    return created_component
def get_stack_component(self, component_id: UUID) -> ComponentModel:
    """Get a stack component by ID.

    Args:
        component_id: The ID of the stack component to get.

    Returns:
        The stack component.
    """
    component = self._get_resource(
        route=STACK_COMPONENTS,
        resource_id=component_id,
        resource_model=ComponentModel,
    )
    return component
def list_stack_components(
    self,
    project_name_or_id: Optional[Union[str, UUID]] = None,
    user_name_or_id: Optional[Union[str, UUID]] = None,
    type: Optional[str] = None,
    flavor_name: Optional[str] = None,
    name: Optional[str] = None,
    is_shared: Optional[bool] = None,
) -> List[ComponentModel]:
    """List all stack components matching the given filter criteria.

    All arguments are forwarded as filters to `_list_resources` on the
    `STACK_COMPONENTS` route.

    Args:
        project_name_or_id: The ID or name of the Project to which the stack
            components belong
        user_name_or_id: Optionally filter stack components by the owner
        type: Optionally filter by type of stack component
        flavor_name: Optionally filter by flavor
        name: Optionally filter stack component by name
        is_shared: Optionally filter out stack component by whether they are
            shared or not

    Returns:
        A list of all stack components matching the filter criteria.
    """
    # Spelled out explicitly, in signature order, instead of snapshotting
    # the local namespace.
    filters = {
        "project_name_or_id": project_name_or_id,
        "user_name_or_id": user_name_or_id,
        "type": type,
        "flavor_name": flavor_name,
        "name": name,
        "is_shared": is_shared,
    }
    return self._list_resources(
        route=STACK_COMPONENTS,
        resource_model=ComponentModel,
        **filters,
    )
@track(AnalyticsEvent.UPDATED_STACK_COMPONENT)
def update_stack_component(
    self, component: ComponentModel
) -> ComponentModel:
    """Update an existing stack component.

    The component is sent through `_update_resource` on the
    `STACK_COMPONENTS` route; no request model is passed.

    Args:
        component: The stack component to use for the update.

    Returns:
        The updated stack component.
    """
    # TODO[Stefan]: pass request_model=UpdateComponentRequest once the
    #   request model is ready
    updated_component = self._update_resource(
        route=STACK_COMPONENTS,
        resource=component,
    )
    return updated_component
@track(AnalyticsEvent.DELETED_STACK_COMPONENT)
def delete_stack_component(self, component_id: UUID) -> None:
    """Delete a stack component.

    Args:
        component_id: The ID of the stack component to delete.
    """
    self._delete_resource(route=STACK_COMPONENTS, resource_id=component_id)
def get_stack_component_side_effects(
    self,
    component_id: UUID,
    run_id: UUID,
    pipeline_id: UUID,
    stack_id: UUID,
) -> Dict[Any, Any]:
    """Get the side effects of a stack component.

    NOTE(review): this method has no implementation. Its body consists of
    this docstring only, so a call implicitly returns `None` despite the
    annotated `Dict` return type.

    Args:
        component_id: The ID of the stack component to get side effects for.
        run_id: The ID of the run to get side effects for.
        pipeline_id: The ID of the pipeline to get side effects for.
        stack_id: The ID of the stack to get side effects for.
    """
# -----------------------
# Stack component flavors
# -----------------------
@track(AnalyticsEvent.CREATED_FLAVOR)
def create_flavor(self, flavor: FlavorModel) -> FlavorModel:
    """Creates a new stack component flavor.

    The flavor is sent through `_create_project_scoped_resource` on the
    `FLAVORS` route; no request model is passed.

    Args:
        flavor: The stack component flavor to create.

    Returns:
        The newly created flavor.
    """
    # TODO[Stefan]: pass request_model=CreateFlavorRequest once the request
    #   model is ready
    created_flavor = self._create_project_scoped_resource(
        route=FLAVORS,
        resource=flavor,
    )
    return created_flavor
def get_flavor(self, flavor_id: UUID) -> FlavorModel:
    """Get a stack component flavor by ID.

    Args:
        flavor_id: The ID of the stack component flavor to get.

    Returns:
        The stack component flavor.
    """
    flavor = self._get_resource(
        route=FLAVORS,
        resource_id=flavor_id,
        resource_model=FlavorModel,
    )
    return flavor
def list_flavors(
    self,
    project_name_or_id: Optional[Union[str, UUID]] = None,
    user_name_or_id: Optional[Union[str, UUID]] = None,
    component_type: Optional[StackComponentType] = None,
    name: Optional[str] = None,
    is_shared: Optional[bool] = None,
) -> List[FlavorModel]:
    """List all stack component flavors matching the given filter criteria.

    All arguments are forwarded as filters to `_list_resources` on the
    `FLAVORS` route.

    Args:
        project_name_or_id: Optionally filter by the Project to which the
            component flavors belong
        user_name_or_id: Optionally filter by the owner
        component_type: Optionally filter by type of stack component
        name: Optionally filter flavors by name
        is_shared: Optionally filter out flavors by whether they are
            shared or not

    Returns:
        List of all the stack component flavors matching the given criteria.
    """
    # Spelled out explicitly, in signature order, instead of snapshotting
    # the local namespace.
    filters = {
        "project_name_or_id": project_name_or_id,
        "user_name_or_id": user_name_or_id,
        "component_type": component_type,
        "name": name,
        "is_shared": is_shared,
    }
    return self._list_resources(
        route=FLAVORS,
        resource_model=FlavorModel,
        **filters,
    )
@track(AnalyticsEvent.UPDATED_FLAVOR)
def update_flavor(self, flavor: FlavorModel) -> FlavorModel:
    """Update an existing stack component flavor.

    The flavor is sent through `_update_resource` on the `FLAVORS` route;
    no request model is passed.

    Args:
        flavor: The stack component flavor to use for the update.

    Returns:
        The updated stack component flavor.
    """
    # TODO[Stefan]: pass request_model=UpdateFlavorRequest once the request
    #   model is ready
    updated_flavor = self._update_resource(
        route=FLAVORS,
        resource=flavor,
    )
    return updated_flavor
@track(AnalyticsEvent.DELETED_FLAVOR)
def delete_flavor(self, flavor_id: UUID) -> None:
    """Delete a stack component flavor.

    Args:
        flavor_id: The ID of the stack component flavor to delete.
    """
    self._delete_resource(route=FLAVORS, resource_id=flavor_id)
# -----
# Users
# -----
@property
def active_user_name(self) -> str:
    """Gets the active username.

    This is the username stored in the store configuration.

    Returns:
        The active username.
    """
    store_config = self.config
    return store_config.username
@track(AnalyticsEvent.CREATED_USER)
def create_user(self, user: UserModel) -> UserModel:
    """Creates a new user.

    The user is sent through `_create_resource` on the `USERS` route with
    `CreateUserRequest` as request model and `CreateUserResponse` as
    response model.

    Args:
        user: User to be created.

    Returns:
        The newly created user.
    """
    created_user = self._create_resource(
        route=USERS,
        resource=user,
        request_model=CreateUserRequest,
        response_model=CreateUserResponse,
    )
    return created_user
def get_user(self, user_name_or_id: Union[str, UUID]) -> UserModel:
    """Gets a specific user.

    Args:
        user_name_or_id: The name or ID of the user to get.

    Returns:
        The requested user, if it was found.
    """
    user = self._get_resource(
        route=USERS,
        resource_id=user_name_or_id,
        resource_model=UserModel,
    )
    return user
# TODO: [ALEX] add filtering param(s)
def list_users(self) -> List[UserModel]:
    """List all users.

    No filters are passed to `_list_resources`.

    Returns:
        A list of all users.
    """
    return self._list_resources(route=USERS, resource_model=UserModel)
@track(AnalyticsEvent.UPDATED_USER)
def update_user(self, user: UserModel) -> UserModel:
    """Updates an existing user.

    The user is sent through `_update_resource` on the `USERS` route,
    using `UpdateUserRequest` as the request model.

    Args:
        user: The user model to use for the update.

    Returns:
        The updated user.
    """
    updated_user = self._update_resource(
        route=USERS,
        resource=user,
        request_model=UpdateUserRequest,
    )
    return updated_user
@track(AnalyticsEvent.DELETED_USER)
def delete_user(self, user_name_or_id: Union[str, UUID]) -> None:
    """Deletes a user.

    Args:
        user_name_or_id: The name or ID of the user to delete.
    """
    self._delete_resource(route=USERS, resource_id=user_name_or_id)
def user_email_opt_in(
    self,
    user_name_or_id: Union[str, UUID],
    user_opt_in_response: bool,
    email: Optional[str] = None,
) -> UserModel:
    """Persist user response to the email prompt.

    Sends an `EmailOptInModel` in a PUT request to the user's
    `EMAIL_ANALYTICS` endpoint and parses the response into a `UserModel`.

    Args:
        user_name_or_id: The name or the ID of the user.
        user_opt_in_response: Whether this email should be associated
            with the user id in the telemetry
        email: The users email

    Returns:
        The updated user.
    """
    opt_in_request = EmailOptInModel(
        email=email, email_opted_in=user_opt_in_response
    )
    response_body = self.put(
        f"{USERS}/{str(user_name_or_id)}{EMAIL_ANALYTICS}",
        body=opt_in_request,
    )
    return UserModel.parse_obj(response_body)
# -----
# Teams
# -----
@track(AnalyticsEvent.CREATED_TEAM)
def create_team(self, team: TeamModel) -> TeamModel:
    """Creates a new team.

    The team is sent through `_create_resource` on the `TEAMS` route,
    using `CreateTeamRequest` as the request model.

    Args:
        team: The team model to create.

    Returns:
        The newly created team.
    """
    created_team = self._create_resource(
        route=TEAMS,
        resource=team,
        request_model=CreateTeamRequest,
    )
    return created_team
def get_team(self, team_name_or_id: Union[str, UUID]) -> TeamModel:
    """Gets a specific team.

    Args:
        team_name_or_id: Name or ID of the team to get.

    Returns:
        The requested team.
    """
    team = self._get_resource(
        route=TEAMS,
        resource_id=team_name_or_id,
        resource_model=TeamModel,
    )
    return team
def list_teams(self) -> List[TeamModel]:
    """List all teams.

    No filters are passed to `_list_resources`.

    Returns:
        A list of all teams.
    """
    return self._list_resources(route=TEAMS, resource_model=TeamModel)
@track(AnalyticsEvent.UPDATED_TEAM)
def update_team(self, team: TeamModel) -> TeamModel:
    """Update an existing team.

    The team is sent through `_update_resource` on the `TEAMS` route,
    using `UpdateTeamRequest` as the request model.

    Args:
        team: The team to use for the update.

    Returns:
        The updated team.
    """
    updated_team = self._update_resource(
        route=TEAMS,
        resource=team,
        request_model=UpdateTeamRequest,
    )
    return updated_team
@track(AnalyticsEvent.DELETED_TEAM)
def delete_team(self, team_name_or_id: Union[str, UUID]) -> None:
    """Deletes a team.

    Args:
        team_name_or_id: Name or ID of the team to delete.
    """
    self._delete_resource(route=TEAMS, resource_id=team_name_or_id)
# ---------------
# Team membership
# ---------------
def get_users_for_team(
    self, team_name_or_id: Union[str, UUID]
) -> List[UserModel]:
    """Fetches all users of a team.

    Not supported by this store: the call always raises.

    Args:
        team_name_or_id: The name or ID of the team for which to get users.

    Raises:
        NotImplementedError: This method is not implemented
    """
    error_message = "Not Implemented"
    raise NotImplementedError(error_message)
def get_teams_for_user(
    self, user_name_or_id: Union[str, UUID]
) -> List[TeamModel]:
    """Fetches all teams for a user.

    Not supported by this store: the call always raises.

    Args:
        user_name_or_id: The name or ID of the user for which to get all
            teams.

    Raises:
        NotImplementedError: This method is not implemented
    """
    error_message = "Not Implemented"
    raise NotImplementedError(error_message)
def add_user_to_team(
    self,
    user_name_or_id: Union[str, UUID],
    team_name_or_id: Union[str, UUID],
) -> None:
    """Adds a user to a team.

    Not supported by this store: the call always raises.

    Args:
        user_name_or_id: Name or ID of the user to add to the team.
        team_name_or_id: Name or ID of the team to which to add the user to.

    Raises:
        NotImplementedError: This method is not implemented
    """
    error_message = "Not Implemented"
    raise NotImplementedError(error_message)
def remove_user_from_team(
    self,
    user_name_or_id: Union[str, UUID],
    team_name_or_id: Union[str, UUID],
) -> None:
    """Removes a user from a team.

    Not supported by this store: the call always raises.

    Args:
        user_name_or_id: Name or ID of the user to remove from the team.
        team_name_or_id: Name or ID of the team from which to remove the
            user.

    Raises:
        NotImplementedError: This method is not implemented
    """
    error_message = "Not Implemented"
    raise NotImplementedError(error_message)
# -----
# Roles
# -----
@track(AnalyticsEvent.CREATED_ROLE)
def create_role(self, role: RoleModel) -> RoleModel:
    """Creates a new role.

    The role is sent through `_create_resource` on the `ROLES` route,
    using `CreateRoleRequest` as the request model.

    Args:
        role: The role model to create.

    Returns:
        The newly created role.
    """
    created_role = self._create_resource(
        route=ROLES,
        resource=role,
        request_model=CreateRoleRequest,
    )
    return created_role
# TODO: consider using team_id instead
def get_role(self, role_name_or_id: Union[str, UUID]) -> RoleModel:
    """Gets a specific role.

    Args:
        role_name_or_id: Name or ID of the role to get.

    Returns:
        The requested role.
    """
    role = self._get_resource(
        route=ROLES,
        resource_id=role_name_or_id,
        resource_model=RoleModel,
    )
    return role
# TODO: [ALEX] add filtering param(s)
def list_roles(self) -> List[RoleModel]:
    """List all roles.

    No filters are passed to `_list_resources`.

    Returns:
        A list of all roles.
    """
    return self._list_resources(route=ROLES, resource_model=RoleModel)
@track(AnalyticsEvent.UPDATED_ROLE)
def update_role(self, role: RoleModel) -> RoleModel:
    """Update an existing role.

    The role is sent through `_update_resource` on the `ROLES` route,
    using `UpdateRoleRequest` as the request model.

    Args:
        role: The role to use for the update.

    Returns:
        The updated role.
    """
    updated_role = self._update_resource(
        route=ROLES,
        resource=role,
        request_model=UpdateRoleRequest,
    )
    return updated_role
@track(AnalyticsEvent.DELETED_ROLE)
def delete_role(self, role_name_or_id: Union[str, UUID]) -> None:
    """Deletes a role.

    Args:
        role_name_or_id: Name or ID of the role to delete.
    """
    self._delete_resource(route=ROLES, resource_id=role_name_or_id)
# ----------------
# Role assignments
# ----------------
def list_role_assignments(
    self,
    project_name_or_id: Optional[Union[str, UUID]] = None,
    role_name_or_id: Optional[Union[str, UUID]] = None,
    team_name_or_id: Optional[Union[str, UUID]] = None,
    user_name_or_id: Optional[Union[str, UUID]] = None,
) -> List[RoleAssignmentModel]:
    """List all role assignments.

    Assignments are only fetched for the given user and/or team: when
    neither `user_name_or_id` nor `team_name_or_id` is provided, an empty
    list is returned. User assignments are listed before team assignments.

    Args:
        project_name_or_id: If provided, only list assignments for the given
            project
        role_name_or_id: If provided, only list assignments of the given
            role. NOTE: this argument is currently not used by this
            implementation.
        team_name_or_id: If provided, only list assignments for the given
            team
        user_name_or_id: If provided, only list assignments for the given
            user

    Returns:
        A list of all role assignments.
    """
    # Collect the routes to query first, user before team.
    assignment_routes: List[str] = []
    if user_name_or_id:
        assignment_routes.append(f"{USERS}/{user_name_or_id}{ROLES}")
    if team_name_or_id:
        assignment_routes.append(f"{TEAMS}/{team_name_or_id}{ROLES}")

    assignments: List[RoleAssignmentModel] = []
    for assignment_route in assignment_routes:
        assignments.extend(
            self._list_resources(
                route=assignment_route,
                resource_model=RoleAssignmentModel,
                project_name_or_id=project_name_or_id,
            )
        )
    return assignments
def assign_role(
    self,
    role_name_or_id: Union[str, UUID],
    user_or_team_name_or_id: Union[str, UUID],
    project_name_or_id: Optional[Union[str, UUID]] = None,
    is_user: bool = True,
) -> None:
    """Assigns a role to a user or team, scoped to a specific project.

    Sends a POST request to the roles sub-route of the `USERS` route. The
    role and the optional project are passed as query parameters so that
    they get URL-encoded by the HTTP client.

    Args:
        role_name_or_id: Name or ID of the role to assign.
        user_or_team_name_or_id: Name or ID of the user or team to which to
            assign the role.
        project_name_or_id: Optional Name or ID of a project in which to
            assign the role. If this is not provided, the role will be
            assigned globally.
        is_user: Whether `user_or_team_id` refers to a user or a team.
            NOTE(review): this flag is currently not evaluated; the request
            always targets the `USERS` route. Confirm whether the server
            offers an equivalent route for teams before relying on it.
    """
    path = f"{USERS}/{str(user_or_team_name_or_id)}{ROLES}"
    query_params: Dict[str, Any] = {"role_name_or_id": role_name_or_id}
    # Only send the project scope when one was given: `_request` stringifies
    # every value, so a `None` would be transmitted as the text "None".
    # Assumes the endpoint accepts the same `project_name_or_id` query
    # parameter as the GET on this route - TODO confirm against the server.
    if project_name_or_id is not None:
        query_params["project_name_or_id"] = project_name_or_id

    logger.debug(f"Sending POST request to {path}...")
    self._request(
        "POST",
        self.url + API + VERSION_1 + path,
        params=query_params,
        data=json.dumps({}),
    )
def revoke_role(
    self,
    role_name_or_id: Union[str, UUID],
    user_or_team_name_or_id: Union[str, UUID],
    is_user: bool = True,
    project_name_or_id: Optional[Union[str, UUID]] = None,
) -> None:
    """Revokes a role from a user or team for a given project.

    Sends a DELETE request to the role's entry below the roles sub-route
    of the `USERS` route. The optional project is passed as a query
    parameter.

    Args:
        role_name_or_id: ID of the role to revoke.
        user_or_team_name_or_id: Name or ID of the user or team from which
            to revoke the role.
        is_user: Whether `user_or_team_id` refers to a user or a team.
            NOTE(review): this flag is currently not evaluated; the request
            always targets the `USERS` route. Confirm whether the server
            offers an equivalent route for teams before relying on it.
        project_name_or_id: Optional ID of a project in which to revoke
            the role. If this is not provided, the role will be revoked
            globally.
    """
    path = (
        f"{USERS}/{str(user_or_team_name_or_id)}{ROLES}"
        f"/{str(role_name_or_id)}"
    )
    # Only send the project scope when one was given: `_request` stringifies
    # every value, so a `None` would be transmitted as the text "None".
    # Assumes the endpoint accepts a `project_name_or_id` query parameter
    # like the GET on the roles route - TODO confirm against the server.
    query_params: Dict[str, Any] = {}
    if project_name_or_id is not None:
        query_params["project_name_or_id"] = project_name_or_id

    # The log message previously claimed a POST although a DELETE is sent.
    logger.debug(f"Sending DELETE request to {path}...")
    self._request(
        "DELETE",
        self.url + API + VERSION_1 + path,
        params=query_params,
        data=json.dumps({}),
    )
# --------
# Projects
# --------
@track(AnalyticsEvent.CREATED_PROJECT)
def create_project(self, project: ProjectModel) -> ProjectModel:
    """Creates a new project.

    The project is sent through `_create_resource` on the `PROJECTS`
    route, using `CreateProjectRequest` as the request model.

    Args:
        project: The project to create.

    Returns:
        The newly created project.
    """
    created_project = self._create_resource(
        route=PROJECTS,
        resource=project,
        request_model=CreateProjectRequest,
    )
    return created_project
def get_project(self, project_name_or_id: Union[UUID, str]) -> ProjectModel:
    """Get an existing project by name or ID.

    Args:
        project_name_or_id: Name or ID of the project to get.

    Returns:
        The requested project.
    """
    project = self._get_resource(
        route=PROJECTS,
        resource_id=project_name_or_id,
        resource_model=ProjectModel,
    )
    return project
# TODO: [ALEX] add filtering param(s)
def list_projects(self) -> List[ProjectModel]:
    """List all projects.

    No filters are passed to `_list_resources`.

    Returns:
        A list of all projects.
    """
    return self._list_resources(route=PROJECTS, resource_model=ProjectModel)
@track(AnalyticsEvent.UPDATED_PROJECT)
def update_project(self, project: ProjectModel) -> ProjectModel:
    """Update an existing project.

    The project is sent through `_update_resource` on the `PROJECTS`
    route, using `UpdateProjectRequest` as the request model.

    Args:
        project: The project to use for the update.

    Returns:
        The updated project.
    """
    updated_project = self._update_resource(
        route=PROJECTS,
        resource=project,
        request_model=UpdateProjectRequest,
    )
    return updated_project
@track(AnalyticsEvent.DELETED_PROJECT)
def delete_project(self, project_name_or_id: Union[str, UUID]) -> None:
    """Deletes a project.

    Args:
        project_name_or_id: Name or ID of the project to delete.
    """
    self._delete_resource(route=PROJECTS, resource_id=project_name_or_id)
# ---------
# Pipelines
# ---------
@track(AnalyticsEvent.CREATE_PIPELINE)
def create_pipeline(self, pipeline: PipelineModel) -> PipelineModel:
    """Creates a new pipeline in a project.

    The pipeline is sent through `_create_project_scoped_resource` on the
    `PIPELINES` route, using `CreatePipelineRequest` as the request model.

    Args:
        pipeline: The pipeline to create.

    Returns:
        The newly created pipeline.
    """
    created_pipeline = self._create_project_scoped_resource(
        route=PIPELINES,
        resource=pipeline,
        request_model=CreatePipelineRequest,
    )
    return created_pipeline
def get_pipeline(self, pipeline_id: UUID) -> PipelineModel:
    """Get a pipeline with a given ID.

    Args:
        pipeline_id: ID of the pipeline.

    Returns:
        The pipeline.
    """
    pipeline = self._get_resource(
        route=PIPELINES,
        resource_id=pipeline_id,
        resource_model=PipelineModel,
    )
    return pipeline
def list_pipelines(
    self,
    project_name_or_id: Optional[Union[str, UUID]] = None,
    user_name_or_id: Optional[Union[str, UUID]] = None,
    name: Optional[str] = None,
) -> List[PipelineModel]:
    """List all pipelines in the project.

    All arguments are forwarded as filters to `_list_resources` on the
    `PIPELINES` route.

    Args:
        project_name_or_id: If provided, only list pipelines in this project.
        user_name_or_id: If provided, only list pipelines from this user.
        name: If provided, only list pipelines with this name.

    Returns:
        A list of pipelines.
    """
    # Spelled out explicitly, in signature order, instead of snapshotting
    # the local namespace.
    filters = {
        "project_name_or_id": project_name_or_id,
        "user_name_or_id": user_name_or_id,
        "name": name,
    }
    return self._list_resources(
        route=PIPELINES,
        resource_model=PipelineModel,
        **filters,
    )
@track(AnalyticsEvent.UPDATE_PIPELINE)
def update_pipeline(self, pipeline: PipelineModel) -> PipelineModel:
    """Updates a pipeline.

    The pipeline is sent through `_update_resource` on the `PIPELINES`
    route, using `UpdatePipelineRequest` as the request model.

    Args:
        pipeline: The pipeline to use for the update.

    Returns:
        The updated pipeline.
    """
    updated_pipeline = self._update_resource(
        route=PIPELINES,
        resource=pipeline,
        request_model=UpdatePipelineRequest,
    )
    return updated_pipeline
@track(AnalyticsEvent.DELETE_PIPELINE)
def delete_pipeline(self, pipeline_id: UUID) -> None:
    """Deletes a pipeline.

    Args:
        pipeline_id: The ID of the pipeline to delete.
    """
    self._delete_resource(route=PIPELINES, resource_id=pipeline_id)
# --------------
# Pipeline runs
# --------------
def create_run(self, pipeline_run: PipelineRunModel) -> PipelineRunModel:
    """Creates a pipeline run.

    The run is sent through `_create_project_scoped_resource` on the
    `RUNS` route.

    Args:
        pipeline_run: The pipeline run to create.

    Returns:
        The created pipeline run.
    """
    created_run = self._create_project_scoped_resource(
        route=RUNS,
        resource=pipeline_run,
    )
    return created_run
def get_run(self, run_name_or_id: Union[str, UUID]) -> PipelineRunModel:
    """Gets a pipeline run.

    `_sync_runs` is called before the run is fetched.

    Args:
        run_name_or_id: The name or ID of the pipeline run to get.

    Returns:
        The pipeline run.
    """
    self._sync_runs()
    pipeline_run = self._get_resource(
        route=RUNS,
        resource_id=run_name_or_id,
        resource_model=PipelineRunModel,
    )
    return pipeline_run
def get_or_create_run(
    self, pipeline_run: PipelineRunModel
) -> PipelineRunModel:
    """Gets or creates a pipeline run.

    If a run with the same ID or name already exists, it is returned.
    Otherwise, a new run is created. This is requested by sending the
    `get_if_exists` parameter along with the create request on the `RUNS`
    route.

    Args:
        pipeline_run: The pipeline run to get or create.

    Returns:
        The pipeline run.
    """
    request_params = {"get_if_exists": True}
    return self._create_project_scoped_resource(
        route=RUNS,
        resource=pipeline_run,
        params=request_params,
    )
def list_runs(
    self,
    project_name_or_id: Optional[Union[str, UUID]] = None,
    stack_id: Optional[UUID] = None,
    component_id: Optional[UUID] = None,
    run_name: Optional[str] = None,
    user_name_or_id: Optional[Union[str, UUID]] = None,
    pipeline_id: Optional[UUID] = None,
    unlisted: bool = False,
) -> List[PipelineRunModel]:
    """Gets all pipeline runs.

    `_sync_runs` is called first; afterwards all arguments are forwarded
    as filters to `_list_resources` on the `RUNS` route.

    Args:
        project_name_or_id: If provided, only return runs for this project.
        stack_id: If provided, only return runs for this stack.
        component_id: Optionally filter for runs that used the
            component
        run_name: Run name if provided
        user_name_or_id: If provided, only return runs for this user.
        pipeline_id: If provided, only return runs for this pipeline.
        unlisted: If True, only return unlisted runs that are not
            associated with any pipeline (filter by `pipeline_id==None`).

    Returns:
        A list of all pipeline runs.
    """
    self._sync_runs()
    # Spelled out explicitly, in signature order, instead of snapshotting
    # the local namespace.
    filters = {
        "project_name_or_id": project_name_or_id,
        "stack_id": stack_id,
        "component_id": component_id,
        "run_name": run_name,
        "user_name_or_id": user_name_or_id,
        "pipeline_id": pipeline_id,
        "unlisted": unlisted,
    }
    return self._list_resources(
        route=RUNS,
        resource_model=PipelineRunModel,
        **filters,
    )
def update_run(self, run: PipelineRunModel) -> PipelineRunModel:
    """Updates a pipeline run.

    The run is sent through `_update_resource` on the `RUNS` route.

    Args:
        run: The pipeline run to use for the update.

    Returns:
        The updated pipeline run.
    """
    updated_run = self._update_resource(route=RUNS, resource=run)
    return updated_run
# TODO: Figure out what exactly gets returned from this
def get_run_component_side_effects(
    self,
    run_id: UUID,
    component_id: Optional[UUID] = None,
) -> Dict[str, Any]:
    """Gets the side effects for a component in a pipeline run.

    NOTE(review): this method has no implementation. Its body consists of
    this docstring only, so a call implicitly returns `None` despite the
    annotated `Dict` return type.

    Args:
        run_id: The ID of the pipeline run to get.
        component_id: The ID of the component to get.
    """
# ------------------
# Pipeline run steps
# ------------------
def create_run_step(self, step: StepRunModel) -> StepRunModel:
    """Creates a step.

    The step is sent through `_create_resource` on the `STEPS` route.

    Args:
        step: The step to create.

    Returns:
        The created step.
    """
    created_step = self._create_resource(route=STEPS, resource=step)
    return created_step
def get_run_step(self, step_id: UUID) -> StepRunModel:
    """Get a step by ID.

    `_sync_runs` is called before the step is fetched.

    Args:
        step_id: The ID of the step to get.

    Returns:
        The step.
    """
    self._sync_runs()
    step = self._get_resource(
        route=STEPS,
        resource_id=step_id,
        resource_model=StepRunModel,
    )
    return step
def list_run_steps(
    self, run_id: Optional[UUID] = None
) -> List[StepRunModel]:
    """Get all run steps.

    `_sync_runs` is called first; afterwards `run_id` is forwarded as a
    filter to `_list_resources` on the `STEPS` route.

    Args:
        run_id: If provided, only return steps for this pipeline run.

    Returns:
        A list of all run steps.
    """
    self._sync_runs()
    return self._list_resources(
        route=STEPS,
        resource_model=StepRunModel,
        run_id=run_id,
    )
def update_run_step(self, step: StepRunModel) -> StepRunModel:
    """Updates a step.

    The step is sent through `_update_resource` on the `STEPS` route.

    Args:
        step: The step to update.

    Returns:
        The updated step.
    """
    updated_step = self._update_resource(route=STEPS, resource=step)
    return updated_step
def get_run_step_inputs(self, step_id: UUID) -> Dict[str, ArtifactModel]:
    """Get a list of inputs for a specific step.

    Sends a GET request to the step's `INPUTS` endpoint and parses every
    entry of the returned dict into an `ArtifactModel`.

    Args:
        step_id: The id of the step to get inputs for.

    Returns:
        A dict mapping artifact names to the input artifacts for the step.

    Raises:
        ValueError: if the response from the API is not a dict.
    """
    body = self.get(f"{STEPS}/{str(step_id)}{INPUTS}")
    if isinstance(body, dict):
        inputs: Dict[str, ArtifactModel] = {}
        for input_name, raw_artifact in body.items():
            inputs[input_name] = ArtifactModel.parse_obj(raw_artifact)
        return inputs
    raise ValueError(f"Bad API Response. Expected dict, got {type(body)}")
# ---------
# Artifacts
# ---------
def create_artifact(self, artifact: ArtifactModel) -> ArtifactModel:
    """Creates an artifact.

    The artifact is sent through `_create_resource` on the `ARTIFACTS`
    route.

    Args:
        artifact: The artifact to create.

    Returns:
        The created artifact.
    """
    created_artifact = self._create_resource(
        route=ARTIFACTS,
        resource=artifact,
    )
    return created_artifact
def list_artifacts(
    self,
    artifact_uri: Optional[str] = None,
    parent_step_id: Optional[UUID] = None,
) -> List[ArtifactModel]:
    """Lists all artifacts.

    `_sync_runs` is called first; afterwards both arguments are forwarded
    as filters to `_list_resources` on the `ARTIFACTS` route.

    Args:
        artifact_uri: If specified, only artifacts with the given URI will
            be returned.
        parent_step_id: If specified, only artifacts for the given step run
            will be returned.

    Returns:
        A list of all artifacts.
    """
    self._sync_runs()
    return self._list_resources(
        route=ARTIFACTS,
        resource_model=ArtifactModel,
        artifact_uri=artifact_uri,
        parent_step_id=parent_step_id,
    )
# =======================
# Internal helper methods
# =======================
def _get_auth_token(self) -> str:
    """Get the authentication token for the REST store.

    The token is fetched once by posting the configured username and
    password to the `LOGIN` endpoint, and is cached in `_api_token` for
    subsequent calls.

    Returns:
        The authentication token.

    Raises:
        ValueError: if the response from the server isn't in the right format.
    """
    # Reuse the cached token if there is one.
    if self._api_token is not None:
        return self._api_token

    login_url = self.url + API + VERSION_1 + LOGIN
    credentials = {
        "username": self.config.username,
        "password": self.config.password,
    }
    login_response = requests.post(
        login_url,
        data=credentials,
        verify=self.config.verify_ssl,
        timeout=self.config.http_timeout,
    )
    response = self._handle_response(login_response)

    if not (isinstance(response, dict) and "access_token" in response):
        raise ValueError(
            f"Bad API Response. Expected access token dict, got "
            f"{type(response)}"
        )
    self._api_token = response["access_token"]
    return self._api_token
@property
def session(self) -> requests.Session:
    """Authenticate to the ZenML server.

    The session is created lazily on first access and cached in
    `_session`. It is only cached once the authentication token has been
    obtained, so that a failed login never leaves an unauthenticated
    session behind for later calls.

    Returns:
        A requests session with the authentication token.
    """
    if self._session is None:
        if self.config.verify_ssl is False:
            # Certificate verification was disabled explicitly; silence
            # the warning urllib3 would otherwise emit for each request.
            urllib3.disable_warnings(
                urllib3.exceptions.InsecureRequestWarning
            )

        new_session = requests.Session()
        new_session.verify = self.config.verify_ssl
        # May raise; in that case `_session` stays unset and the next
        # access retries the whole authentication.
        token = self._get_auth_token()
        new_session.headers.update({"Authorization": "Bearer " + token})
        self._session = new_session
        logger.debug("Authenticated to ZenML server.")
    return self._session
def _handle_response(self, response: requests.Response) -> Json:
    """Handle API response, translating http status codes to Exception.

    Error bodies are read defensively: if an error response is not valid
    JSON, is not a JSON object, or has no `detail` entry, the raw response
    text is used for the exception message instead of failing with an
    unrelated decoding, index or type error.

    Args:
        response: The response to handle.

    Returns:
        The parsed response.

    Raises:
        DoesNotExistException: If the response indicates that the
            requested entity does not exist.
        EntityExistsError: If the response indicates that the requested
            entity already exists.
        AuthorizationException: If the response indicates that the request
            is not authorized.
        IllegalOperationError: If the response indicates that the requested
            operation is forbidden.
        KeyError: If the response indicates that the requested entity
            does not exist.
        RuntimeError: If the response indicates that the requested entity
            does not exist.
        StackComponentExistsError: If the response indicates that the
            requested entity already exists.
        StackExistsError: If the response indicates that the requested
            entity already exists.
        ValueError: If the response indicates that the requested entity
            does not exist.
    """

    def _error_detail(default: Any) -> Any:
        """Return the `detail` entry of the JSON error body, or `default`."""
        try:
            error_body = response.json()
        except ValueError:
            # Not JSON at all, e.g. an HTML error page from a proxy.
            return default
        if isinstance(error_body, dict):
            return error_body.get("detail", default)
        return default

    def _error_message(detail: Any) -> str:
        """Flatten an error detail into a single readable string."""
        if isinstance(detail, str):
            # Joining a plain string would interleave its characters.
            return detail
        if isinstance(detail, (list, tuple)):
            # Entries may be non-strings, e.g. dicts in validation errors.
            return ": ".join(str(part) for part in detail)
        return str(detail)

    status_code = response.status_code
    if 200 <= status_code < 300:
        try:
            payload: Json = response.json()
            return payload
        except requests.exceptions.JSONDecodeError:
            raise ValueError(
                "Bad response from API. Expected json, got\n"
                f"{response.text}"
            )
    elif status_code == 401:
        raise AuthorizationException(
            f"{response.status_code} Client Error: Unauthorized request to "
            f"URL {response.url}: {_error_detail(response.text)}"
        )
    elif status_code == 403:
        msg = _error_detail(response.text)
        if isinstance(msg, list):
            # Use the last entry of the detail list as the message.
            msg = msg[-1] if msg else response.text
        raise IllegalOperationError(msg)
    elif status_code == 404:
        if "KeyError" in response.text:
            detail = _error_detail((response.text,))
            if isinstance(detail, (list, tuple)) and len(detail) > 1:
                # The second entry of the detail is used as the message.
                raise KeyError(detail[1])
            raise KeyError(_error_message(detail))
        elif "DoesNotExistException" in response.text:
            raise DoesNotExistException(
                _error_message(_error_detail((response.text,)))
            )
        raise DoesNotExistException("Endpoint does not exist.")
    elif status_code == 409:
        message = _error_message(_error_detail((response.text,)))
        # StackComponentExistsError must be checked before
        # StackExistsError and EntityExistsError, mirroring the order of
        # the substring checks on the response text.
        if "StackComponentExistsError" in response.text:
            raise StackComponentExistsError(message=message)
        elif "StackExistsError" in response.text:
            raise StackExistsError(message=message)
        elif "EntityExistsError" in response.text:
            raise EntityExistsError(message=message)
        else:
            raise ValueError(message)
    elif status_code == 422:
        raise RuntimeError(_error_message(_error_detail((response.text,))))
    elif status_code == 500:
        raise RuntimeError(response.text)
    else:
        raise RuntimeError(
            "Error retrieving from API. Got response "
            f"{response.status_code} with body:\n{response.text}"
        )
def _request(
    self,
    method: str,
    url: str,
    params: Optional[Dict[str, Any]] = None,
    **kwargs: Any,
) -> Json:
    """Send a request to the REST API and return the parsed response.

    Query parameter values are stringified before being sent. When the
    first attempt fails with an `AuthorizationException`, the cached
    session is dropped and the very same request is sent one more time.

    Args:
        method: The HTTP method to use.
        url: The URL to request.
        params: The query parameters to pass to the endpoint.
        kwargs: Additional keyword arguments to pass to the request.

    Returns:
        The parsed response.
    """
    query: Dict[str, str] = {}
    if params:
        query = {key: str(value) for key, value in params.items()}

    def send() -> Json:
        # A single attempt: issue the HTTP call and parse the response.
        raw_response = self.session.request(
            method,
            url,
            params=query,
            verify=self.config.verify_ssl,
            timeout=self.config.http_timeout,
            **kwargs,
        )
        return self._handle_response(raw_response)

    try:
        return send()
    except AuthorizationException:
        # The authentication token could have expired; reset the cached
        # session so that it is refreshed, then try again.
        self._session = None
        return send()
def get(
    self, path: str, params: Optional[Dict[str, Any]] = None, **kwargs: Any
) -> Json:
    """Issue a GET request against an endpoint path of the REST API.

    Args:
        path: The path to the endpoint.
        params: The query parameters to pass to the endpoint.
        kwargs: Additional keyword arguments to pass to the request.

    Returns:
        The response body.
    """
    logger.debug(f"Sending GET request to {path}...")
    # The endpoint path is appended to the versioned API root of the server.
    endpoint = self.url + API + VERSION_1 + path
    return self._request("GET", endpoint, params=params, **kwargs)
def delete(
    self, path: str, params: Optional[Dict[str, Any]] = None, **kwargs: Any
) -> Json:
    """Issue a DELETE request against an endpoint path of the REST API.

    Args:
        path: The path to the endpoint.
        params: The query parameters to pass to the endpoint.
        kwargs: Additional keyword arguments to pass to the request.

    Returns:
        The response body.
    """
    logger.debug(f"Sending DELETE request to {path}...")
    # The endpoint path is appended to the versioned API root of the server.
    endpoint = self.url + API + VERSION_1 + path
    return self._request("DELETE", endpoint, params=params, **kwargs)
def post(
    self,
    path: str,
    body: BaseModel,
    params: Optional[Dict[str, Any]] = None,
    **kwargs: Any,
) -> Json:
    """Issue a POST request against an endpoint path of the REST API.

    Args:
        path: The path to the endpoint.
        body: The body to send; it is serialized with its `json()` method.
        params: The query parameters to pass to the endpoint.
        kwargs: Additional keyword arguments to pass to the request.

    Returns:
        The response body.
    """
    logger.debug(f"Sending POST request to {path}...")
    endpoint = self.url + API + VERSION_1 + path
    payload = body.json()
    return self._request(
        "POST", endpoint, data=payload, params=params, **kwargs
    )
def put(
    self,
    path: str,
    body: BaseModel,
    params: Optional[Dict[str, Any]] = None,
    **kwargs: Any,
) -> Json:
    """Issue a PUT request against an endpoint path of the REST API.

    Args:
        path: The path to the endpoint.
        body: The body to send; it is serialized with its `json()` method.
        params: The query parameters to pass to the endpoint.
        kwargs: Additional keyword arguments to pass to the request.

    Returns:
        The response body.
    """
    logger.debug(f"Sending PUT request to {path}...")
    endpoint = self.url + API + VERSION_1 + path
    payload = body.json()
    return self._request(
        "PUT", endpoint, data=payload, params=params, **kwargs
    )
def _create_resource(
    self,
    resource: AnyModel,
    route: str,
    request_model: Optional[Type[CreateRequest[AnyModel]]] = None,
    response_model: Optional[Type[CreateResponse[AnyModel]]] = None,
    params: Optional[Dict[str, Any]] = None,
) -> AnyModel:
    """POST a resource to the server and return what the server created.

    Args:
        resource: The resource to create.
        route: The resource REST API route to use.
        request_model: Optional model to use to serialize the request body.
            If not provided, the resource object itself will be used.
        response_model: Optional model to use to deserialize the response
            body. If not provided, the resource class itself will be used.
        params: Optional query parameters to pass to the endpoint.

    Returns:
        The created resource.
    """
    # Pick the request payload: either the resource itself or its
    # dedicated request representation.
    payload: BaseModel = (
        resource
        if request_model is None
        else request_model.from_model(resource)
    )
    response_body = self.post(f"{route}", body=payload, params=params)

    if response_model is None:
        # No dedicated response model: parse with the resource's own class.
        return resource.parse_obj(response_body)
    return response_model.parse_obj(response_body).to_model()
def _create_project_scoped_resource(
    self,
    resource: AnyProjectScopedModel,
    route: str,
    request_model: Optional[
        Type[CreateRequest[AnyProjectScopedModel]]
    ] = None,
    response_model: Optional[
        Type[CreateResponse[AnyProjectScopedModel]]
    ] = None,
    params: Optional[Dict[str, Any]] = None,
) -> AnyProjectScopedModel:
    """Create a resource underneath the route of the project it belongs to.

    The given route is prefixed with the projects route and the value of
    `resource.project`; everything else is delegated to `_create_resource`.

    Args:
        resource: The resource to create.
        route: The resource REST API route to use.
        request_model: Optional model to use to serialize the request body.
            If not provided, the resource object itself will be used.
        response_model: Optional model to use to deserialize the response
            body. If not provided, the resource class itself will be used.
        params: Optional query parameters to pass to the endpoint.

    Returns:
        The created resource.
    """
    project_route = f"{PROJECTS}/{str(resource.project)}{route}"
    return self._create_resource(
        resource=resource,
        route=project_route,
        request_model=request_model,
        response_model=response_model,
        params=params,
    )
def _get_resource(
    self,
    resource_id: Union[str, UUID],
    route: str,
    resource_model: Type[AnyModel],
) -> AnyModel:
    """Fetch one resource by ID and parse it into the given model.

    Args:
        resource_id: The ID of the resource to retrieve.
        route: The resource REST API route to use.
        resource_model: Model used to parse the response body.

    Returns:
        The retrieved resource.
    """
    resource_path = f"{route}/{str(resource_id)}"
    response_body = self.get(resource_path)
    return resource_model.parse_obj(response_body)
def _list_resources(
    self,
    route: str,
    resource_model: Type[AnyModel],
    **filters: Any,
) -> List[AnyModel]:
    """Fetch all resources of a route that match the given filters.

    Args:
        route: The resource REST API route to use.
        resource_model: Model used to parse each entry of the response body.
        filters: Filter parameters to use in the query. Filters whose value
            is `None` are not sent to the server.

    Returns:
        List of retrieved resources matching the filter criteria.

    Raises:
        ValueError: If the value returned by the server is not a list.
    """
    # Only filters that were actually supplied end up in the query.
    params = {
        name: value for name, value in filters.items() if value is not None
    }
    body = self.get(f"{route}", params=params)
    if isinstance(body, list):
        return [resource_model.parse_obj(entry) for entry in body]
    raise ValueError(f"Bad API Response. Expected list, got {type(body)}")
def _update_resource(
    self,
    resource: AnyModel,
    route: str,
    request_model: Optional[Type[UpdateRequest[AnyModel]]] = None,
    response_model: Optional[Type[UpdateResponse[AnyModel]]] = None,
) -> AnyModel:
    """PUT a resource to the server and return the server's updated copy.

    The request is sent to `<route>/<resource.id>`.

    Args:
        resource: The resource to update.
        route: The resource REST API route to use.
        request_model: Optional model to use to serialize the request body.
            If not provided, the resource object itself will be used.
        response_model: Optional model to use to deserialize the response
            body. If not provided, the resource class itself will be used.

    Returns:
        The updated resource.
    """
    # Pick the request payload: either the resource itself or its
    # dedicated request representation.
    payload: BaseModel = (
        resource
        if request_model is None
        else request_model.from_model(resource)
    )
    response_body = self.put(f"{route}/{str(resource.id)}", body=payload)

    if response_model is None:
        # No dedicated response model: parse with the resource's own class.
        return resource.parse_obj(response_body)
    return response_model.parse_obj(response_body).to_model()
def _delete_resource(
    self, resource_id: Union[str, UUID], route: str
) -> None:
    """Send a DELETE request for the resource at `<route>/<resource_id>`.

    Args:
        resource_id: The ID of the resource to delete.
        route: The resource REST API route to use.
    """
    resource_path = f"{route}/{str(resource_id)}"
    self.delete(resource_path)
def _sync_runs(self) -> None:
    """Syncs runs from MLMD.

    This only issues a GET request to the metadata sync endpoint; the
    response body is discarded.
    """
    self.get(METADATA_SYNC)
active_user_name: str
property
readonly
Gets the active username.
Returns:
Type | Description |
---|---|
str |
The active username. |
session: Session
property
readonly
Authenticate to the ZenML server.
Returns:
Type | Description |
---|---|
Session |
A requests session with the authentication token. |
CONFIG_TYPE (StoreConfiguration)
pydantic-model
REST ZenML store configuration.
Attributes:
Name | Type | Description |
---|---|---|
username |
str |
The username to use to connect to the Zen server. |
password |
str |
The password to use to connect to the Zen server. |
verify_ssl |
Union[bool, str] |
Either a boolean, in which case it controls whether we verify the server's TLS certificate, or a string, in which case it must be a path to a CA bundle to use or the CA bundle value itself. |
http_timeout |
int |
The timeout to use for all requests. |
Source code in zenml/zen_stores/rest_zen_store.py
class RestZenStoreConfiguration(StoreConfiguration):
    """REST ZenML store configuration.

    Attributes:
        username: The username to use to connect to the Zen server.
        password: The password to use to connect to the Zen server.
        verify_ssl: Either a boolean, in which case it controls whether we
            verify the server's TLS certificate, or a string, in which case it
            must be a path to a CA bundle to use or the CA bundle value itself.
        http_timeout: The timeout to use for all requests.
    """

    type: StoreType = StoreType.REST
    username: str
    password: str = ""
    verify_ssl: Union[bool, str] = True
    http_timeout: int = DEFAULT_HTTP_TIMEOUT

    @validator("url")
    def validate_url(cls, url: str) -> str:
        """Validates that the URL is a well formed REST store URL.

        Args:
            url: The URL to be validated.

        Returns:
            The validated URL without trailing slashes.

        Raises:
            ValueError: If the URL is not a well formed REST store URL.
        """
        url = url.rstrip("/")
        scheme = re.search("^([a-z0-9]+://)", url)
        if scheme is None or scheme.group() not in ("https://", "http://"):
            # The message must be an f-string, otherwise the user sees a
            # literal "{url}" instead of the URL that was rejected.
            raise ValueError(
                f"Invalid URL for REST store: {url}. Should be in the form "
                "https://hostname[:port] or http://hostname[:port]."
            )

        # When running inside a container, if the URL uses localhost, the
        # target service will not be available. We try to replace localhost
        # with one of the special Docker or K3D internal hostnames.
        url = replace_localhost_with_internal_hostname(url)

        return url

    @validator("verify_ssl")
    def validate_verify_ssl(
        cls, verify_ssl: Union[bool, str]
    ) -> Union[bool, str]:
        """Validates that the verify_ssl field either points to a file or is a bool.

        Booleans and paths that already start with the local certificates
        folder are returned unchanged. Any other string is treated as a path
        to a CA bundle (its contents are read) or as the CA bundle value
        itself, and is written to `ca_bundle.pem` in the local certificates
        folder.

        Args:
            verify_ssl: The verify_ssl value to be validated.

        Returns:
            The validated verify_ssl value: the unchanged input, or the path
            of the `ca_bundle.pem` file that was written.
        """
        secret_folder = Path(
            GlobalConfiguration().local_stores_path,
            "certificates",
        )
        if isinstance(verify_ssl, bool) or verify_ssl.startswith(
            str(secret_folder)
        ):
            return verify_ssl

        # A path to an existing file is replaced by the file's contents; any
        # other string is used as the certificate bundle as-is.
        if os.path.isfile(verify_ssl):
            with open(verify_ssl, "r") as f:
                verify_ssl = f.read()

        fileio.makedirs(str(secret_folder))
        file_path = Path(secret_folder, "ca_bundle.pem")
        with open(file_path, "w") as f:
            f.write(verify_ssl)
        # Restrict the stored bundle to owner read/write.
        file_path.chmod(0o600)
        verify_ssl = str(file_path)

        return verify_ssl

    @classmethod
    def supports_url_scheme(cls, url: str) -> bool:
        """Check if a URL scheme is supported by this store.

        Args:
            url: The URL to check.

        Returns:
            True if the URL scheme is supported, False otherwise.
        """
        return urlparse(url).scheme in ("http", "https")

    def expand_certificates(self) -> None:
        """Expands the certificates in the verify_ssl field.

        If `verify_ssl` is a string naming an existing file, it is replaced
        in place by the contents of that file; otherwise nothing changes.
        """
        # Load the certificate values back into the configuration
        if isinstance(self.verify_ssl, str) and os.path.isfile(self.verify_ssl):
            with open(self.verify_ssl, "r") as f:
                self.verify_ssl = f.read()

    @classmethod
    def copy_configuration(
        cls,
        config: "StoreConfiguration",
        config_path: str,
        load_config_path: Optional[PurePath] = None,
    ) -> "StoreConfiguration":
        """Create a copy of the store config using a different configuration path.

        This method is used to create a copy of the store configuration that can
        be loaded using a different configuration path or in the context of a
        new environment, such as a container image.

        The configuration files accompanying the store configuration are also
        copied to the new configuration path (e.g. certificates etc.).

        Args:
            config: The store configuration to copy.
            config_path: new path where the configuration copy will be loaded
                from. Not read by this implementation.
            load_config_path: absolute path that will be used to load the copied
                configuration. This can be set to a value different from
                `config_path` if the configuration copy will be loaded from
                a different environment, e.g. when the configuration is copied
                to a container image and loaded using a different absolute path.
                This will be reflected in the paths and URLs encoded in the
                copied configuration. Not read by this implementation.

        Returns:
            A new store configuration object that reflects the new configuration
            path.
        """
        assert isinstance(config, RestZenStoreConfiguration)
        config = config.copy(deep=True)

        # Load the certificate values back into the configuration
        config.expand_certificates()
        return config

    class Config:
        """Pydantic configuration class."""

        # Don't validate attributes when assigning them. This is necessary
        # because the `verify_ssl` attribute can be expanded to the contents
        # of the certificate file.
        validate_assignment = False
        # Forbid extra attributes set in the class.
        extra = "forbid"
Config
Pydantic configuration class.
Source code in zenml/zen_stores/rest_zen_store.py
class Config:
    """Pydantic settings for the REST store configuration model."""

    # Assignment validation stays off: `verify_ssl` can be expanded to the
    # contents of the certificate file after the model has been created.
    validate_assignment = False
    # Attributes that are not declared on the model are rejected.
    extra = "forbid"
copy_configuration(config, config_path, load_config_path=None)
classmethod
Create a copy of the store config using a different configuration path.
This method is used to create a copy of the store configuration that can be loaded using a different configuration path or in the context of a new environment, such as a container image.
The configuration files accompanying the store configuration are also copied to the new configuration path (e.g. certificates etc.).
Parameters:
Name | Type | Description | Default |
---|---|---|---|
config |
StoreConfiguration |
The store configuration to copy. |
required |
config_path |
str |
new path where the configuration copy will be loaded from. |
required |
load_config_path |
Optional[pathlib.PurePath] |
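absolute path that will be used to load the copied
configuration. This can be set to a value different from
`config_path` if the configuration copy will be loaded from a different environment, e.g. when the configuration is copied to a container image and loaded using a different absolute path. This will be reflected in the paths and URLs encoded in the copied configuration. |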
None |
Returns:
Type | Description |
---|---|
StoreConfiguration |
A new store configuration object that reflects the new configuration path. |
Source code in zenml/zen_stores/rest_zen_store.py
@classmethod
def copy_configuration(
    cls,
    config: "StoreConfiguration",
    config_path: str,
    load_config_path: Optional[PurePath] = None,
) -> "StoreConfiguration":
    """Return a deep copy of a REST store config with certificates inlined.

    This method is used to create a copy of the store configuration that can
    be loaded using a different configuration path or in the context of a
    new environment, such as a container image. The copy has
    `expand_certificates()` called on it before it is returned.

    Args:
        config: The store configuration to copy.
        config_path: new path where the configuration copy will be loaded
            from.
        load_config_path: absolute path that will be used to load the copied
            configuration. This can be set to a value different from
            `config_path` if the configuration copy will be loaded from
            a different environment, e.g. when the configuration is copied
            to a container image and loaded using a different absolute path.
            This will be reflected in the paths and URLs encoded in the
            copied configuration.

    Returns:
        A new store configuration object that reflects the new configuration
        path.
    """
    assert isinstance(config, RestZenStoreConfiguration)
    duplicate = config.copy(deep=True)

    # Load the certificate values back into the configuration
    duplicate.expand_certificates()
    return duplicate
expand_certificates(self)
Expands the certificates in the verify_ssl field.
Source code in zenml/zen_stores/rest_zen_store.py
def expand_certificates(self) -> None:
    """Replace a certificate file path in `verify_ssl` by the file contents.

    Nothing happens when `verify_ssl` is not a string or does not name an
    existing file.
    """
    certificate = self.verify_ssl
    if not isinstance(certificate, str):
        return
    if not os.path.isfile(certificate):
        return

    # Load the certificate values back into the configuration
    with open(certificate, "r") as cert_file:
        self.verify_ssl = cert_file.read()
supports_url_scheme(url)
classmethod
Check if a URL scheme is supported by this store.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
url |
str |
The URL to check. |
required |
Returns:
Type | Description |
---|---|
bool |
True if the URL scheme is supported, False otherwise. |
Source code in zenml/zen_stores/rest_zen_store.py
@classmethod
def supports_url_scheme(cls, url: str) -> bool:
    """Tell whether a URL uses a scheme that this store can handle.

    Args:
        url: The URL to check.

    Returns:
        True if the URL scheme is supported, False otherwise.
    """
    scheme = urlparse(url).scheme
    return scheme == "http" or scheme == "https"
validate_url(url)
classmethod
Validates that the URL is a well formed REST store URL.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
url |
str |
The URL to be validated. |
required |
Returns:
Type | Description |
---|---|
str |
The validated URL without trailing slashes. |
Exceptions:
Type | Description |
---|---|
ValueError |
If the URL is not a well formed REST store URL. |
Source code in zenml/zen_stores/rest_zen_store.py
@validator("url")
def validate_url(cls, url: str) -> str:
    """Validates that the URL is a well formed REST store URL.

    Args:
        url: The URL to be validated.

    Returns:
        The validated URL without trailing slashes.

    Raises:
        ValueError: If the URL is not a well formed REST store URL.
    """
    url = url.rstrip("/")
    scheme = re.search("^([a-z0-9]+://)", url)
    if scheme is None or scheme.group() not in ("https://", "http://"):
        # The message must be an f-string, otherwise the user sees a literal
        # "{url}" instead of the URL that was rejected.
        raise ValueError(
            f"Invalid URL for REST store: {url}. Should be in the form "
            "https://hostname[:port] or http://hostname[:port]."
        )

    # When running inside a container, if the URL uses localhost, the
    # target service will not be available. We try to replace localhost
    # with one of the special Docker or K3D internal hostnames.
    url = replace_localhost_with_internal_hostname(url)

    return url
validate_verify_ssl(verify_ssl)
classmethod
Validates that the verify_ssl field either points to a file or is a bool.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
verify_ssl |
Union[bool, str] |
The verify_ssl value to be validated. |
required |
Returns:
Type | Description |
---|---|
Union[bool, str] |
The validated verify_ssl value. |
Source code in zenml/zen_stores/rest_zen_store.py
@validator("verify_ssl")
def validate_verify_ssl(
    cls, verify_ssl: Union[bool, str]
) -> Union[bool, str]:
    """Validates that the verify_ssl field either points to a file or is a bool.

    Booleans and paths that already start with the local certificates folder
    are returned unchanged. Any other string is taken as a path to a CA
    bundle (its contents are read) or as the CA bundle value itself, and is
    written to `ca_bundle.pem` in the local certificates folder.

    Args:
        verify_ssl: The verify_ssl value to be validated.

    Returns:
        The validated verify_ssl value.
    """
    certificates_dir = Path(
        GlobalConfiguration().local_stores_path,
        "certificates",
    )
    if isinstance(verify_ssl, bool):
        return verify_ssl
    if verify_ssl.startswith(str(certificates_dir)):
        return verify_ssl

    # A path to an existing file is replaced by the file's contents; any
    # other string is used as the certificate bundle as-is.
    bundle = verify_ssl
    if os.path.isfile(verify_ssl):
        with open(verify_ssl, "r") as source:
            bundle = source.read()

    fileio.makedirs(str(certificates_dir))
    bundle_path = Path(certificates_dir, "ca_bundle.pem")
    with open(bundle_path, "w") as target:
        target.write(bundle)
    # Restrict the stored bundle to owner read/write.
    bundle_path.chmod(0o600)

    return str(bundle_path)
add_user_to_team(self, user_name_or_id, team_name_or_id)
Adds a user to a team.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
user_name_or_id |
Union[str, uuid.UUID] |
Name or ID of the user to add to the team. |
required |
team_name_or_id |
Union[str, uuid.UUID] |
Name or ID of the team to which to add the user to. |
required |
Exceptions:
Type | Description |
---|---|
NotImplementedError |
This method is not implemented |
Source code in zenml/zen_stores/rest_zen_store.py
def add_user_to_team(
    self,
    user_name_or_id: Union[str, UUID],
    team_name_or_id: Union[str, UUID],
) -> None:
    """Add a user to a team (not supported by this store).

    Args:
        user_name_or_id: Name or ID of the user to add to the team.
        team_name_or_id: Name or ID of the team to add the user to.

    Raises:
        NotImplementedError: Always; this method is not implemented.
    """
    raise NotImplementedError("Not Implemented")
assign_role(self, role_name_or_id, user_or_team_name_or_id, project_name_or_id=None, is_user=True)
Assigns a role to a user or team, scoped to a specific project.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
role_name_or_id |
Union[str, uuid.UUID] |
Name or ID of the role to assign. |
required |
user_or_team_name_or_id |
Union[str, uuid.UUID] |
Name or ID of the user or team to which to assign the role. |
required |
is_user |
bool |
Whether `user_or_team_name_or_id` refers to a user or a team. |
True |
project_name_or_id |
Union[str, uuid.UUID] |
Optional Name or ID of a project in which to assign the role. If this is not provided, the role will be assigned globally. |
None |
Source code in zenml/zen_stores/rest_zen_store.py
def assign_role(
    self,
    role_name_or_id: Union[str, UUID],
    user_or_team_name_or_id: Union[str, UUID],
    project_name_or_id: Optional[Union[str, UUID]] = None,
    is_user: bool = True,
) -> None:
    """Assigns a role to a user or team, scoped to a specific project.

    NOTE(review): this implementation always posts to the users route and
    does not forward `project_name_or_id` or `is_user` to the server;
    confirm against the server API whether that is intended.

    Args:
        role_name_or_id: Name or ID of the role to assign.
        user_or_team_name_or_id: Name or ID of the user or team to which to
            assign the role.
        project_name_or_id: Optional Name or ID of a project in which to
            assign the role. If this is not provided, the role will be
            assigned globally.
        is_user: Whether `user_or_team_name_or_id` refers to a user or a
            team.
    """
    path = f"{USERS}/{str(user_or_team_name_or_id)}{ROLES}"
    logger.debug(f"Sending POST request to {path}...")
    self._request(
        "POST",
        self.url + API + VERSION_1 + path,
        # Sent as a query parameter rather than spliced into the URL so that
        # role names containing characters such as spaces, `&` or `#` are
        # URL-encoded instead of corrupting the query string.
        params={"role_name_or_id": role_name_or_id},
        data=json.dumps({}),
    )
create_artifact(self, artifact)
Creates an artifact.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
artifact |
ArtifactModel |
The artifact to create. |
required |
Returns:
Type | Description |
---|---|
ArtifactModel |
The created artifact. |
Source code in zenml/zen_stores/rest_zen_store.py
def create_artifact(self, artifact: ArtifactModel) -> ArtifactModel:
    """Create an artifact on the server through the artifacts route.

    Args:
        artifact: The artifact to create.

    Returns:
        The created artifact.
    """
    created_artifact = self._create_resource(
        resource=artifact,
        route=ARTIFACTS,
    )
    return created_artifact
create_flavor(*args, **kwargs)
Inner decorator function.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
*args |
Any |
Arguments to be passed to the function. |
() |
**kwargs |
Any |
Keyword arguments to be passed to the function. |
{} |
Returns:
Type | Description |
---|---|
Any |
Result of the function. |
Source code in zenml/zen_stores/rest_zen_store.py
def inner_func(*args: Any, **kwargs: Any) -> Any:
    """Call the wrapped function, then try to emit an analytics event.

    The event is emitted through the first tracked model found among the
    result, the positional arguments and the keyword argument values. If
    there is none, it is emitted through the tracker (the first positional
    argument, when it is an analytics tracker) or through the global
    `track_event` function. Tracking failures are only logged.

    Args:
        *args: Arguments to be passed to the function.
        **kwargs: Keyword arguments to be passed to the function.

    Returns:
        Result of the function.
    """
    result = func(*args, **kwargs)

    try:
        tracker: Optional[AnalyticsTrackerMixin] = None
        if args and isinstance(args[0], AnalyticsTrackerMixin):
            tracker = args[0]

        candidates = [result, *args, *kwargs.values()]
        tracked_model = next(
            (
                candidate
                for candidate in candidates
                if isinstance(candidate, AnalyticsTrackedModelMixin)
            ),
            None,
        )

        if tracked_model is not None:
            tracked_model.track_event(event_name, tracker=tracker)
        elif tracker:
            tracker.track_event(event_name, metadata)
        else:
            track_event(event_name, metadata)
    except Exception as e:
        # Analytics must never break the wrapped call.
        logger.debug(f"Analytics tracking failure for {func}: {e}")

    return result
create_pipeline(*args, **kwargs)
Inner decorator function.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
*args |
Any |
Arguments to be passed to the function. |
() |
**kwargs |
Any |
Keyword arguments to be passed to the function. |
{} |
Returns:
Type | Description |
---|---|
Any |
Result of the function. |
Source code in zenml/zen_stores/rest_zen_store.py
def inner_func(*args: Any, **kwargs: Any) -> Any:
    """Call the wrapped function, then try to emit an analytics event.

    The event is emitted through the first tracked model found among the
    result, the positional arguments and the keyword argument values. If
    there is none, it is emitted through the tracker (the first positional
    argument, when it is an analytics tracker) or through the global
    `track_event` function. Tracking failures are only logged.

    Args:
        *args: Arguments to be passed to the function.
        **kwargs: Keyword arguments to be passed to the function.

    Returns:
        Result of the function.
    """
    result = func(*args, **kwargs)

    try:
        tracker: Optional[AnalyticsTrackerMixin] = None
        if args and isinstance(args[0], AnalyticsTrackerMixin):
            tracker = args[0]

        candidates = [result, *args, *kwargs.values()]
        tracked_model = next(
            (
                candidate
                for candidate in candidates
                if isinstance(candidate, AnalyticsTrackedModelMixin)
            ),
            None,
        )

        if tracked_model is not None:
            tracked_model.track_event(event_name, tracker=tracker)
        elif tracker:
            tracker.track_event(event_name, metadata)
        else:
            track_event(event_name, metadata)
    except Exception as e:
        # Analytics must never break the wrapped call.
        logger.debug(f"Analytics tracking failure for {func}: {e}")

    return result
create_project(*args, **kwargs)
Inner decorator function.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
*args |
Any |
Arguments to be passed to the function. |
() |
**kwargs |
Any |
Keyword arguments to be passed to the function. |
{} |
Returns:
Type | Description |
---|---|
Any |
Result of the function. |
Source code in zenml/zen_stores/rest_zen_store.py
def inner_func(*args: Any, **kwargs: Any) -> Any:
    """Call the wrapped function, then try to emit an analytics event.

    The event is emitted through the first tracked model found among the
    result, the positional arguments and the keyword argument values. If
    there is none, it is emitted through the tracker (the first positional
    argument, when it is an analytics tracker) or through the global
    `track_event` function. Tracking failures are only logged.

    Args:
        *args: Arguments to be passed to the function.
        **kwargs: Keyword arguments to be passed to the function.

    Returns:
        Result of the function.
    """
    result = func(*args, **kwargs)

    try:
        tracker: Optional[AnalyticsTrackerMixin] = None
        if args and isinstance(args[0], AnalyticsTrackerMixin):
            tracker = args[0]

        candidates = [result, *args, *kwargs.values()]
        tracked_model = next(
            (
                candidate
                for candidate in candidates
                if isinstance(candidate, AnalyticsTrackedModelMixin)
            ),
            None,
        )

        if tracked_model is not None:
            tracked_model.track_event(event_name, tracker=tracker)
        elif tracker:
            tracker.track_event(event_name, metadata)
        else:
            track_event(event_name, metadata)
    except Exception as e:
        # Analytics must never break the wrapped call.
        logger.debug(f"Analytics tracking failure for {func}: {e}")

    return result
create_role(*args, **kwargs)
Inner decorator function.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
*args |
Any |
Arguments to be passed to the function. |
() |
**kwargs |
Any |
Keyword arguments to be passed to the function. |
{} |
Returns:
Type | Description |
---|---|
Any |
Result of the function. |
Source code in zenml/zen_stores/rest_zen_store.py
def inner_func(*args: Any, **kwargs: Any) -> Any:
    """Call the wrapped function, then try to emit an analytics event.

    The event is emitted through the first tracked model found among the
    result, the positional arguments and the keyword argument values. If
    there is none, it is emitted through the tracker (the first positional
    argument, when it is an analytics tracker) or through the global
    `track_event` function. Tracking failures are only logged.

    Args:
        *args: Arguments to be passed to the function.
        **kwargs: Keyword arguments to be passed to the function.

    Returns:
        Result of the function.
    """
    result = func(*args, **kwargs)

    try:
        tracker: Optional[AnalyticsTrackerMixin] = None
        if args and isinstance(args[0], AnalyticsTrackerMixin):
            tracker = args[0]

        candidates = [result, *args, *kwargs.values()]
        tracked_model = next(
            (
                candidate
                for candidate in candidates
                if isinstance(candidate, AnalyticsTrackedModelMixin)
            ),
            None,
        )

        if tracked_model is not None:
            tracked_model.track_event(event_name, tracker=tracker)
        elif tracker:
            tracker.track_event(event_name, metadata)
        else:
            track_event(event_name, metadata)
    except Exception as e:
        # Analytics must never break the wrapped call.
        logger.debug(f"Analytics tracking failure for {func}: {e}")

    return result
create_run(self, pipeline_run)
Creates a pipeline run.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
pipeline_run |
PipelineRunModel |
The pipeline run to create. |
required |
Returns:
Type | Description |
---|---|
PipelineRunModel |
The created pipeline run. |
Source code in zenml/zen_stores/rest_zen_store.py
def create_run(self, pipeline_run: PipelineRunModel) -> PipelineRunModel:
    """Create a pipeline run on the server, scoped to the run's project.

    Args:
        pipeline_run: The pipeline run to create.

    Returns:
        The created pipeline run.
    """
    created_run = self._create_project_scoped_resource(
        resource=pipeline_run,
        route=RUNS,
    )
    return created_run
create_run_step(self, step)
Creates a step.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
step |
StepRunModel |
The step to create. |
required |
Returns:
Type | Description |
---|---|
StepRunModel |
The created step. |
Source code in zenml/zen_stores/rest_zen_store.py
def create_run_step(self, step: StepRunModel) -> StepRunModel:
    """Create a step run on the server through the steps route.

    Args:
        step: The step to create.

    Returns:
        The created step.
    """
    created_step = self._create_resource(
        resource=step,
        route=STEPS,
    )
    return created_step
create_stack(*args, **kwargs)
Inner decorator function.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
*args |
Any |
Arguments to be passed to the function. |
() |
**kwargs |
Any |
Keyword arguments to be passed to the function. |
{} |
Returns:
Type | Description |
---|---|
Any |
Result of the function. |
Source code in zenml/zen_stores/rest_zen_store.py
def inner_func(*args: Any, **kwargs: Any) -> Any:
    """Call the wrapped function, then try to emit an analytics event.

    The event is emitted through the first tracked model found among the
    result, the positional arguments and the keyword argument values. If
    there is none, it is emitted through the tracker (the first positional
    argument, when it is an analytics tracker) or through the global
    `track_event` function. Tracking failures are only logged.

    Args:
        *args: Arguments to be passed to the function.
        **kwargs: Keyword arguments to be passed to the function.

    Returns:
        Result of the function.
    """
    result = func(*args, **kwargs)

    try:
        tracker: Optional[AnalyticsTrackerMixin] = None
        if args and isinstance(args[0], AnalyticsTrackerMixin):
            tracker = args[0]

        candidates = [result, *args, *kwargs.values()]
        tracked_model = next(
            (
                candidate
                for candidate in candidates
                if isinstance(candidate, AnalyticsTrackedModelMixin)
            ),
            None,
        )

        if tracked_model is not None:
            tracked_model.track_event(event_name, tracker=tracker)
        elif tracker:
            tracker.track_event(event_name, metadata)
        else:
            track_event(event_name, metadata)
    except Exception as e:
        # Analytics must never break the wrapped call.
        logger.debug(f"Analytics tracking failure for {func}: {e}")

    return result
create_stack_component(*args, **kwargs)
Inner decorator function.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
*args |
Any |
Arguments to be passed to the function. |
() |
**kwargs |
Any |
Keyword arguments to be passed to the function. |
{} |
Returns:
Type | Description |
---|---|
Any |
Result of the function. |
Source code in zenml/zen_stores/rest_zen_store.py
def inner_func(*args: Any, **kwargs: Any) -> Any:
    """Call the wrapped function, then try to emit an analytics event.

    The event is emitted through the first tracked model found among the
    result, the positional arguments and the keyword argument values. If
    there is none, it is emitted through the tracker (the first positional
    argument, when it is an analytics tracker) or through the global
    `track_event` function. Tracking failures are only logged.

    Args:
        *args: Arguments to be passed to the function.
        **kwargs: Keyword arguments to be passed to the function.

    Returns:
        Result of the function.
    """
    result = func(*args, **kwargs)

    try:
        tracker: Optional[AnalyticsTrackerMixin] = None
        if args and isinstance(args[0], AnalyticsTrackerMixin):
            tracker = args[0]

        candidates = [result, *args, *kwargs.values()]
        tracked_model = next(
            (
                candidate
                for candidate in candidates
                if isinstance(candidate, AnalyticsTrackedModelMixin)
            ),
            None,
        )

        if tracked_model is not None:
            tracked_model.track_event(event_name, tracker=tracker)
        elif tracker:
            tracker.track_event(event_name, metadata)
        else:
            track_event(event_name, metadata)
    except Exception as e:
        # Analytics must never break the wrapped call.
        logger.debug(f"Analytics tracking failure for {func}: {e}")

    return result
create_team(*args, **kwargs)
Inner decorator function.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
*args |
Any |
Arguments to be passed to the function. |
() |
**kwargs |
Any |
Keyword arguments to be passed to the function. |
{} |
Returns:
Type | Description |
---|---|
Any |
Result of the function. |
Source code in zenml/zen_stores/rest_zen_store.py
def inner_func(*args: Any, **kwargs: Any) -> Any:
    """Call the wrapped function, then try to emit an analytics event.

    The event is emitted through the first tracked model found among the
    result, the positional arguments and the keyword argument values. If
    there is none, it is emitted through the tracker (the first positional
    argument, when it is an analytics tracker) or through the global
    `track_event` function. Tracking failures are only logged.

    Args:
        *args: Arguments to be passed to the function.
        **kwargs: Keyword arguments to be passed to the function.

    Returns:
        Result of the function.
    """
    result = func(*args, **kwargs)

    try:
        tracker: Optional[AnalyticsTrackerMixin] = None
        if args and isinstance(args[0], AnalyticsTrackerMixin):
            tracker = args[0]

        candidates = [result, *args, *kwargs.values()]
        tracked_model = next(
            (
                candidate
                for candidate in candidates
                if isinstance(candidate, AnalyticsTrackedModelMixin)
            ),
            None,
        )

        if tracked_model is not None:
            tracked_model.track_event(event_name, tracker=tracker)
        elif tracker:
            tracker.track_event(event_name, metadata)
        else:
            track_event(event_name, metadata)
    except Exception as e:
        # Analytics must never break the wrapped call.
        logger.debug(f"Analytics tracking failure for {func}: {e}")

    return result
create_user(*args, **kwargs)
Inner decorator function.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
*args |
Any |
Arguments to be passed to the function. |
() |
**kwargs |
Any |
Keyword arguments to be passed to the function. |
{} |
Returns:
Type | Description |
---|---|
Any |
Result of the function. |
Source code in zenml/zen_stores/rest_zen_store.py
def inner_func(*args: Any, **kwargs: Any) -> Any:
    """Call the wrapped function, then try to emit an analytics event.

    The event is emitted through the first tracked model found among the
    result, the positional arguments and the keyword argument values. If
    there is none, it is emitted through the tracker (the first positional
    argument, when it is an analytics tracker) or through the global
    `track_event` function. Tracking failures are only logged.

    Args:
        *args: Arguments to be passed to the function.
        **kwargs: Keyword arguments to be passed to the function.

    Returns:
        Result of the function.
    """
    result = func(*args, **kwargs)

    try:
        tracker: Optional[AnalyticsTrackerMixin] = None
        if args and isinstance(args[0], AnalyticsTrackerMixin):
            tracker = args[0]

        candidates = [result, *args, *kwargs.values()]
        tracked_model = next(
            (
                candidate
                for candidate in candidates
                if isinstance(candidate, AnalyticsTrackedModelMixin)
            ),
            None,
        )

        if tracked_model is not None:
            tracked_model.track_event(event_name, tracker=tracker)
        elif tracker:
            tracker.track_event(event_name, metadata)
        else:
            track_event(event_name, metadata)
    except Exception as e:
        # Analytics must never break the wrapped call.
        logger.debug(f"Analytics tracking failure for {func}: {e}")

    return result
delete(self, path, params=None, **kwargs)
Make a DELETE request to the given endpoint path.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path |
str |
The path to the endpoint. |
required |
params |
Optional[Dict[str, Any]] |
The query parameters to pass to the endpoint. |
None |
kwargs |
Any |
Additional keyword arguments to pass to the request. |
{} |
Returns:
Type | Description |
---|---|
Union[Dict[str, Any], List[Any], str, int, float, bool] |
The response body. |
Source code in zenml/zen_stores/rest_zen_store.py
def delete(
    self, path: str, params: Optional[Dict[str, Any]] = None, **kwargs: Any
) -> Json:
    """Issue a DELETE request against an API endpoint.

    The request URL is the store URL followed by the API prefix, the API
    version and `path`.

    Args:
        path: The path to the endpoint.
        params: The query parameters to pass to the endpoint.
        kwargs: Additional keyword arguments to pass to the request.

    Returns:
        The response body.
    """
    logger.debug(f"Sending DELETE request to {path}...")
    endpoint_url = self.url + API + VERSION_1 + path
    return self._request("DELETE", endpoint_url, params=params, **kwargs)
delete_flavor(*args, **kwargs)
Inner decorator function.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
*args |
Any |
Arguments to be passed to the function. |
() |
**kwargs |
Any |
Keyword arguments to be passed to the function. |
{} |
Returns:
Type | Description |
---|---|
Any |
Result of the function. |
Source code in zenml/zen_stores/rest_zen_store.py
def inner_func(*args: Any, **kwargs: Any) -> Any:
    """Run the wrapped function, then emit an analytics event for the call.

    The wrapped function is always executed first and its result is always
    returned. Any exception raised while tracking is logged at debug level
    and otherwise ignored.

    Args:
        *args: Arguments to be passed to the function.
        **kwargs: Keyword arguments to be passed to the function.

    Returns:
        Result of the function.
    """
    result = func(*args, **kwargs)

    try:
        # A first positional argument that is itself a tracker is used to
        # dispatch the event.
        first_arg = args[0] if args else None
        tracker: Optional[AnalyticsTrackerMixin] = (
            first_arg if isinstance(first_arg, AnalyticsTrackerMixin) else None
        )

        # The first tracked model among the result and the call arguments
        # (in that order) reports the event itself.
        candidates = (result, *args, *kwargs.values())
        tracked_model = next(
            (
                candidate
                for candidate in candidates
                if isinstance(candidate, AnalyticsTrackedModelMixin)
            ),
            None,
        )

        if tracked_model is not None:
            tracked_model.track_event(event_name, tracker=tracker)
        elif tracker:
            tracker.track_event(event_name, metadata)
        else:
            track_event(event_name, metadata)
    except Exception as e:
        logger.debug(f"Analytics tracking failure for {func}: {e}")

    return result
delete_pipeline(*args, **kwargs)
Inner decorator function.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
*args |
Any |
Arguments to be passed to the function. |
() |
**kwargs |
Any |
Keyword arguments to be passed to the function. |
{} |
Returns:
Type | Description |
---|---|
Any |
Result of the function. |
Source code in zenml/zen_stores/rest_zen_store.py
def inner_func(*args: Any, **kwargs: Any) -> Any:
    """Run the wrapped function, then emit an analytics event for the call.

    The wrapped function is always executed first and its result is always
    returned. Any exception raised while tracking is logged at debug level
    and otherwise ignored.

    Args:
        *args: Arguments to be passed to the function.
        **kwargs: Keyword arguments to be passed to the function.

    Returns:
        Result of the function.
    """
    result = func(*args, **kwargs)

    try:
        # A first positional argument that is itself a tracker is used to
        # dispatch the event.
        first_arg = args[0] if args else None
        tracker: Optional[AnalyticsTrackerMixin] = (
            first_arg if isinstance(first_arg, AnalyticsTrackerMixin) else None
        )

        # The first tracked model among the result and the call arguments
        # (in that order) reports the event itself.
        candidates = (result, *args, *kwargs.values())
        tracked_model = next(
            (
                candidate
                for candidate in candidates
                if isinstance(candidate, AnalyticsTrackedModelMixin)
            ),
            None,
        )

        if tracked_model is not None:
            tracked_model.track_event(event_name, tracker=tracker)
        elif tracker:
            tracker.track_event(event_name, metadata)
        else:
            track_event(event_name, metadata)
    except Exception as e:
        logger.debug(f"Analytics tracking failure for {func}: {e}")

    return result
delete_project(*args, **kwargs)
Inner decorator function.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
*args |
Any |
Arguments to be passed to the function. |
() |
**kwargs |
Any |
Keyword arguments to be passed to the function. |
{} |
Returns:
Type | Description |
---|---|
Any |
Result of the function. |
Source code in zenml/zen_stores/rest_zen_store.py
def inner_func(*args: Any, **kwargs: Any) -> Any:
    """Run the wrapped function, then emit an analytics event for the call.

    The wrapped function is always executed first and its result is always
    returned. Any exception raised while tracking is logged at debug level
    and otherwise ignored.

    Args:
        *args: Arguments to be passed to the function.
        **kwargs: Keyword arguments to be passed to the function.

    Returns:
        Result of the function.
    """
    result = func(*args, **kwargs)

    try:
        # A first positional argument that is itself a tracker is used to
        # dispatch the event.
        first_arg = args[0] if args else None
        tracker: Optional[AnalyticsTrackerMixin] = (
            first_arg if isinstance(first_arg, AnalyticsTrackerMixin) else None
        )

        # The first tracked model among the result and the call arguments
        # (in that order) reports the event itself.
        candidates = (result, *args, *kwargs.values())
        tracked_model = next(
            (
                candidate
                for candidate in candidates
                if isinstance(candidate, AnalyticsTrackedModelMixin)
            ),
            None,
        )

        if tracked_model is not None:
            tracked_model.track_event(event_name, tracker=tracker)
        elif tracker:
            tracker.track_event(event_name, metadata)
        else:
            track_event(event_name, metadata)
    except Exception as e:
        logger.debug(f"Analytics tracking failure for {func}: {e}")

    return result
delete_role(*args, **kwargs)
Inner decorator function.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
*args |
Any |
Arguments to be passed to the function. |
() |
**kwargs |
Any |
Keyword arguments to be passed to the function. |
{} |
Returns:
Type | Description |
---|---|
Any |
Result of the function. |
Source code in zenml/zen_stores/rest_zen_store.py
def inner_func(*args: Any, **kwargs: Any) -> Any:
    """Run the wrapped function, then emit an analytics event for the call.

    The wrapped function is always executed first and its result is always
    returned. Any exception raised while tracking is logged at debug level
    and otherwise ignored.

    Args:
        *args: Arguments to be passed to the function.
        **kwargs: Keyword arguments to be passed to the function.

    Returns:
        Result of the function.
    """
    result = func(*args, **kwargs)

    try:
        # A first positional argument that is itself a tracker is used to
        # dispatch the event.
        first_arg = args[0] if args else None
        tracker: Optional[AnalyticsTrackerMixin] = (
            first_arg if isinstance(first_arg, AnalyticsTrackerMixin) else None
        )

        # The first tracked model among the result and the call arguments
        # (in that order) reports the event itself.
        candidates = (result, *args, *kwargs.values())
        tracked_model = next(
            (
                candidate
                for candidate in candidates
                if isinstance(candidate, AnalyticsTrackedModelMixin)
            ),
            None,
        )

        if tracked_model is not None:
            tracked_model.track_event(event_name, tracker=tracker)
        elif tracker:
            tracker.track_event(event_name, metadata)
        else:
            track_event(event_name, metadata)
    except Exception as e:
        logger.debug(f"Analytics tracking failure for {func}: {e}")

    return result
delete_stack(*args, **kwargs)
Inner decorator function.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
*args |
Any |
Arguments to be passed to the function. |
() |
**kwargs |
Any |
Keyword arguments to be passed to the function. |
{} |
Returns:
Type | Description |
---|---|
Any |
Result of the function. |
Source code in zenml/zen_stores/rest_zen_store.py
def inner_func(*args: Any, **kwargs: Any) -> Any:
    """Run the wrapped function, then emit an analytics event for the call.

    The wrapped function is always executed first and its result is always
    returned. Any exception raised while tracking is logged at debug level
    and otherwise ignored.

    Args:
        *args: Arguments to be passed to the function.
        **kwargs: Keyword arguments to be passed to the function.

    Returns:
        Result of the function.
    """
    result = func(*args, **kwargs)

    try:
        # A first positional argument that is itself a tracker is used to
        # dispatch the event.
        first_arg = args[0] if args else None
        tracker: Optional[AnalyticsTrackerMixin] = (
            first_arg if isinstance(first_arg, AnalyticsTrackerMixin) else None
        )

        # The first tracked model among the result and the call arguments
        # (in that order) reports the event itself.
        candidates = (result, *args, *kwargs.values())
        tracked_model = next(
            (
                candidate
                for candidate in candidates
                if isinstance(candidate, AnalyticsTrackedModelMixin)
            ),
            None,
        )

        if tracked_model is not None:
            tracked_model.track_event(event_name, tracker=tracker)
        elif tracker:
            tracker.track_event(event_name, metadata)
        else:
            track_event(event_name, metadata)
    except Exception as e:
        logger.debug(f"Analytics tracking failure for {func}: {e}")

    return result
delete_stack_component(*args, **kwargs)
Inner decorator function.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
*args |
Any |
Arguments to be passed to the function. |
() |
**kwargs |
Any |
Keyword arguments to be passed to the function. |
{} |
Returns:
Type | Description |
---|---|
Any |
Result of the function. |
Source code in zenml/zen_stores/rest_zen_store.py
def inner_func(*args: Any, **kwargs: Any) -> Any:
    """Run the wrapped function, then emit an analytics event for the call.

    The wrapped function is always executed first and its result is always
    returned. Any exception raised while tracking is logged at debug level
    and otherwise ignored.

    Args:
        *args: Arguments to be passed to the function.
        **kwargs: Keyword arguments to be passed to the function.

    Returns:
        Result of the function.
    """
    result = func(*args, **kwargs)

    try:
        # A first positional argument that is itself a tracker is used to
        # dispatch the event.
        first_arg = args[0] if args else None
        tracker: Optional[AnalyticsTrackerMixin] = (
            first_arg if isinstance(first_arg, AnalyticsTrackerMixin) else None
        )

        # The first tracked model among the result and the call arguments
        # (in that order) reports the event itself.
        candidates = (result, *args, *kwargs.values())
        tracked_model = next(
            (
                candidate
                for candidate in candidates
                if isinstance(candidate, AnalyticsTrackedModelMixin)
            ),
            None,
        )

        if tracked_model is not None:
            tracked_model.track_event(event_name, tracker=tracker)
        elif tracker:
            tracker.track_event(event_name, metadata)
        else:
            track_event(event_name, metadata)
    except Exception as e:
        logger.debug(f"Analytics tracking failure for {func}: {e}")

    return result
delete_team(*args, **kwargs)
Inner decorator function.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
*args |
Any |
Arguments to be passed to the function. |
() |
**kwargs |
Any |
Keyword arguments to be passed to the function. |
{} |
Returns:
Type | Description |
---|---|
Any |
Result of the function. |
Source code in zenml/zen_stores/rest_zen_store.py
def inner_func(*args: Any, **kwargs: Any) -> Any:
    """Run the wrapped function, then emit an analytics event for the call.

    The wrapped function is always executed first and its result is always
    returned. Any exception raised while tracking is logged at debug level
    and otherwise ignored.

    Args:
        *args: Arguments to be passed to the function.
        **kwargs: Keyword arguments to be passed to the function.

    Returns:
        Result of the function.
    """
    result = func(*args, **kwargs)

    try:
        # A first positional argument that is itself a tracker is used to
        # dispatch the event.
        first_arg = args[0] if args else None
        tracker: Optional[AnalyticsTrackerMixin] = (
            first_arg if isinstance(first_arg, AnalyticsTrackerMixin) else None
        )

        # The first tracked model among the result and the call arguments
        # (in that order) reports the event itself.
        candidates = (result, *args, *kwargs.values())
        tracked_model = next(
            (
                candidate
                for candidate in candidates
                if isinstance(candidate, AnalyticsTrackedModelMixin)
            ),
            None,
        )

        if tracked_model is not None:
            tracked_model.track_event(event_name, tracker=tracker)
        elif tracker:
            tracker.track_event(event_name, metadata)
        else:
            track_event(event_name, metadata)
    except Exception as e:
        logger.debug(f"Analytics tracking failure for {func}: {e}")

    return result
delete_user(*args, **kwargs)
Inner decorator function.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
*args |
Any |
Arguments to be passed to the function. |
() |
**kwargs |
Any |
Keyword arguments to be passed to the function. |
{} |
Returns:
Type | Description |
---|---|
Any |
Result of the function. |
Source code in zenml/zen_stores/rest_zen_store.py
def inner_func(*args: Any, **kwargs: Any) -> Any:
    """Run the wrapped function, then emit an analytics event for the call.

    The wrapped function is always executed first and its result is always
    returned. Any exception raised while tracking is logged at debug level
    and otherwise ignored.

    Args:
        *args: Arguments to be passed to the function.
        **kwargs: Keyword arguments to be passed to the function.

    Returns:
        Result of the function.
    """
    result = func(*args, **kwargs)

    try:
        # A first positional argument that is itself a tracker is used to
        # dispatch the event.
        first_arg = args[0] if args else None
        tracker: Optional[AnalyticsTrackerMixin] = (
            first_arg if isinstance(first_arg, AnalyticsTrackerMixin) else None
        )

        # The first tracked model among the result and the call arguments
        # (in that order) reports the event itself.
        candidates = (result, *args, *kwargs.values())
        tracked_model = next(
            (
                candidate
                for candidate in candidates
                if isinstance(candidate, AnalyticsTrackedModelMixin)
            ),
            None,
        )

        if tracked_model is not None:
            tracked_model.track_event(event_name, tracker=tracker)
        elif tracker:
            tracker.track_event(event_name, metadata)
        else:
            track_event(event_name, metadata)
    except Exception as e:
        logger.debug(f"Analytics tracking failure for {func}: {e}")

    return result
get(self, path, params=None, **kwargs)
Make a GET request to the given endpoint path.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path |
str |
The path to the endpoint. |
required |
params |
Optional[Dict[str, Any]] |
The query parameters to pass to the endpoint. |
None |
kwargs |
Any |
Additional keyword arguments to pass to the request. |
{} |
Returns:
Type | Description |
---|---|
Union[Dict[str, Any], List[Any], str, int, float, bool] |
The response body. |
Source code in zenml/zen_stores/rest_zen_store.py
def get(
    self, path: str, params: Optional[Dict[str, Any]] = None, **kwargs: Any
) -> Json:
    """Issue a GET request against an API endpoint.

    The request URL is the store URL followed by the API prefix, the API
    version and `path`.

    Args:
        path: The path to the endpoint.
        params: The query parameters to pass to the endpoint.
        kwargs: Additional keyword arguments to pass to the request.

    Returns:
        The response body.
    """
    logger.debug(f"Sending GET request to {path}...")
    endpoint_url = self.url + API + VERSION_1 + path
    return self._request("GET", endpoint_url, params=params, **kwargs)
get_flavor(self, flavor_id)
Get a stack component flavor by ID.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
flavor_id |
UUID |
The ID of the stack component flavor to get. |
required |
Returns:
Type | Description |
---|---|
FlavorModel |
The stack component flavor. |
Source code in zenml/zen_stores/rest_zen_store.py
def get_flavor(self, flavor_id: UUID) -> FlavorModel:
    """Fetch a single stack component flavor from the flavors route.

    Args:
        flavor_id: The ID of the stack component flavor to get.

    Returns:
        The stack component flavor.
    """
    flavor = self._get_resource(
        resource_id=flavor_id,
        route=FLAVORS,
        resource_model=FlavorModel,
    )
    return flavor
get_metadata_config(self, expand_certs=False)
Get the TFX metadata config of this ZenStore.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
expand_certs |
bool |
Whether to expand the certificate paths in the connection config to their value. |
False |
Exceptions:
Type | Description |
---|---|
ValueError |
if the server response is invalid. |
Returns:
Type | Description |
---|---|
Union[ConnectionConfig, MetadataStoreClientConfig] |
The TFX metadata config of this ZenStore. |
Source code in zenml/zen_stores/rest_zen_store.py
def get_metadata_config(
    self, expand_certs: bool = False
) -> Union["ConnectionConfig", "MetadataStoreClientConfig"]:
    """Get the TFX metadata config of this ZenStore.

    The config is fetched from the server and then adjusted for use on the
    client side:

    * a SQLite config whose database file does not exist locally is
      replaced by the metadata config of the default store;
    * a MySQL host is passed through
      `replace_internal_hostname_with_localhost`;
    * unless `expand_certs` is set, each MySQL SSL option (`key`, `ca`,
      `cert`) whose value is not the path of an existing local file is
      written to a `.pem` file in the `certificates` folder under the local
      stores path, and the option is replaced by that file's path.

    Args:
        expand_certs: Whether to expand the certificate paths in the
            connection config to their value. When `False`, SSL option
            values received from the server are saved to local files and
            replaced by the file paths.

    Raises:
        ValueError: if the server response is invalid.

    Returns:
        The TFX metadata config of this ZenStore.
    """
    from google.protobuf.json_format import Parse, ParseError
    from ml_metadata.proto.metadata_store_pb2 import (
        ConnectionConfig,
        MetadataStoreClientConfig,
    )

    from zenml.zen_stores.sql_zen_store import SqlZenStoreConfiguration

    body = self.get(f"{METADATA_CONFIG}")
    if not isinstance(body, str):
        raise ValueError(
            f"Invalid response from server: {body}. Expected string."
        )

    # First try to parse the response as a ConnectionConfig, then as a
    # MetadataStoreClientConfig.
    try:
        metadata_config_pb = Parse(body, ConnectionConfig())
    except ParseError:
        return Parse(body, MetadataStoreClientConfig())

    # if the server returns a SQLite connection config, but the file is not
    # available locally, we need to replace the path with the local path of
    # the default local SQLite database
    if metadata_config_pb.HasField("sqlite") and not os.path.isfile(
        metadata_config_pb.sqlite.filename_uri
    ):
        message = (
            f"The ZenML server is using a SQLite database at "
            f"{metadata_config_pb.sqlite.filename_uri} that is not "
            f"available locally. Using the default local SQLite "
            f"database instead."
        )
        if not self.is_local_store():
            logger.warning(message)
        else:
            logger.debug(message)
        default_store_cfg = GlobalConfiguration().get_default_store()
        assert isinstance(default_store_cfg, SqlZenStoreConfiguration)
        return default_store_cfg.get_metadata_config()

    if metadata_config_pb.HasField("mysql"):
        # If the server returns a MySQL connection config with a hostname
        # that is a Docker or K3D internal hostname that cannot be resolved
        # locally, we need to replace it with localhost. We're assuming
        # that we're running on the host machine and the MySQL server can
        # be accessed via localhost.
        metadata_config_pb.mysql.host = (
            replace_internal_hostname_with_localhost(
                metadata_config_pb.mysql.host
            )
        )

        if not expand_certs and metadata_config_pb.mysql.HasField(
            "ssl_options"
        ):
            # Save the certificates in a secure location on disk
            secret_folder = Path(
                GlobalConfiguration().local_stores_path,
                "certificates",
            )
            ssl_options = metadata_config_pb.mysql.ssl_options
            # Name the protobuf fields explicitly instead of deriving them
            # with `str.lstrip("ssl_")`, which strips a *set of characters*
            # rather than a prefix.
            for field_name in ("key", "ca", "cert"):
                if not ssl_options.HasField(field_name):
                    continue

                content = getattr(ssl_options, field_name)
                if content and not os.path.isfile(content):
                    fileio.makedirs(str(secret_folder))
                    file_path = Path(secret_folder, f"ssl_{field_name}.pem")
                    # Create the file with owner-only permissions from the
                    # start so the secret is never readable by others, not
                    # even between writing and the chmod below.
                    fd = os.open(
                        str(file_path),
                        os.O_WRONLY | os.O_CREAT | os.O_TRUNC,
                        0o600,
                    )
                    with os.fdopen(fd, "w") as f:
                        f.write(content)
                    # The mode passed to os.open only applies to newly
                    # created files; tighten a pre-existing file as well.
                    file_path.chmod(0o600)
                    setattr(ssl_options, field_name, str(file_path))

    return metadata_config_pb
get_or_create_run(self, pipeline_run)
Gets or creates a pipeline run.
If a run with the same ID or name already exists, it is returned. Otherwise, a new run is created.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
pipeline_run |
PipelineRunModel |
The pipeline run to get or create. |
required |
Returns:
Type | Description |
---|---|
PipelineRunModel |
The pipeline run. |
Source code in zenml/zen_stores/rest_zen_store.py
def get_or_create_run(
    self, pipeline_run: PipelineRunModel
) -> PipelineRunModel:
    """Return an existing pipeline run or create it.

    The run is posted to the runs route with the `get_if_exists` query
    parameter enabled. If a run with the same ID or name already exists, it
    is returned. Otherwise, a new run is created.

    Args:
        pipeline_run: The pipeline run to get or create.

    Returns:
        The pipeline run.
    """
    query_params = {"get_if_exists": True}
    return self._create_project_scoped_resource(
        resource=pipeline_run,
        route=RUNS,
        params=query_params,
    )
get_pipeline(self, pipeline_id)
Get a pipeline with a given ID.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
pipeline_id |
UUID |
ID of the pipeline. |
required |
Returns:
Type | Description |
---|---|
PipelineModel |
The pipeline. |
Source code in zenml/zen_stores/rest_zen_store.py
def get_pipeline(self, pipeline_id: UUID) -> PipelineModel:
    """Fetch a single pipeline from the pipelines route.

    Args:
        pipeline_id: ID of the pipeline.

    Returns:
        The pipeline.
    """
    pipeline = self._get_resource(
        resource_id=pipeline_id,
        route=PIPELINES,
        resource_model=PipelineModel,
    )
    return pipeline
get_project(self, project_name_or_id)
Get an existing project by name or ID.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
project_name_or_id |
Union[uuid.UUID, str] |
Name or ID of the project to get. |
required |
Returns:
Type | Description |
---|---|
ProjectModel |
The requested project. |
Source code in zenml/zen_stores/rest_zen_store.py
def get_project(self, project_name_or_id: Union[UUID, str]) -> ProjectModel:
    """Fetch a single project from the projects route.

    Args:
        project_name_or_id: Name or ID of the project to get.

    Returns:
        The requested project.
    """
    project = self._get_resource(
        resource_id=project_name_or_id,
        route=PROJECTS,
        resource_model=ProjectModel,
    )
    return project
get_role(self, role_name_or_id)
Gets a specific role.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
role_name_or_id |
Union[str, uuid.UUID] |
Name or ID of the role to get. |
required |
Returns:
Type | Description |
---|---|
RoleModel |
The requested role. |
Source code in zenml/zen_stores/rest_zen_store.py
def get_role(self, role_name_or_id: Union[str, UUID]) -> RoleModel:
    """Fetch a single role from the roles route.

    Args:
        role_name_or_id: Name or ID of the role to get.

    Returns:
        The requested role.
    """
    role = self._get_resource(
        resource_id=role_name_or_id,
        route=ROLES,
        resource_model=RoleModel,
    )
    return role
get_run(self, run_name_or_id)
Gets a pipeline run.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
run_name_or_id |
Union[str, uuid.UUID] |
The name or ID of the pipeline run to get. |
required |
Returns:
Type | Description |
---|---|
PipelineRunModel |
The pipeline run. |
Source code in zenml/zen_stores/rest_zen_store.py
def get_run(self, run_name_or_id: Union[str, UUID]) -> PipelineRunModel:
    """Fetch a single pipeline run from the runs route.

    A run sync is triggered before the run is fetched.

    Args:
        run_name_or_id: The name or ID of the pipeline run to get.

    Returns:
        The pipeline run.
    """
    self._sync_runs()
    pipeline_run = self._get_resource(
        resource_id=run_name_or_id,
        route=RUNS,
        resource_model=PipelineRunModel,
    )
    return pipeline_run
get_run_component_side_effects(self, run_id, component_id=None)
Gets the side effects for a component in a pipeline run.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
run_id |
UUID |
The ID of the pipeline run to get. |
required |
component_id |
Optional[uuid.UUID] |
The ID of the component to get. |
None |
Source code in zenml/zen_stores/rest_zen_store.py
def get_run_component_side_effects(
    self,
    run_id: UUID,
    component_id: Optional[UUID] = None,
) -> Dict[str, Any]:
    """Get the side effects of a component in a pipeline run.

    NOTE(review): this method has no implementation in this store; the body
    is empty, so calling it yields `None` rather than the annotated dict.

    Args:
        run_id: The ID of the pipeline run to get.
        component_id: The ID of the component to get.
    """
get_run_step(self, step_id)
Get a step by ID.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
step_id |
UUID |
The ID of the step to get. |
required |
Returns:
Type | Description |
---|---|
StepRunModel |
The step. |
Source code in zenml/zen_stores/rest_zen_store.py
def get_run_step(self, step_id: UUID) -> StepRunModel:
    """Fetch a single step run from the steps route.

    A run sync is triggered before the step is fetched.

    Args:
        step_id: The ID of the step to get.

    Returns:
        The step.
    """
    self._sync_runs()
    step_run = self._get_resource(
        resource_id=step_id,
        route=STEPS,
        resource_model=StepRunModel,
    )
    return step_run
get_run_step_inputs(self, step_id)
Get the input artifacts of a specific step, keyed by artifact name.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
step_id |
UUID |
The id of the step to get inputs for. |
required |
Returns:
Type | Description |
---|---|
Dict[str, zenml.models.pipeline_models.ArtifactModel] |
A dict mapping artifact names to the input artifacts for the step. |
Exceptions:
Type | Description |
---|---|
ValueError |
if the response from the API is not a dict. |
Source code in zenml/zen_stores/rest_zen_store.py
def get_run_step_inputs(self, step_id: UUID) -> Dict[str, ArtifactModel]:
    """Get the input artifacts of a specific step.

    Args:
        step_id: The id of the step to get inputs for.

    Returns:
        A dict mapping artifact names to the input artifacts for the step.

    Raises:
        ValueError: if the response from the API is not a dict.
    """
    body = self.get(f"{STEPS}/{str(step_id)}{INPUTS}")
    if isinstance(body, dict):
        # Each entry of the response is parsed into an artifact model.
        input_artifacts: Dict[str, ArtifactModel] = {}
        for name, entry in body.items():
            input_artifacts[name] = ArtifactModel.parse_obj(entry)
        return input_artifacts

    raise ValueError(f"Bad API Response. Expected dict, got {type(body)}")
get_stack(self, stack_id)
Get a stack by its unique ID.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
stack_id |
UUID |
The ID of the stack to get. |
required |
Returns:
Type | Description |
---|---|
StackModel |
The stack with the given ID. |
Source code in zenml/zen_stores/rest_zen_store.py
def get_stack(self, stack_id: UUID) -> StackModel:
    """Fetch a single stack from the stacks route.

    Args:
        stack_id: The ID of the stack to get.

    Returns:
        The stack with the given ID.
    """
    stack = self._get_resource(
        resource_id=stack_id,
        route=STACKS,
        resource_model=StackModel,
    )
    return stack
get_stack_component(self, component_id)
Get a stack component by ID.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
component_id |
UUID |
The ID of the stack component to get. |
required |
Returns:
Type | Description |
---|---|
ComponentModel |
The stack component. |
Source code in zenml/zen_stores/rest_zen_store.py
def get_stack_component(self, component_id: UUID) -> ComponentModel:
    """Fetch a single stack component from the stack components route.

    Args:
        component_id: The ID of the stack component to get.

    Returns:
        The stack component.
    """
    component = self._get_resource(
        resource_id=component_id,
        route=STACK_COMPONENTS,
        resource_model=ComponentModel,
    )
    return component
get_stack_component_side_effects(self, component_id, run_id, pipeline_id, stack_id)
Get the side effects of a stack component.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
component_id |
UUID |
The ID of the stack component to get side effects for. |
required |
run_id |
UUID |
The ID of the run to get side effects for. |
required |
pipeline_id |
UUID |
The ID of the pipeline to get side effects for. |
required |
stack_id |
UUID |
The ID of the stack to get side effects for. |
required |
Source code in zenml/zen_stores/rest_zen_store.py
def get_stack_component_side_effects(
    self,
    component_id: UUID,
    run_id: UUID,
    pipeline_id: UUID,
    stack_id: UUID,
) -> Dict[Any, Any]:
    """Get the side effects of a stack component.

    NOTE(review): this method has no implementation in this store; the body
    is empty, so calling it yields `None` rather than the annotated dict.

    Args:
        component_id: The ID of the stack component to get side effects for.
        run_id: The ID of the run to get side effects for.
        pipeline_id: The ID of the pipeline to get side effects for.
        stack_id: The ID of the stack to get side effects for.
    """
get_store_info(self)
Get information about the server.
Returns:
Type | Description |
---|---|
ServerModel |
Information about the server. |
Source code in zenml/zen_stores/rest_zen_store.py
def get_store_info(self) -> ServerModel:
    """Fetch the server information from the info endpoint.

    Returns:
        Information about the server.
    """
    return ServerModel.parse_obj(self.get(INFO))
get_team(self, team_name_or_id)
Gets a specific team.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
team_name_or_id |
Union[str, uuid.UUID] |
Name or ID of the team to get. |
required |
Returns:
Type | Description |
---|---|
TeamModel |
The requested team. |
Source code in zenml/zen_stores/rest_zen_store.py
def get_team(self, team_name_or_id: Union[str, UUID]) -> TeamModel:
    """Fetch a single team from the teams route.

    Args:
        team_name_or_id: Name or ID of the team to get.

    Returns:
        The requested team.
    """
    team = self._get_resource(
        resource_id=team_name_or_id,
        route=TEAMS,
        resource_model=TeamModel,
    )
    return team
get_teams_for_user(self, user_name_or_id)
Fetches all teams for a user.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
user_name_or_id |
Union[str, uuid.UUID] |
The name or ID of the user for which to get all teams. |
required |
Exceptions:
Type | Description |
---|---|
NotImplementedError |
This method is not implemented. |
Source code in zenml/zen_stores/rest_zen_store.py
def get_teams_for_user(
    self, user_name_or_id: Union[str, UUID]
) -> List[TeamModel]:
    """Fetch all teams a user belongs to (unsupported by this store).

    Args:
        user_name_or_id: The name or ID of the user for which to get all
            teams.

    Raises:
        NotImplementedError: Always; this method is not implemented.
    """
    raise NotImplementedError("Not Implemented")
get_user(self, user_name_or_id)
Gets a specific user.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
user_name_or_id |
Union[str, uuid.UUID] |
The name or ID of the user to get. |
required |
Returns:
Type | Description |
---|---|
UserModel |
The requested user, if it was found. |
Source code in zenml/zen_stores/rest_zen_store.py
def get_user(self, user_name_or_id: Union[str, UUID]) -> UserModel:
    """Fetch a single user from the users route.

    Args:
        user_name_or_id: The name or ID of the user to get.

    Returns:
        The requested user, if it was found.
    """
    user = self._get_resource(
        resource_id=user_name_or_id,
        route=USERS,
        resource_model=UserModel,
    )
    return user
get_users_for_team(self, team_name_or_id)
Fetches all users of a team.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
team_name_or_id |
Union[str, uuid.UUID] |
The name or ID of the team for which to get users. |
required |
Exceptions:
Type | Description |
---|---|
NotImplementedError |
This method is not implemented. |
Source code in zenml/zen_stores/rest_zen_store.py
def get_users_for_team(
    self, team_name_or_id: Union[str, UUID]
) -> List[UserModel]:
    """Fetch all users of a team (unsupported by this store).

    Args:
        team_name_or_id: The name or ID of the team for which to get users.

    Raises:
        NotImplementedError: Always; this method is not implemented.
    """
    raise NotImplementedError("Not Implemented")
list_artifacts(self, artifact_uri=None, parent_step_id=None)
Lists all artifacts.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
artifact_uri |
Optional[str] |
If specified, only artifacts with the given URI will be returned. |
None |
parent_step_id |
Optional[uuid.UUID] |
If specified, only artifacts for the given step run will be returned. |
None |
Returns:
Type | Description |
---|---|
List[zenml.models.pipeline_models.ArtifactModel] |
A list of all artifacts. |
Source code in zenml/zen_stores/rest_zen_store.py
def list_artifacts(
    self,
    artifact_uri: Optional[str] = None,
    parent_step_id: Optional[UUID] = None,
) -> List[ArtifactModel]:
    """Lists all artifacts.

    A run sync is triggered before the artifacts are listed.

    Args:
        artifact_uri: If specified, only artifacts with the given URI will
            be returned.
        parent_step_id: If specified, only artifacts for the given step run
            will be returned.

    Returns:
        A list of all artifacts.
    """
    self._sync_runs()
    # Spell the filters out instead of capturing `locals()`: any local
    # variable introduced above would otherwise silently become a filter.
    filters = {
        "artifact_uri": artifact_uri,
        "parent_step_id": parent_step_id,
    }
    return self._list_resources(
        route=ARTIFACTS,
        resource_model=ArtifactModel,
        **filters,
    )
list_flavors(self, project_name_or_id=None, user_name_or_id=None, component_type=None, name=None, is_shared=None)
List all stack component flavors matching the given filter criteria.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
project_name_or_id |
Union[str, uuid.UUID] |
Optionally filter by the Project to which the component flavors belong |
None |
user_name_or_id |
Union[str, uuid.UUID] |
Optionally filter by the owner |
None |
component_type |
Optional[zenml.enums.StackComponentType] |
Optionally filter by type of stack component |
None |
name |
Optional[str] |
Optionally filter flavors by name |
None |
is_shared |
Optional[bool] |
Optionally filter flavors by whether they are shared or not. |
None |
Returns:
Type | Description |
---|---|
List[zenml.models.flavor_models.FlavorModel] |
List of all the stack component flavors matching the given criteria. |
Source code in zenml/zen_stores/rest_zen_store.py
def list_flavors(
    self,
    project_name_or_id: Optional[Union[str, UUID]] = None,
    user_name_or_id: Optional[Union[str, UUID]] = None,
    component_type: Optional[StackComponentType] = None,
    name: Optional[str] = None,
    is_shared: Optional[bool] = None,
) -> List[FlavorModel]:
    """List all stack component flavors matching the given filter criteria.

    Args:
        project_name_or_id: Optionally filter by the Project to which the
            component flavors belong
        user_name_or_id: Optionally filter by the owner
        component_type: Optionally filter by type of stack component
        name: Optionally filter flavors by name
        is_shared: Optionally filter flavors by whether they are shared
            or not

    Returns:
        List of all the stack component flavors matching the given criteria.
    """
    # Spell the filters out instead of capturing `locals()`: any local
    # variable introduced above would otherwise silently become a filter.
    filters = {
        "project_name_or_id": project_name_or_id,
        "user_name_or_id": user_name_or_id,
        "component_type": component_type,
        "name": name,
        "is_shared": is_shared,
    }
    return self._list_resources(
        route=FLAVORS,
        resource_model=FlavorModel,
        **filters,
    )
list_pipelines(self, project_name_or_id=None, user_name_or_id=None, name=None)
List all pipelines in the project.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
project_name_or_id |
Union[str, uuid.UUID] |
If provided, only list pipelines in this project. |
None |
user_name_or_id |
Union[str, uuid.UUID] |
If provided, only list pipelines from this user. |
None |
name |
Optional[str] |
If provided, only list pipelines with this name. |
None |
Returns:
Type | Description |
---|---|
List[zenml.models.pipeline_models.PipelineModel] |
A list of pipelines. |
Source code in zenml/zen_stores/rest_zen_store.py
def list_pipelines(
    self,
    project_name_or_id: Optional[Union[str, UUID]] = None,
    user_name_or_id: Optional[Union[str, UUID]] = None,
    name: Optional[str] = None,
) -> List[PipelineModel]:
    """List all pipelines in the project.

    Args:
        project_name_or_id: If provided, only list pipelines in this project.
        user_name_or_id: If provided, only list pipelines from this user.
        name: If provided, only list pipelines with this name.

    Returns:
        A list of pipelines.
    """
    # Spell the filters out instead of capturing `locals()`: any local
    # variable introduced above would otherwise silently become a filter.
    filters = {
        "project_name_or_id": project_name_or_id,
        "user_name_or_id": user_name_or_id,
        "name": name,
    }
    return self._list_resources(
        route=PIPELINES,
        resource_model=PipelineModel,
        **filters,
    )
list_projects(self)
List all projects.
Returns:
Type | Description |
---|---|
List[zenml.models.project_models.ProjectModel] |
A list of all projects. |
Source code in zenml/zen_stores/rest_zen_store.py
def list_projects(self) -> List