Skip to content

Service Connectors

zenml.service_connectors special

ZenML Service Connectors.

docker_service_connector

Docker Service Connector.

The Docker Service Connector is responsible for authenticating with a Docker (or compatible) registry.

DockerAuthenticationMethods (StrEnum)

Docker Authentication methods.

Source code in zenml/service_connectors/docker_service_connector.py
class DockerAuthenticationMethods(StrEnum):
    """Authentication methods offered by the Docker service connector."""

    # Password-based authentication; currently the only declared method.
    PASSWORD = "password"

DockerConfiguration (DockerCredentials) pydantic-model

Docker client configuration.

Source code in zenml/service_connectors/docker_service_connector.py
class DockerConfiguration(DockerCredentials):
    """Docker client configuration: credentials plus an optional registry."""

    # Defaults to None, which the field title documents as "use DockerHub".
    registry: Optional[str] = Field(
        None, title="Registry server URL. Omit to use DockerHub."
    )

DockerCredentials (AuthenticationConfig) pydantic-model

Docker client authentication credentials.

Source code in zenml/service_connectors/docker_service_connector.py
class DockerCredentials(AuthenticationConfig):
    """Username and password used to authenticate a Docker client."""

    # Both values are stored as secrets (SecretStr) and have no default.
    username: SecretStr = Field(title="Username")
    password: SecretStr = Field(title="Password")

DockerServiceConnector (ServiceConnector) pydantic-model

Docker service connector.

Source code in zenml/service_connectors/docker_service_connector.py
class DockerServiceConnector(ServiceConnector):
    """Docker service connector.

    Uses the username and password from its configuration to log a Docker
    client (either a python-docker client or the local `docker` CLI) in to a
    Docker/OCI registry.
    """

    config: DockerConfiguration

    @classmethod
    def _get_connector_type(cls) -> ServiceConnectorTypeModel:
        """Get the service connector specification.

        Returns:
            The service connector specification.
        """
        return DOCKER_SERVICE_CONNECTOR_TYPE_SPEC

    @classmethod
    def _parse_resource_id(
        cls,
        resource_id: str,
    ) -> str:
        """Validate and convert a Docker resource ID into a Docker registry name.

        The registry name is the `host[:port]` part of the resource ID, with
        any `http://` or `https://` scheme and any repository path removed.
        The `index.`-prefixed form of the default registry name is mapped to
        the default registry name itself.

        Args:
            resource_id: The resource ID to convert.

        Returns:
            The Docker registry name.

        Raises:
            ValueError: If the provided resource ID is not a valid Docker
                registry.
        """
        if not re.match(
            r"^(https?://)?[a-zA-Z0-9-]+(\.[a-zA-Z0-9-]+)*(:[0-9]+)?(/.+)*$",
            resource_id,
        ):
            # Each accepted format goes on its own line so that the message
            # stays readable.
            raise ValueError(
                f"Invalid resource ID for a Docker registry: {resource_id}. "
                "Please provide a valid repository name or URL in the "
                "following format:\n"
                "DockerHub: docker.io or [https://]index.docker.io/v1/[/<repository-name>]\n"
                "generic OCI registry URI: http[s]://host[:port][/<repository-name>]"
            )

        # With a scheme, splitting on "/" yields ["http(s):", "", host, ...],
        # so the host is the third component; without one it is the first.
        if resource_id.startswith(("https://", "http://")):
            registry = resource_id.split("/")[2]
        else:
            registry = resource_id.split("/")[0]

        if registry == f"index.{DOCKER_REGISTRY_NAME}":
            registry = DOCKER_REGISTRY_NAME
        return registry

    def _canonical_resource_id(
        self, resource_type: str, resource_id: str
    ) -> str:
        """Convert a resource ID to its canonical form.

        The canonical form is the registry name extracted by
        `_parse_resource_id`; the resource type is not consulted.

        Args:
            resource_type: The resource type to canonicalize.
            resource_id: The resource ID to canonicalize.

        Returns:
            The canonical resource ID.
        """
        return self._parse_resource_id(resource_id)

    def _get_default_resource_id(self, resource_type: str) -> str:
        """Get the default resource ID for a resource type.

        The default is the configured registry, or the default Docker registry
        name when no registry is configured, in canonical form.

        Args:
            resource_type: The type of the resource to get a default resource ID
                for. Only called with resource types that do not support
                multiple instances.

        Returns:
            The default resource ID for the resource type.
        """
        return self._canonical_resource_id(
            resource_type, self.config.registry or DOCKER_REGISTRY_NAME
        )

    def _authorize_client(
        self,
        docker_client: DockerClient,
        resource_id: str,
    ) -> None:
        """Authorize a Docker client to have access to the configured Docker registry.

        Args:
            docker_client: The Docker client to authenticate.
            resource_id: The resource ID to authorize the client for.

        Raises:
            AuthorizationException: If the client could not be authenticated.
        """
        cfg = self.config
        registry = self._parse_resource_id(resource_id)

        try:
            docker_client.login(
                username=cfg.username.get_secret_value(),
                password=cfg.password.get_secret_value(),
                # No explicit registry is passed for the default registry.
                registry=registry
                if registry != DOCKER_REGISTRY_NAME
                else None,
                reauth=True,
            )
        except DockerException as e:
            # Chain the original error so the root cause stays visible in
            # tracebacks, matching `_configure_local_client`.
            raise AuthorizationException(
                f"failed to authenticate with Docker registry {registry}: {e}"
            ) from e

    def _connect_to_resource(
        self,
        **kwargs: Any,
    ) -> Any:
        """Authenticate and connect to a Docker/OCI registry.

        Initialize, authenticate and return a python-docker client.

        Args:
            kwargs: Additional implementation specific keyword arguments to pass
                to the session or client constructor.

        Returns:
            An authenticated python-docker client object.
        """
        assert self.resource_id is not None
        docker_client = DockerClient.from_env()
        try:
            self._authorize_client(docker_client, self.resource_id)
        except Exception:
            # The client is never handed to the caller on failure, so release
            # its connections here before propagating the error.
            docker_client.close()
            raise

        return docker_client

    def _configure_local_client(
        self,
        **kwargs: Any,
    ) -> None:
        """Configure the local Docker client to authenticate to a Docker/OCI registry.

        Args:
            kwargs: Additional implementation specific keyword arguments to use
                to configure the client.

        Raises:
            AuthorizationException: If authentication failed.
        """
        # Call the docker CLI to authenticate to the Docker registry
        cfg = self.config

        assert self.resource_id is not None
        registry = self._parse_resource_id(self.resource_id)

        docker_login_cmd = [
            "docker",
            "login",
            "-u",
            cfg.username.get_secret_value(),
            "--password-stdin",
        ]
        # The registry argument is left out for the default registry.
        if registry != DOCKER_REGISTRY_NAME:
            docker_login_cmd.append(registry)

        try:
            # The password is fed through stdin so that it does not appear in
            # the command line arguments.
            subprocess.run(
                docker_login_cmd,
                check=True,
                input=cfg.password.get_secret_value().encode(),
            )
        except subprocess.CalledProcessError as e:
            raise AuthorizationException(
                f"Failed to authenticate to Docker registry "
                f"'{self.resource_id}': {e}"
            ) from e

    @classmethod
    def _auto_configure(
        cls,
        auth_method: Optional[str] = None,
        resource_type: Optional[str] = None,
        resource_id: Optional[str] = None,
        **kwargs: Any,
    ) -> "DockerServiceConnector":
        """Auto-configure the connector.

        Not supported by the Docker connector.

        Args:
            auth_method: The particular authentication method to use. If not
                specified, the connector implementation must decide which
                authentication method to use or raise an exception.
            resource_type: The type of resource to configure.
            resource_id: The ID of the resource to configure. The
                implementation may choose to either require or ignore this
                parameter if it does not support or detect a resource type that
                supports multiple instances.
            kwargs: Additional implementation specific keyword arguments to use.

        Raises:
            NotImplementedError: If the connector auto-configuration fails or
                is not supported.
        """
        raise NotImplementedError(
            "Auto-configuration is not supported by the Docker connector."
        )

    def _verify(
        self,
        resource_type: Optional[str] = None,
        resource_id: Optional[str] = None,
    ) -> List[str]:
        """Verify that the connector can authenticate and access resources.

        If a Docker client cannot be created from the environment, a warning
        is logged and verification is skipped instead of failing.

        Args:
            resource_type: The type of resource to verify. Must be set to the
                Docker resource type.
            resource_id: The Docker registry name or URL to connect to.

        Returns:
            The name of the Docker registry that this connector can access.
        """
        # The docker server isn't available on the ZenML server, so we can't
        # verify the credentials there.
        try:
            docker_client = DockerClient.from_env()
        except DockerException as e:
            logger.warning(
                f"Failed to connect to Docker daemon: {e}"
                f"\nSkipping Docker connector verification."
            )
        else:
            # Always release the client, even when authorization fails.
            try:
                assert resource_id is not None
                self._authorize_client(docker_client, resource_id)
            finally:
                docker_client.close()

        return [resource_id] if resource_id else []

service_connector

Base ZenML Service Connector class.

AuthenticationConfig (BaseModel) pydantic-model

Base authentication configuration.

Source code in zenml/service_connectors/service_connector.py
class AuthenticationConfig(BaseModel):
    """Base class for connector authentication configurations.

    Exposes the configured fields (those that are not None) split into
    secret and non-secret values.
    """

    @property
    def secret_values(self) -> Dict[str, SecretStr]:
        """Collect the configured values that are secrets.

        Returns:
            A dictionary of all secret values in the configuration.
        """
        secrets: Dict[str, SecretStr] = {}
        for field_name, value in self.dict(exclude_none=True).items():
            if isinstance(value, SecretStr):
                secrets[field_name] = value
        return secrets

    @property
    def non_secret_values(self) -> Dict[str, str]:
        """Collect the configured values that are not secrets.

        Returns:
            A dictionary of all non-secret values in the configuration.
        """
        plain: Dict[str, str] = {}
        for field_name, value in self.dict(exclude_none=True).items():
            if isinstance(value, SecretStr):
                continue
            plain[field_name] = value
        return plain

    @property
    def all_values(self) -> Dict[str, Any]:
        """Collect every configured value, secret or not.

        Returns:
            A dictionary of all values in the configuration.
        """
        configured = self.dict(exclude_none=True)
        return configured
all_values: Dict[str, Any] property readonly

Get all values as a dictionary.

Returns:

Type Description
Dict[str, Any]

A dictionary of all values in the configuration.

non_secret_values: Dict[str, str] property readonly

Get the non-secret values as a dictionary.

Returns:

Type Description
Dict[str, str]

A dictionary of all non-secret values in the configuration.

secret_values: Dict[str, pydantic.types.SecretStr] property readonly

Get the secret values as a dictionary.

Returns:

Type Description
Dict[str, pydantic.types.SecretStr]

A dictionary of all secret values in the configuration.

ServiceConnector (BaseModel) pydantic-model

Base service connector class.

Service connectors are standalone components that can be used to link ZenML to external resources. They are responsible for validating and storing authentication configuration and sensitive credentials and for providing authentication services to other ZenML components. Service connectors are built on top of the (otherwise opaque) ZenML secrets and secrets store mechanisms and add secret auto-configuration, secret discovery and secret schema validation capabilities.

The implementation independent service connector abstraction is made possible through the use of generic "resource types" and "resource IDs". These constitute the "contract" between connectors and the consumers of the authentication services that they provide. In a nutshell, a connector instance advertises what resource(s) it can be used to gain access to, whereas a consumer may run a query to search for compatible connectors by specifying the resource(s) that they need to access and then use a matching connector instance to connect to said resource(s).

The resource types and authentication methods supported by a connector are declared in the connector type specification. The role of this specification is two-fold:

  • it declares a schema for the configuration that needs to be provided to configure the connector. This can be used to validate the configuration without having to instantiate the connector itself (e.g. in the CLI and dashboard), which also makes it possible to configure connectors and store their configuration without having to instantiate them.
  • it provides a way for ZenML to keep a registry of available connector implementations and configured connector instances. Users who want to connect ZenML to external resources via connectors can use this registry to discover what types of connectors are available and what types of resources they can be configured to access. Consumers can also use the registry to find connector instances that are compatible with the types of resources that they need to access.
Source code in zenml/service_connectors/service_connector.py
class ServiceConnector(BaseModel, metaclass=ServiceConnectorMeta):
    """Base service connector class.

    Service connectors are standalone components that can be used to link ZenML
    to external resources. They are responsible for validating and storing
    authentication configuration and sensitive credentials and for providing
    authentication services to other ZenML components. Service connectors are
    built on top of the (otherwise opaque) ZenML secrets and secrets store
    mechanisms and add secret auto-configuration, secret discovery and secret
    schema validation capabilities.

    The implementation independent service connector abstraction is made
    possible through the use of generic "resource types" and "resource IDs".
    These constitute the "contract" between connectors and the consumers of the
    authentication services that they provide. In a nutshell, a connector
    instance advertises what resource(s) it can be used to gain access to,
    whereas a consumer may run a query to search for compatible connectors by
    specifying the resource(s) that they need to access and then use a
    matching connector instance to connect to said resource(s).

    The resource types and authentication methods supported by a connector are
    declared in the connector type specification. The role of this specification
    is two-fold:

    - it declares a schema for the configuration that needs to be provided to
    configure the connector. This can be used to validate the configuration
    without having to instantiate the connector itself (e.g. in the CLI and
    dashboard), which also makes it possible to configure connectors and
    store their configuration without having to instantiate them.
    - it provides a way for ZenML to keep a registry of available connector
    implementations and configured connector instances. Users who want to
    connect ZenML to external resources via connectors can use this registry
    to discover what types of connectors are available and what types of
    resources they can be configured to access. Consumers can also use the
    registry to find connector instances that are compatible with the
    types of resources that they need to access.
    """

    id: Optional[UUID] = None
    name: Optional[str] = None
    auth_method: str
    resource_type: Optional[str] = None
    resource_id: Optional[str] = None
    expires_at: Optional[datetime] = None
    expiration_seconds: Optional[int] = None
    config: AuthenticationConfig

    _TYPE: ClassVar[Optional[ServiceConnectorTypeModel]] = None

    def __init__(self, **kwargs: Any) -> None:
        """Create a service connector instance and normalize its resource ID.

        Args:
            kwargs: Keyword arguments forwarded unchanged to the base class
                constructor.
        """
        super().__init__(**kwargs)

        # Without a resource type there is nothing to validate against.
        if not self.resource_type:
            return

        # Bring the resource ID to its canonical form. For resource types
        # that don't support multiple instances:
        # - a missing resource ID is replaced with the default resource ID of
        # the resource type
        # - a supplied resource ID must match that default resource ID
        try:
            validated_id = self._validate_resource_id(
                self.resource_type, self.resource_id
            )
        except AuthorizationException as e:
            error = (
                f"Authorization error validating resource ID "
                f"{self.resource_id} for resource type "
                f"{self.resource_type}: {e}"
            )
            # Include the traceback only when debug logging is enabled
            if logger.isEnabledFor(logging.DEBUG):
                logger.exception(error)
            else:
                logger.warning(error)

            # A resource ID that failed validation is discarded.
            validated_id = None

        self.resource_id = validated_id

    @classmethod
    @abstractmethod
    def _get_connector_type(cls) -> ServiceConnectorTypeModel:
        """Get the connector type specification.

        Connector implementations must override this method. It is called by
        `get_type`, which caches the returned specification on the class.

        Returns:
            The connector type specification.
        """

    def _canonical_resource_id(
        self, resource_type: str, resource_id: str
    ) -> str:
        """Return the canonical form of a resource ID.

        Resource IDs are canonicalized before they are stored in the
        connector instance. This base implementation is the identity: the
        supplied resource ID is handed back untouched. Connector
        implementations may override it.

        Args:
            resource_type: The resource type to canonicalize.
            resource_id: The resource ID to canonicalize.

        Returns:
            The canonical resource ID.
        """
        # Identity mapping: no normalization is applied by default.
        return resource_id

    def _get_default_resource_id(self, resource_type: str) -> str:
        """Get the default resource ID for a resource type.

        This base implementation always fails. Connector implementations
        must override it to supply a default resource ID for resource types
        that do not support multiple instances.

        Args:
            resource_type: The type of the resource to get a default resource ID
                for. Only called with resource types that do not support
                multiple instances.

        Raises:
            RuntimeError: If the resource type does not support multiple
                instances and the connector implementation did not provide a
                default resource ID.
        """
        # Reaching this point means the connector implementation did not
        # override this method for a single-instance resource type.
        message = (
            f"Resource type '{resource_type}' does not support multiple "
            f"instances and the connector implementation didn't provide "
            f"a default resource ID."
        )
        raise RuntimeError(message)

    @abstractmethod
    def _connect_to_resource(
        self,
        **kwargs: Any,
    ) -> Any:
        """Authenticate and connect to a resource.

        This method initializes and returns an implementation specific object
        representing an authenticated service client, connection or session that
        can be used to access the resource that the connector instance is
        configured to connect to.

        The implementation should assume that the resource type and resource
        ID configured in the connector instance are both set, valid and ready to
        be used.

        Args:
            kwargs: Additional implementation specific keyword arguments to use
                to configure the client.

        Returns:
            An implementation specific object representing the authenticated
            service client, connection or session.

        Raises:
            AuthorizationException: If authentication failed.
        """

    @abstractmethod
    def _configure_local_client(
        self,
        **kwargs: Any,
    ) -> None:
        """Configure a local client to authenticate and connect to a resource.

        This method configures a local client or SDK installed on the localhost
        to connect to the resource that the connector instance is configured
        to access.

        The implementation should assume that the resource type and resource
        ID configured in the connector instance are both set, valid and ready to
        be used.

        Args:
            kwargs: Additional implementation specific keyword arguments to use
                to configure the client.

        Raises:
            AuthorizationException: If authentication failed.
            NotImplementedError: If the connector instance does not support
                local configuration for the configured resource type or
                authentication method.
        """

    @classmethod
    @abstractmethod
    def _auto_configure(
        cls,
        auth_method: Optional[str] = None,
        resource_type: Optional[str] = None,
        resource_id: Optional[str] = None,
        **kwargs: Any,
    ) -> "ServiceConnector":
        """Auto-configure a connector instance.

        Instantiate a connector with a configuration extracted from the
        authentication configuration available in the environment (e.g.
        environment variables or local client/SDK configuration files).

        Connector implementations that do not support auto-configuration
        should raise `NotImplementedError`.

        Args:
            auth_method: The particular authentication method to use. If not
                specified, the connector implementation must decide which
                authentication method to use or raise an exception.
            resource_type: The type of resource to configure. If not specified,
                the connector implementation must either return a connector
                instance configured to access any of the supported resource
                types, fall back to a default resource type, or raise an
                exception.
            resource_id: The ID of the resource to configure. The
                implementation may choose to either require or ignore this
                parameter if it does not support or detect a resource type that
                supports multiple instances.
            kwargs: Additional implementation specific keyword arguments to use.

        Returns:
            A connector instance configured with authentication credentials
            automatically extracted from the environment.

        Raises:
            NotImplementedError: If the connector auto-configuration fails or
                is not supported.
        """

    @abstractmethod
    def _verify(
        self,
        resource_type: Optional[str] = None,
        resource_id: Optional[str] = None,
    ) -> List[str]:
        """Verify and list all the resources that the connector can access.

        This method uses the connector's configuration to verify that it can
        authenticate and access the indicated resource(s). A non-empty list of
        resource IDs in canonical format MUST be returned if the resource type
        is supplied, otherwise the connector is marked as being in an error
        state that doesn't allow it to be used to access any resources.

        The implementation should not rely on the resource type and resource
        ID configured in the connector instance because they might be
        missing or different than the ones supplied to this method (e.g. if
        this is a multi-type or multi-instance connector).

        If a resource type is not supplied, the implementation must verify that
        it can authenticate and connect to any of the supported resource types.
        It is not required to return any resource IDs in this case, although a
        list of resource IDs may still be returned.

        For resource types that do not support multiple instances, the
        resource ID parameter, if supplied, is the same as the default
        resource ID for the resource type as returned by
        `_get_default_resource_id`, in which case, the implementation should
        return it unmodified.

        For resource types that do support multiple instances, if the resource
        ID is not supplied, the implementation MUST return a non-empty list of
        resource IDs in canonical format identifying all the resources that the
        connector can access of the specified type.

        Args:
            resource_type: The type of the resource to verify. If omitted and
                if the connector supports multiple resource types, the
                implementation must verify that it can authenticate and connect
                to any and all of the supported resource types.
            resource_id: The ID of the resource to connect to. Omitted if a
                resource type is not specified. It has the same value as the
                default resource ID if the supplied resource type doesn't
                support multiple instances. If the supplied resource type does
                allow multiple instances, this parameter may still be omitted
                to fetch a list of resource IDs identifying all the resources
                of the indicated type that the connector can access.

        Returns:
            The list of resource IDs in canonical format identifying the
            resources that the connector can access. This list may be empty only
            if the resource type is not specified (i.e. for multi-type
            connectors).

        Raises:
            ValueError: If the connector configuration is invalid.
            AuthorizationException: If authentication failed.
        """

    def _get_connector_client(
        self,
        resource_type: str,
        resource_id: str,
    ) -> "ServiceConnector":
        """Get a connector instance that can be used to connect to a resource.

        The returned connector must be fully configured and ready to access
        the indicated resource, meaning that it has:
        - proper authentication configuration and credentials, which may be
        the ones stored in this connector instance or different ones (e.g.
        temporary credentials generated by this connector for a particular
        resource)
        - a resource type
        - a resource ID

        This hook exists for the cases where the connector that actually
        authenticates against the target service differs from the connector
        configured by the user, for example:

        - the "main connector" stores long-lived or widely scoped credentials
        and generates temporary or more narrowly scoped credentials that the
        "connector client" uses to access a particular resource.

        - the "main connector" is configured without a resource type or
        without a resource ID (e.g. for cloud providers or multiple resource
        instances) and instantiates "connector clients" for one particular
        resource type and resource ID.

        - the "main connector" is configured with one resource type but can
        generate a "connector client" configuration of a different resource
        type, possibly belonging to a different connector implementation (e.g.
        the AWS connector can instantiate a Kubernetes connector client to
        access an EKS resource as if it were a generic Kubernetes cluster).

        This default implementation hands back this very instance when it is
        already configured with the requested resource type and resource ID,
        and otherwise a copy of it with those two attributes replaced.

        Args:
            resource_type: The type of the resources to connect to.
            resource_id: The ID of a particular resource to connect to.

        Returns:
            A service connector client instance that can be used to connect to
            the indicated resource.
        """
        already_configured = (
            self.resource_type == resource_type
            and self.resource_id == resource_id
        )
        if already_configured:
            return self

        # Retarget a copy so that this instance keeps its own configuration.
        client = self.copy()
        client.resource_type = resource_type
        client.resource_id = resource_id
        return client

    def _validate_resource_id(
        self, resource_type: str, resource_id: Optional[str]
    ) -> Optional[str]:
        """Validate a resource ID value of a certain type against the connector configuration.

        Args:
            resource_type: The type of resource to validate.
            resource_id: The resource ID to validate.

        Returns:
            The validated resource ID in canonical form. If no resource ID was
            supplied, this is None for resource types that support multiple
            instances and the default resource ID for resource types that do
            not.

        Raises:
            AuthorizationException: If the connector is not authorized to
                access the provided resource ID.
        """
        # Look up the specification of the requested resource type
        spec = self.type.resource_type_dict[resource_type]

        # Multi-instance resource types: any resource ID is accepted and is
        # only converted to canonical form; a missing one stays missing.
        if spec.supports_instances:
            return (
                None
                if resource_id is None
                else self._canonical_resource_id(resource_type, resource_id)
            )

        # Single-instance resource types: the only valid resource ID is the
        # default one that the connector implementation derives from its
        # configuration.
        default_id = self._get_default_resource_id(resource_type)
        if resource_id is None:
            return default_id

        canonical_id = self._canonical_resource_id(resource_type, resource_id)
        if canonical_id == default_id:
            return canonical_id

        raise AuthorizationException(
            f"The connector does not allow access to the provided "
            f"'{canonical_id}' {spec.name} resource. It "
            f"only allows access to the following resource: "
            f"'{default_id}'."
        )

    @classmethod
    def get_type(cls) -> ServiceConnectorTypeModel:
        """Get the connector type specification.

        The specification is built on first use and cached in `_TYPE`;
        later calls return the cached object.

        Returns:
            The connector type specification.
        """
        if cls._TYPE is None:
            # First call: fetch the specification, link it to this connector
            # class and remember it.
            specification = cls._get_connector_type()
            specification.set_connector_class(cls)
            cls._TYPE = specification

        return cls._TYPE

    @property
    def type(self) -> ServiceConnectorTypeModel:
        """Instance-level shortcut for `get_type`.

        Returns:
            The connector type specification.
        """
        connector_type = self.get_type()
        return connector_type

    @property
    def supported_resource_types(self) -> List[str]:
        """The resource types supported by this connector instance.

        Returns:
            A list with the resource types supported by this connector instance.
        """
        spec = self.get_type()

        # A connector configured with a resource type supports only that one.
        if self.resource_type:
            return [self.resource_type]

        # No resource type configured: this is a multi-type connector, so
        # every resource type in the specification is reported. That lets
        # the connector be discovered as compatible with any of them.
        return list(spec.resource_type_dict.keys())

    @classmethod
    def from_model(
        cls, model: "ServiceConnectorBaseModel"
    ) -> "ServiceConnector":
        """Build a service connector instance out of a service connector model.

        Args:
            model: The service connector model.

        Returns:
            The created service connector instance.

        Raises:
            ValueError: If the connector configuration is invalid.
        """
        connector_spec = cls.get_type()

        # A model listing exactly one resource type pins the connector to
        # that type. Any other number of entries is represented by a `None`
        # resource type (a multi-type connector).
        resource_type: Optional[str] = (
            model.resource_types[0] if len(model.resource_types) == 1 else None
        )

        # Look up the authentication method specification and normalize the
        # expiration time against it.
        expiration_seconds: Optional[int] = None
        try:
            method_spec, _ = connector_spec.find_resource_specifications(
                model.auth_method,
                resource_type,
            )
            expiration_seconds = method_spec.validate_expiration(
                model.expiration_seconds
            )
        except (KeyError, ValueError) as exc:
            raise ValueError(
                f"connector configuration is not valid: {exc}"
            ) from exc

        # Assemble the authentication configuration: start from the plain
        # configuration values, then layer the referenced secret (response
        # models only) and finally the inline secrets on top.
        config = model.configuration.copy()
        references_secret = (
            isinstance(model, ServiceConnectorResponseModel)
            and model.secret_id
        )
        if references_secret:
            try:
                secret = Client().get_secret(model.secret_id)
            except KeyError as exc:
                raise ValueError(
                    f"could not fetch secret with ID '{model.secret_id}' "
                    f"referenced in the connector configuration: {exc}"
                ) from exc

            if secret.has_missing_values:
                raise ValueError(
                    f"secret with ID '{model.secret_id}' referenced in the "
                    "connector configuration has missing values. This can "
                    "happen for example if your user lacks the permissions "
                    "required to access the secret."
                )

            config.update(secret.secret_values)

        if model.secrets:
            # Only secrets that carry a value override the configuration.
            inline_secrets = {}
            for key, secret_value in model.secrets.items():
                if secret_value:
                    inline_secrets[key] = secret_value.get_secret_value()
            config.update(inline_secrets)

        config_class = method_spec.config_class
        if config_class is None:
            raise ValueError(
                f"the implementation of the {model.name} connector type is "
                "not available in the environment. Please check that you "
                "have installed the required dependencies."
            )

        # Validate the assembled values against the configuration class of
        # the authentication method.
        try:
            auth_config = config_class(**config)
        except ValidationError as exc:
            raise ValueError(
                f"connector configuration is not valid: {exc}"
            ) from exc

        assert isinstance(auth_config, AuthenticationConfig)

        connector = cls(
            auth_method=model.auth_method,
            resource_type=resource_type,
            resource_id=model.resource_id,
            config=auth_config,
            expires_at=model.expires_at,
            expiration_seconds=expiration_seconds,
        )

        # Response models additionally carry the identity of the connector.
        if isinstance(model, ServiceConnectorResponseModel):
            connector.id = model.id
            connector.name = model.name

        return connector

    def to_model(
        self,
        user: UUID,
        workspace: UUID,
        name: Optional[str] = None,
        is_shared: bool = False,
        description: str = "",
        labels: Optional[Dict[str, str]] = None,
    ) -> "ServiceConnectorRequestModel":
        """Build the request model that describes this connector instance.

        Args:
            user: The ID of the user that created the connector.
            workspace: The ID of the workspace that the connector belongs to.
            name: The name of the connector. Falls back to the name stored on
                the connector instance.
            is_shared: Whether the connector is shared with other users.
            description: The description of the connector.
            labels: The labels of the connector.

        Returns:
            The service connector model corresponding to the connector
            instance.

        Raises:
            ValueError: If the connector configuration is not valid.
        """
        connector_spec = self.get_type()

        connector_name = name or self.name
        if connector_name is None:
            raise ValueError(
                "connector configuration is not valid: name must be set"
            )

        request = ServiceConnectorRequestModel(
            connector_type=connector_spec.connector_type,
            name=connector_name,
            description=description,
            user=user,
            workspace=workspace,
            is_shared=is_shared,
            auth_method=self.auth_method,
            expires_at=self.expires_at,
            expiration_seconds=self.expiration_seconds,
            labels=labels or {},
        )

        # Let the model validate the resource settings and split the
        # authentication configuration into plain values and secrets.
        request.validate_and_configure_resources(
            connector_type=connector_spec,
            resource_types=self.resource_type,
            resource_id=self.resource_id,
            configuration=self.config.non_secret_values,
            secrets=self.config.secret_values,  # type: ignore[arg-type]
        )

        return request

    def to_response_model(
        self,
        workspace: WorkspaceResponseModel,
        user: Optional[UserResponseModel] = None,
        name: Optional[str] = None,
        id: Optional[UUID] = None,
        is_shared: bool = False,
        description: str = "",
        labels: Optional[Dict[str, str]] = None,
    ) -> "ServiceConnectorResponseModel":
        """Build the response model that describes this connector instance.

        Args:
            workspace: The workspace that the connector belongs to.
            user: The user that created the connector.
            name: The name of the connector. Falls back to the name stored on
                the connector instance.
            id: The ID of the connector. Falls back to the ID stored on the
                connector instance.
            is_shared: Whether the connector is shared with other users.
            description: The description of the connector.
            labels: The labels of the connector.

        Returns:
            The service connector response model corresponding to the connector
            instance.

        Raises:
            ValueError: If the connector configuration is not valid.
        """
        connector_spec = self.get_type()

        connector_name = name or self.name
        connector_id = id or self.id
        if connector_name is None or connector_id is None:
            raise ValueError(
                "connector configuration is not valid: name and ID must be set"
            )

        # The creation and update timestamps are both taken at build time.
        response = ServiceConnectorResponseModel(
            id=connector_id,
            created=datetime.utcnow(),
            updated=datetime.utcnow(),
            connector_type=self.get_type(),
            name=connector_name,
            description=description,
            user=user,
            workspace=workspace,
            is_shared=is_shared,
            auth_method=self.auth_method,
            expires_at=self.expires_at,
            expiration_seconds=self.expiration_seconds,
            labels=labels or {},
        )

        # Let the model validate the resource settings and split the
        # authentication configuration into plain values and secrets.
        response.validate_and_configure_resources(
            connector_type=connector_spec,
            resource_types=self.resource_type,
            resource_id=self.resource_id,
            configuration=self.config.non_secret_values,
            secrets=self.config.secret_values,  # type: ignore[arg-type]
        )

        return response

    def has_expired(self) -> bool:
        """Check if the connector authentication credentials have expired.

        The expiration time associated with the connector is compared against
        the current UTC time. A naive `expires_at` value is interpreted as
        UTC. A timezone-aware value is compared as the instant it denotes,
        whatever its UTC offset.

        Returns:
            True if the connector has expired, False otherwise (including
            when no expiration time is set).
        """
        expires_at = self.expires_at
        if not expires_at:
            return False

        # Only attach UTC to naive values. Overwriting the tzinfo of an aware
        # datetime keeps its wall-clock time but changes the instant it
        # represents, which would shift the expiry by the original offset.
        if expires_at.utcoffset() is None:
            expires_at = expires_at.replace(tzinfo=timezone.utc)

        return expires_at < datetime.now(timezone.utc)

    def validate_runtime_args(
        self,
        resource_type: Optional[str],
        resource_id: Optional[str] = None,
        require_resource_type: bool = False,
        require_resource_id: bool = False,
        **kwargs: Any,
    ) -> Tuple[Optional[str], Optional[str]]:
        """Check runtime arguments against the connector configuration.

        The supplied resource type and resource ID are reconciled with the
        values the connector is configured with and with the connector type
        specification. Values that are not supplied fall back to the
        configured ones.

        Args:
            resource_type: The type of the resource supplied at runtime by the
                connector's consumer. Must be the same as the resource type that
                the connector is configured to access, unless the connector is
                configured to access any resource type.
            resource_id: The ID of the resource requested by the connector's
                consumer. Can be different than the resource ID that the
                connector is configured to access, e.g. if it is not in the
                canonical form.
            require_resource_type: Whether the resource type is required.
            require_resource_id: Whether the resource ID is required.
            kwargs: Additional runtime arguments.

        Returns:
            The validated resource type and resource ID.

        Raises:
            ValueError: If the runtime arguments are not valid.
        """
        type_conflict = bool(
            self.resource_type
            and resource_type
            and self.resource_type != resource_type
        )
        if type_conflict:
            raise ValueError(
                f"the connector is configured to provide access to a "
                f"'{self.resource_type}' resource type, but a different "
                f"resource type was requested: '{resource_type}'."
            )

        # Anything not supplied at runtime falls back to the configuration.
        effective_type = resource_type or self.resource_type
        effective_id = resource_id or self.resource_id

        if require_resource_type and not effective_type:
            raise ValueError(
                "the connector is configured to provide access to multiple "
                "resource types. A resource type must be specified when "
                "requesting access to a resource."
            )

        connector_spec = self.get_type()

        # Find the resource specification matching the authentication method
        # and the effective resource type.
        try:
            _, resource_spec = connector_spec.find_resource_specifications(
                self.auth_method,
                effective_type,
            )
        except (KeyError, ValueError) as exc:
            raise ValueError(
                f"connector configuration is not valid: {exc}"
            ) from exc

        if not (effective_type and resource_spec):
            # Without a resource type, a resource ID has no meaning.
            if effective_id:
                raise ValueError(
                    "the connector is configured to provide access to multiple "
                    "resource types, but only a resource name was specified. A "
                    "resource type must also be specified when "
                    "requesting access to a resource."
                )

            return effective_type, effective_id

        # Validate the resource ID and convert it to its canonical form. A
        # default resource ID is returned for resource types that do not
        # support instances, if no resource ID is specified.
        canonical_id = self._validate_resource_id(
            resource_type=effective_type,
            resource_id=effective_id,
        )

        if not canonical_id:
            if require_resource_id and not self.resource_id:
                raise ValueError(
                    f"the connector is configured to provide access to "
                    f"multiple {resource_spec.name} resources. A resource name "
                    "must be specified when requesting access to a resource."
                )
        elif self.resource_id and self.resource_id != canonical_id:
            raise ValueError(
                f"the connector is configured to provide access to a "
                f"single {resource_spec.name} resource with a "
                f"resource name of '{self.resource_id}', but a "
                f"different resource name was requested: "
                f"'{canonical_id}'."
            )

        return effective_type, canonical_id

    def connect(
        self,
        **kwargs: Any,
    ) -> Any:
        """Authenticate and connect to the configured resource.

        Returns an implementation specific object representing an
        authenticated service client, connection or session that can be used
        to access the resource that the connector is configured to access.

        The connector has to be fully configured for this method to succeed
        (i.e. the connector's configuration must be valid, a resource type and
        a resource ID must be configured). This method should only be called on
        a connector client retrieved by calling `get_connector_client` on the
        main service connector.

        Args:
            kwargs: Additional implementation specific keyword arguments to use
                to configure the client.

        Returns:
            An implementation specific object representing the authenticated
            service client, connection or session.

        Raises:
            AuthorizationException: If the connector's authentication
                credentials have expired.
        """
        # The configured resource type and ID are validated first; both are
        # required at this point.
        validated_type, validated_id = self.validate_runtime_args(
            resource_type=self.resource_type,
            resource_id=self.resource_id,
            require_resource_type=True,
            require_resource_id=True,
        )

        credentials_expired = self.has_expired()
        if credentials_expired:
            raise AuthorizationException(
                "the connector's authentication credentials have expired."
            )

        self._verify(resource_type=validated_type, resource_id=validated_id)

        client = self._connect_to_resource(**kwargs)
        return client

    @classmethod
    def auto_configure(
        cls,
        auth_method: Optional[str] = None,
        resource_type: Optional[str] = None,
        resource_id: Optional[str] = None,
        **kwargs: Any,
    ) -> Optional["ServiceConnector"]:
        """Create a connector instance configured from the environment.

        Instantiate a connector with a configuration extracted from the
        authentication configuration available in the environment (e.g.
        environment variables or local client/SDK configuration files).

        Args:
            auth_method: The particular authentication method to use. If
                omitted and if the connector implementation cannot decide which
                authentication method to use, it may raise an exception.
            resource_type: The type of resource to configure. If not specified,
                the method returns a connector instance configured to access any
                of the supported resource types (multi-type connector) or
                configured to use a default resource type. If the connector
                doesn't support multi-type configurations or if it cannot decide
                which resource type to use, it may raise an exception.
            resource_id: The ID of the resource instance to configure. The
                connector implementation may choose to either require or ignore
                this parameter if it does not support or detect a resource type
                that supports multiple instances.
            kwargs: Additional implementation specific keyword arguments to use.

        Returns:
            A connector instance configured with authentication credentials
            automatically extracted from the environment or None if
            auto-configuration is not supported.

        Raises:
            ValueError: If the connector does not support the requested
                authentication method or resource type.
            AuthorizationException: If the connector's authentication
                credentials have expired.
        """
        connector_spec = cls.get_type()

        if not connector_spec.supports_auto_configuration:
            return None

        # Reject an explicit authentication method or resource type that the
        # connector type does not declare.
        unknown_auth_method = bool(
            auth_method and auth_method not in connector_spec.auth_method_dict
        )
        if unknown_auth_method:
            raise ValueError(
                f"connector type {connector_spec.name} does not support "
                f"authentication method: '{auth_method}'"
            )

        unknown_resource_type = bool(
            resource_type
            and resource_type not in connector_spec.resource_type_dict
        )
        if unknown_resource_type:
            raise ValueError(
                f"connector type {connector_spec.name} does not support "
                f"resource type: '{resource_type}'"
            )

        configured = cls._auto_configure(
            auth_method=auth_method,
            resource_type=resource_type,
            resource_id=resource_id,
            **kwargs,
        )

        if configured.has_expired():
            raise AuthorizationException(
                "the connector's auto-configured authentication credentials "
                "have expired."
            )

        # Verify the new connector against its own resource settings before
        # handing it out.
        configured._verify(
            resource_type=configured.resource_type,
            resource_id=configured.resource_id,
        )
        return configured

    def configure_local_client(
        self,
        **kwargs: Any,
    ) -> None:
        """Set up a local client to authenticate and connect to a resource.

        This method uses the connector's configuration to configure a local
        client or SDK installed on the localhost so that it can authenticate
        and connect to the resource that the connector is configured to access.

        The connector has to be fully configured for this method to succeed
        (i.e. the connector's configuration must be valid, a resource type must
        be specified and the resource ID must be specified if the resource type
        supports multiple instances). This method should only be called on a
        connector client retrieved by calling `get_connector_client` on the
        main service connector.

        Args:
            kwargs: Additional implementation specific keyword arguments to use
                to configure the client.

        Raises:
            AuthorizationException: If the connector's authentication
                credentials have expired.
        """
        # The configured resource type and ID are validated first; both are
        # required at this point.
        validated_type, validated_id = self.validate_runtime_args(
            resource_type=self.resource_type,
            resource_id=self.resource_id,
            require_resource_type=True,
            require_resource_id=True,
        )

        credentials_expired = self.has_expired()
        if credentials_expired:
            raise AuthorizationException(
                "the connector's authentication credentials have expired."
            )

        self._verify(resource_type=validated_type, resource_id=validated_id)

        self._configure_local_client(**kwargs)

    def verify(
        self,
        resource_type: Optional[str] = None,
        resource_id: Optional[str] = None,
        list_resources: bool = True,
    ) -> ServiceConnectorResourcesModel:
        """Verify and optionally list all the resources that the connector can access.

        This method uses the connector's configuration to verify that it can
        authenticate and access the indicated resource(s).

        If `list_resources` is set, the list of resources that the connector can
        access, scoped to the supplied resource type and resource ID is included
        in the response, otherwise the connector only verifies that it can
        globally authenticate and doesn't verify or return resource information
        (i.e. the `resource_ids` fields in the response are empty).

        Args:
            resource_type: The type of the resource to verify. If the connector
                instance is already configured with a resource type, this
                argument must be the same as the one configured if supplied.
            resource_id: The ID of a particular resource instance to check
                whether the connector can access. If the connector instance is
                already configured with a resource ID that is not the same or
                equivalent to the one requested, a `ValueError` exception is
                raised.
            list_resources: Whether to list the resources that the connector can
                access.

        Returns:
            A list of resources that the connector can access.

        Raises:
            ValueError: If the arguments or the connector configuration are
                not valid.
        """
        spec = self.get_type()

        resources = ServiceConnectorResourcesModel(
            connector_type=spec,
            id=self.id,
            name=self.name,
        )

        name_msg = f" '{self.name}'" if self.name else ""

        resource_types = self.supported_resource_types
        if resource_type:
            if resource_type not in resource_types:
                raise ValueError(
                    f"connector{name_msg} does not support resource type: "
                    f"'{resource_type}'. Supported resource types are: "
                    f"{', '.join(resource_types)}"
                )
            resource_types = [resource_type]

        # Pre-populate the list of resources with entries corresponding to the
        # supported resource types and scoped to the supplied resource type
        resources.resources = [
            ServiceConnectorTypedResourcesModel(
                resource_type=rt,
            )
            for rt in resource_types
        ]

        if self.has_expired():
            error = "the connector's authentication credentials have expired."
            # Log the error in the resources object
            resources.set_error(error)
            return resources

        verify_resource_types: List[Optional[str]] = []
        verify_resource_id = None
        if not list_resources and not resource_id:
            # If we're not listing resources, we're only verifying that the
            # connector can authenticate globally (i.e. without supplying a
            # resource type or resource ID). This is the same as if no resource
            # type or resource ID was supplied. The only exception is when the
            # verification scope is already set to a single resource ID, in
            # which case we still perform the verification on that resource ID.
            verify_resource_types = [None]
            verify_resource_id = None
        else:
            if len(resource_types) > 1:
                # This forces the connector to verify that it can authenticate
                # globally (i.e. without supplying a resource type or resource
                # ID) before listing individual resources in the case of
                # multi-type service connectors.
                verify_resource_types = [None]

            verify_resource_types.extend(resource_types)
            if len(verify_resource_types) == 1:
                verify_resource_id = resource_id

        # Here we go through each resource type and attempt to verify and list
        # the resources that the connector can access for that resource type.
        # This list may start with a `None` resource type, which indicates that
        # the connector should verify that it can authenticate globally.
        for resource_type in verify_resource_types:
            try:
                resource_type, resource_id = self.validate_runtime_args(
                    resource_type=resource_type,
                    resource_id=verify_resource_id,
                    require_resource_type=False,
                    require_resource_id=False,
                )

                resource_ids = self._verify(
                    resource_type=resource_type,
                    resource_id=resource_id,
                )
            except ValueError as exc:
                raise ValueError(
                    f"The connector configuration is incomplete or invalid: "
                    f"{exc}",
                )
            except AuthorizationException as exc:
                error = f"connector{name_msg} authorization failure: {exc}"
                # Log an exception if debug logging is enabled
                if logger.isEnabledFor(logging.DEBUG):
                    logger.exception(error)
                else:
                    logger.warning(error)

                # Log the error in the resources object
                resources.set_error(error, resource_type=resource_type)
                if resource_type:
                    continue
                else:
                    # We stop on a global failure
                    break
            except Exception as exc:
                error = (
                    f"connector{name_msg} verification failed with "
                    f"unexpected error: {exc}"
                )
                # Log an exception if debug logging is enabled
                if logger.isEnabledFor(logging.DEBUG):
                    logger.exception(error)
                else:
                    logger.warning(error)
                error = (
                    "an unexpected error occurred while verifying the "
                    "connector."
                )
                # Log the error in the resources object
                resources.set_error(error, resource_type=resource_type)
                if resource_type:
                    continue
                else:
                    # We stop on a global failure
                    break

            if not resource_type:
                # If a resource type is not provided as argument, we don't
                # expect any resources to be listed
                continue

            resource_type_spec = spec.resource_type_dict[resource_type]

            if resource_id:
                # A single resource was requested, so we expect a single
                # resource to be listed
                if [resource_id] != resource_ids:
                    logger.error(
                        f"a different resource ID '{resource_ids}' was "
                        f"returned than the one requested: {resource_ids}. "
                        f"This is likely a bug in the {self.__class__} "
                        "connector implementation."
                    )
                resources.set_resource_ids(resource_type, [resource_id])
            elif not resource_ids:
                # If no resources were listed, signal this as an error that the
                # connector cannot access any resources.
                error = (
                    f"connector{name_msg} didn't list any "
                    f"{resource_type_spec.name} resources. This is likely "
                    "caused by the connector credentials not being valid or "
                    "not having sufficient permissions to list or access "
                    "resources of this type. Please check the connector "
                    "configuration and its credentials and try again."
                )
                logger.debug(error)
                resources.set_error(error, resource_type=resource_type)
            else:
                resources.set_resource_ids(resource_type, resource_ids)

        return resources

    def get_connector_client(
        self,
        resource_type: Optional[str] = None,
        resource_id: Optional[str] = None,
    ) -> "ServiceConnector":
        """Obtain a connector client that can be used to connect to a resource.

        The connector client can be used by consumers to connect to a resource
        (i.e. make calls to `connect` and `configure_local_client`).

        The returned connector may be the same as the original connector
        or it may be a different instance configured with different
        credentials or even of a different connector type.

        Args:
            resource_type: The type of the resource to connect to.
            resource_id: The ID of a particular resource to connect to.

        Returns:
            A service connector client that can be used to connect to the
            resource.

        Raises:
            AuthorizationException: If authentication failed.
        """
        expired_message = (
            "the connector's authentication credentials have expired."
        )

        target_type, target_id = self.validate_runtime_args(
            resource_type=resource_type,
            resource_id=resource_id,
            require_resource_type=True,
            require_resource_id=True,
        )

        if self.has_expired():
            raise AuthorizationException(expired_message)

        # Check that this connector allows access to the requested resource
        # type and instance before deriving a client from it.
        self._verify(resource_type=target_type, resource_id=target_id)

        assert target_type is not None
        assert target_id is not None

        client = self._get_connector_client(
            resource_type=target_type,
            resource_id=target_id,
        )

        if client.has_expired():
            raise AuthorizationException(expired_message)

        # The client is verified against its own resource ID, which may not
        # be the same as the one requested.
        client._verify(
            resource_type=target_type,
            resource_id=client.resource_id,
        )

        return client

    class Config:
        """Pydantic configuration for the connector model."""

        # Treat every attribute whose name starts with an underscore as
        # private: such attributes stay mutable and are left out of
        # serialization.
        underscore_attrs_are_private = True
supported_resource_types: List[str] property readonly

The resource types supported by this connector instance.

Returns:

Type Description
List[str]

A list with the resource types supported by this connector instance.

type: ServiceConnectorTypeModel property readonly

Get the connector type specification.

Returns:

Type Description
ServiceConnectorTypeModel

The connector type specification.

Config

Connector configuration.

Source code in zenml/service_connectors/service_connector.py
class Config:
    """Pydantic configuration for the connector model."""

    # Treat every attribute whose name starts with an underscore as
    # private: such attributes stay mutable and are left out of
    # serialization.
    underscore_attrs_are_private = True
__init__(self, **kwargs) special

Initialize a new service connector instance.

Parameters:

Name Type Description Default
kwargs Any

Additional keyword arguments to pass to the base class constructor.

{}
Source code in zenml/service_connectors/service_connector.py
def __init__(self, **kwargs: Any) -> None:
    """Create a service connector and canonicalize its resource ID.

    Args:
        kwargs: Keyword arguments forwarded unchanged to the base class
            constructor.
    """
    super().__init__(**kwargs)

    # Without a configured resource type there is no resource ID to
    # canonicalize.
    if not self.resource_type:
        return

    # Bring the resource ID to its canonical form. For resource types that
    # don't support multiple instances this means:
    # - falling back to the default resource ID of the resource type when
    #   none was provided
    # - checking that a provided resource ID matches that default
    try:
        self.resource_id = self._validate_resource_id(
            self.resource_type, self.resource_id
        )
    except AuthorizationException as exc:
        message = (
            f"Authorization error validating resource ID "
            f"{self.resource_id} for resource type "
            f"{self.resource_type}: {exc}"
        )
        # Include the traceback only when debug logging is enabled;
        # otherwise emit a plain warning.
        report = (
            logger.exception
            if logger.isEnabledFor(logging.DEBUG)
            else logger.warning
        )
        report(message)

        # The resource ID could not be validated, so it is dropped.
        self.resource_id = None
auto_configure(auth_method=None, resource_type=None, resource_id=None, **kwargs) classmethod

Auto-configure a connector instance.

Instantiate a connector with a configuration extracted from the authentication configuration available in the environment (e.g. environment variables or local client/SDK configuration files).

Parameters:

Name Type Description Default
auth_method Optional[str]

The particular authentication method to use. If omitted and if the connector implementation cannot decide which authentication method to use, it may raise an exception.

None
resource_type Optional[str]

The type of resource to configure. If not specified, the method returns a connector instance configured to access any of the supported resource types (multi-type connector) or configured to use a default resource type. If the connector doesn't support multi-type configurations or if it cannot decide which resource type to use, it may raise an exception.

None
resource_id Optional[str]

The ID of the resource instance to configure. The connector implementation may choose to either require or ignore this parameter if it does not support or detect a resource type that supports multiple instances.

None
kwargs Any

Additional implementation specific keyword arguments to use.

{}

Returns:

Type Description
Optional[ServiceConnector]

A connector instance configured with authentication credentials automatically extracted from the environment or None if auto-configuration is not supported.

Exceptions:

Type Description
ValueError

If the connector does not support the requested authentication method or resource type.

AuthorizationException

If the connector's authentication credentials have expired.

Source code in zenml/service_connectors/service_connector.py
@classmethod
def auto_configure(
    cls,
    auth_method: Optional[str] = None,
    resource_type: Optional[str] = None,
    resource_id: Optional[str] = None,
    **kwargs: Any,
) -> Optional["ServiceConnector"]:
    """Build a connector from credentials discovered in the environment.

    The connector configuration is extracted from whatever authentication
    configuration is available in the environment (e.g. environment
    variables or local client/SDK configuration files).

    Args:
        auth_method: The authentication method to use. When omitted, the
            connector implementation picks one and may raise an exception
            if it cannot decide.
        resource_type: The type of resource to configure. When omitted, the
            returned connector is either configured to access any of the
            supported resource types (multi-type connector) or configured
            with a default resource type. Implementations that don't
            support multi-type configurations, or that cannot decide which
            resource type to use, may raise an exception.
        resource_id: The ID of the resource instance to configure.
            Implementations may require or ignore this parameter if they do
            not support or detect a resource type that supports multiple
            instances.
        kwargs: Implementation specific keyword arguments, forwarded to the
            implementation's auto-configuration hook.

    Returns:
        A connector instance carrying the automatically extracted
        credentials, or None if the connector type does not support
        auto-configuration.

    Raises:
        ValueError: If the connector type does not support the requested
            authentication method or resource type.
        AuthorizationException: If the auto-configured credentials have
            already expired.
    """
    type_spec = cls.get_type()

    if not type_spec.supports_auto_configuration:
        return None

    # Reject explicitly requested values that the connector type does not
    # know about before running the implementation hook.
    if auth_method:
        if auth_method not in type_spec.auth_method_dict:
            raise ValueError(
                f"connector type {type_spec.name} does not support authentication "
                f"method: '{auth_method}'"
            )

    if resource_type:
        if resource_type not in type_spec.resource_type_dict:
            raise ValueError(
                f"connector type {type_spec.name} does not support resource type: "
                f"'{resource_type}'"
            )

    instance = cls._auto_configure(
        auth_method=auth_method,
        resource_type=resource_type,
        resource_id=resource_id,
        **kwargs,
    )

    if instance.has_expired():
        raise AuthorizationException(
            "the connector's auto-configured authentication credentials "
            "have expired."
        )

    # Verify the new connector against the resource type and resource ID
    # it ended up being configured with.
    instance._verify(
        resource_type=instance.resource_type,
        resource_id=instance.resource_id,
    )
    return instance
configure_local_client(self, **kwargs)

Configure a local client to authenticate and connect to a resource.

This method uses the connector's configuration to configure a local client or SDK installed on the localhost so that it can authenticate and connect to the resource that the connector is configured to access.

The connector has to be fully configured for this method to succeed (i.e. the connector's configuration must be valid, a resource type must be specified and the resource ID must be specified if the resource type supports multiple instances). This method should only be called on a connector client retrieved by calling get_connector_client on the main service connector.

Parameters:

Name Type Description Default
kwargs Any

Additional implementation specific keyword arguments to use to configure the client.

{}

Exceptions:

Type Description
AuthorizationException

If the connector's authentication credentials have expired.

Source code in zenml/service_connectors/service_connector.py
def configure_local_client(
    self,
    **kwargs: Any,
) -> None:
    """Set up a local client or SDK with the connector's credentials.

    The connector's configuration is used to configure a client or SDK
    installed on the local host, so that it can authenticate and connect
    to the resource that the connector is configured to access.

    This only succeeds for a fully configured connector: the configuration
    must be valid, a resource type must be set, and a resource ID must be
    set if the resource type supports multiple instances. Call it only on
    a connector client obtained through `get_connector_client` on the main
    service connector.

    Args:
        kwargs: Implementation specific keyword arguments, forwarded to the
            client configuration step.

    Raises:
        AuthorizationException: If the connector's authentication
            credentials have expired.
    """
    # Both a resource type and a resource ID are required here; the
    # connector's own configured values are what gets validated.
    checked_type, checked_id = self.validate_runtime_args(
        resource_type=self.resource_type,
        resource_id=self.resource_id,
        require_resource_type=True,
        require_resource_id=True,
    )

    credentials_expired = self.has_expired()
    if credentials_expired:
        raise AuthorizationException(
            "the connector's authentication credentials have expired."
        )

    # Verify the validated resource before touching the local client.
    self._verify(resource_type=checked_type, resource_id=checked_id)

    self._configure_local_client(**kwargs)
connect(self, **kwargs)

Authenticate and connect to a resource.

Initialize and return an implementation specific object representing an authenticated service client, connection or session that can be used to access the resource that the connector is configured to access.

The connector has to be fully configured for this method to succeed (i.e. the connector's configuration must be valid, a resource type and a resource ID must be configured). This method should only be called on a connector client retrieved by calling get_connector_client on the main service connector.

Parameters:

Name Type Description Default
kwargs Any

Additional implementation specific keyword arguments to use to configure the client.

{}

Returns:

Type Description
Any

An implementation specific object representing the authenticated service client, connection or session.

Exceptions:

Type Description
AuthorizationException

If the connector's authentication credentials have expired.

Source code in zenml/service_connectors/service_connector.py
def connect(
    self,
    **kwargs: Any,
) -> Any:
    """Open an authenticated connection to the configured resource.

    Creates and returns an implementation specific object (a service
    client, connection or session) that is already authenticated and can
    be used to access the resource that the connector is configured to
    access.

    This only succeeds for a fully configured connector: the configuration
    must be valid and both a resource type and a resource ID must be
    configured. Call it only on a connector client obtained through
    `get_connector_client` on the main service connector.

    Args:
        kwargs: Implementation specific keyword arguments, forwarded to the
            step that creates the client.

    Returns:
        An implementation specific object representing the authenticated
        service client, connection or session.

    Raises:
        AuthorizationException: If the connector's authentication
            credentials have expired.
    """
    # Both a resource type and a resource ID are required here; the
    # connector's own configured values are what gets validated.
    checked_type, checked_id = self.validate_runtime_args(
        resource_type=self.resource_type,
        resource_id=self.resource_id,
        require_resource_type=True,
        require_resource_id=True,
    )

    credentials_expired = self.has_expired()
    if credentials_expired:
        raise AuthorizationException(
            "the connector's authentication credentials have expired."
        )

    # Verify the validated resource before creating the client object.
    self._verify(resource_type=checked_type, resource_id=checked_id)

    client = self._connect_to_resource(**kwargs)
    return client
from_model(model) classmethod

Creates a service connector instance from a service connector model.

Parameters:

Name Type Description Default
model ServiceConnectorBaseModel

The service connector model.

required

Returns:

Type Description
ServiceConnector

The created service connector instance.

Exceptions:

Type Description
ValueError

If the connector configuration is invalid.

Source code in zenml/service_connectors/service_connector.py
@classmethod
def from_model(
    cls, model: "ServiceConnectorBaseModel"
) -> "ServiceConnector":
    """Creates a service connector instance from a service connector model.

    The authentication configuration is assembled from the model's
    non-secret configuration, from the secret referenced by the model (for
    response models that carry a secret ID) and from the secrets embedded
    in the model, in that order; later sources override earlier ones.

    Args:
        model: The service connector model.

    Returns:
        The created service connector instance.

    Raises:
        ValueError: If the connector configuration is invalid, if the
            referenced secret cannot be fetched or has missing values, or
            if the connector type implementation is not available in the
            environment.
    """
    # Validate the connector configuration model
    spec = cls.get_type()

    # Multiple resource types in the model means that the connector
    # instance is configured to access any of the supported resource
    # types (a multi-type connector). We represent that here by setting the
    # resource type to None.
    resource_type: Optional[str] = None
    if len(model.resource_types) == 1:
        resource_type = model.resource_types[0]

    expiration_seconds: Optional[int] = None
    try:
        method_spec, _ = spec.find_resource_specifications(
            model.auth_method,
            resource_type,
        )
        expiration_seconds = method_spec.validate_expiration(
            model.expiration_seconds
        )
    except (KeyError, ValueError) as e:
        raise ValueError(
            f"connector configuration is not valid: {e}"
        ) from e

    # Unpack the authentication configuration. Work on a copy so that the
    # model's own configuration is not modified.
    config = model.configuration.copy()
    if (
        isinstance(model, ServiceConnectorResponseModel)
        and model.secret_id
    ):
        try:
            secret = Client().get_secret(model.secret_id)
        except KeyError as e:
            raise ValueError(
                f"could not fetch secret with ID '{model.secret_id}' "
                f"referenced in the connector configuration: {e}"
            ) from e

        if secret.has_missing_values:
            raise ValueError(
                f"secret with ID '{model.secret_id}' referenced in the "
                "connector configuration has missing values. This can "
                "happen for example if your user lacks the permissions "
                "required to access the secret."
            )

        config.update(secret.secret_values)

    # Secrets embedded in the model take precedence; falsy entries are
    # skipped.
    if model.secrets:
        config.update(
            {
                k: v.get_secret_value()
                for k, v in model.secrets.items()
                if v
            }
        )

    if method_spec.config_class is None:
        # Report the connector type name here (spec.name), not the name of
        # the connector instance (model.name): it is the type's
        # implementation that is missing.
        raise ValueError(
            f"the implementation of the {spec.name} connector type is "
            "not available in the environment. Please check that you "
            "have installed the required dependencies."
        )

    # Validate the authentication configuration
    try:
        auth_config = method_spec.config_class(**config)
    except ValidationError as e:
        raise ValueError(
            f"connector configuration is not valid: {e}"
        ) from e

    assert isinstance(auth_config, AuthenticationConfig)

    connector = cls(
        auth_method=model.auth_method,
        resource_type=resource_type,
        resource_id=model.resource_id,
        config=auth_config,
        expires_at=model.expires_at,
        expiration_seconds=expiration_seconds,
    )
    # Only response models carry an ID and name to copy over.
    if isinstance(model, ServiceConnectorResponseModel):
        connector.id = model.id
        connector.name = model.name

    return connector
get_connector_client(self, resource_type=None, resource_id=None)

Get a connector client that can be used to connect to a resource.

The connector client can be used by consumers to connect to a resource (i.e. make calls to connect and configure_local_client).

The returned connector may be the same as the original connector or it may be a different instance configured with different credentials or even of a different connector type.

Parameters:

Name Type Description Default
resource_type Optional[str]

The type of the resource to connect to.

None
resource_id Optional[str]

The ID of a particular resource to connect to.

None

Returns:

Type Description
ServiceConnector

A service connector client that can be used to connect to the resource.

Exceptions:

Type Description
AuthorizationException

If authentication failed.

Source code in zenml/service_connectors/service_connector.py
def get_connector_client(
    self,
    resource_type: Optional[str] = None,
    resource_id: Optional[str] = None,
) -> "ServiceConnector":
    """Obtain a connector client for connecting to a specific resource.

    Consumers use the connector client to connect to a resource (i.e. to
    call `connect` and `configure_local_client`).

    The returned connector may be the same as the original connector or it
    may be a different instance configured with different credentials or
    even of a different connector type.

    Args:
        resource_type: The type of the resource to connect to.
        resource_id: The ID of a particular resource to connect to.

    Returns:
        A service connector client that can be used to connect to the
        resource.

    Raises:
        AuthorizationException: If authentication failed.
    """
    target_type, target_id = self.validate_runtime_args(
        resource_type=resource_type,
        resource_id=resource_id,
        require_resource_type=True,
        require_resource_id=True,
    )

    if self.has_expired():
        raise AuthorizationException(
            "the connector's authentication credentials have expired."
        )

    # Check that this connector allows access to the requested resource
    # type and instance.
    self._verify(resource_type=target_type, resource_id=target_id)

    assert target_type is not None
    assert target_id is not None

    client = self._get_connector_client(
        resource_type=target_type,
        resource_id=target_id,
    )

    if client.has_expired():
        raise AuthorizationException(
            "the connector's authentication credentials have expired."
        )

    # The client is verified against its own resource ID rather than the
    # one that was requested.
    client._verify(
        resource_type=target_type,
        resource_id=client.resource_id,
    )

    return client
get_type() classmethod

Get the connector type specification.

Returns:

Type Description
ServiceConnectorTypeModel

The connector type specification.

Source code in zenml/service_connectors/service_connector.py
@classmethod
def get_type(cls) -> ServiceConnectorTypeModel:
    """Return the connector type specification, building it on first use.

    Returns:
        The connector type specification.
    """
    if cls._TYPE is None:
        # First call: fetch the specification, register this class on it
        # and cache it for subsequent calls.
        type_spec = cls._get_connector_type()
        type_spec.set_connector_class(cls)
        cls._TYPE = type_spec

    return cls._TYPE
has_expired(self)

Check if the connector authentication credentials have expired.

Verify that the authentication credentials associated with the connector have not expired by checking the expiration time against the current time.

Returns:

Type Description
bool

True if the connector has expired, False otherwise.

Source code in zenml/service_connectors/service_connector.py
def has_expired(self) -> bool:
    """Check if the connector authentication credentials have expired.

    Verify that the authentication credentials associated with the connector
    have not expired by checking the expiration time against the current
    time.

    A naive expiration time is interpreted as UTC. A timezone-aware
    expiration time is compared as the instant it denotes, whatever its
    UTC offset.

    Returns:
        True if the connector has expired, False otherwise.
    """
    expires_at = self.expires_at
    if not expires_at:
        # No expiration time set: the credentials never expire.
        return False

    if expires_at.tzinfo is None or expires_at.utcoffset() is None:
        # Naive value: stamp it as UTC so it can be compared with an
        # aware "now". Aware values are left untouched, because
        # overwriting their tzinfo would shift the instant they denote.
        expires_at = expires_at.replace(tzinfo=timezone.utc)

    return expires_at < datetime.now(timezone.utc)
to_model(self, user, workspace, name=None, is_shared=False, description='', labels=None)

Convert the connector instance to a service connector model.

Parameters:

Name Type Description Default
name Optional[str]

The name of the connector.

None
user UUID

The ID of the user that created the connector.

required
workspace UUID

The ID of the workspace that the connector belongs to.

required
is_shared bool

Whether the connector is shared with other users.

False
description str

The description of the connector.

''
labels Optional[Dict[str, str]]

The labels of the connector.

None

Returns:

Type Description
ServiceConnectorRequestModel

The service connector model corresponding to the connector instance.

Exceptions:

Type Description
ValueError

If the connector configuration is not valid.

Source code in zenml/service_connectors/service_connector.py
def to_model(
    self,
    user: UUID,
    workspace: UUID,
    name: Optional[str] = None,
    is_shared: bool = False,
    description: str = "",
    labels: Optional[Dict[str, str]] = None,
) -> "ServiceConnectorRequestModel":
    """Build a service connector request model from this connector.

    Args:
        user: The ID of the user that created the connector.
        workspace: The ID of the workspace that the connector belongs to.
        name: The name of the connector. Falls back to the connector's own
            name when omitted.
        is_shared: Whether the connector is shared with other users.
        description: The description of the connector.
        labels: The labels of the connector.

    Returns:
        The service connector model corresponding to the connector
        instance.

    Raises:
        ValueError: If the connector configuration is not valid.
    """
    spec = self.get_type()

    # An explicitly supplied name wins over the connector's own name.
    connector_name = name or self.name
    if connector_name is None:
        raise ValueError(
            "connector configuration is not valid: name must be set"
        )

    request = ServiceConnectorRequestModel(
        connector_type=spec.connector_type,
        name=connector_name,
        description=description,
        user=user,
        workspace=workspace,
        is_shared=is_shared,
        auth_method=self.auth_method,
        expires_at=self.expires_at,
        expiration_seconds=self.expiration_seconds,
        labels=labels or {},
    )

    # Let the model validate the connector configuration and fill in its
    # resource and configuration fields.
    request.validate_and_configure_resources(
        connector_type=spec,
        resource_types=self.resource_type,
        resource_id=self.resource_id,
        configuration=self.config.non_secret_values,
        secrets=self.config.secret_values,  # type: ignore[arg-type]
    )

    return request
to_response_model(self, workspace, user=None, name=None, id=None, is_shared=False, description='', labels=None)

Convert the connector instance to a service connector response model.

Parameters:

Name Type Description Default
workspace WorkspaceResponseModel

The workspace that the connector belongs to.

required
user Optional[zenml.models.user_models.UserResponseModel]

The user that created the connector.

None
name Optional[str]

The name of the connector.

None
id Optional[uuid.UUID]

The ID of the connector.

None
is_shared bool

Whether the connector is shared with other users.

False
description str

The description of the connector.

''
labels Optional[Dict[str, str]]

The labels of the connector.

None

Returns:

Type Description
ServiceConnectorResponseModel

The service connector response model corresponding to the connector instance.

Exceptions:

Type Description
ValueError

If the connector configuration is not valid.

Source code in zenml/service_connectors/service_connector.py
def to_response_model(
    self,
    workspace: WorkspaceResponseModel,
    user: Optional[UserResponseModel] = None,
    name: Optional[str] = None,
    id: Optional[UUID] = None,
    is_shared: bool = False,
    description: str = "",
    labels: Optional[Dict[str, str]] = None,
) -> "ServiceConnectorResponseModel":
    """Convert the connector instance to a service connector response model.

    Args:
        workspace: The workspace that the connector belongs to.
        user: The user that created the connector.
        name: The name of the connector. Falls back to the connector's own
            name when omitted.
        id: The ID of the connector. Falls back to the connector's own ID
            when omitted.
        is_shared: Whether the connector is shared with other users.
        description: The description of the connector.
        labels: The labels of the connector.

    Returns:
        The service connector response model corresponding to the connector
        instance.

    Raises:
        ValueError: If the connector configuration is not valid.
    """
    spec = self.get_type()

    name = name or self.name
    id = id or self.id
    if name is None or id is None:
        raise ValueError(
            "connector configuration is not valid: name and ID must be set"
        )

    # Read the clock once so that `created` and `updated` carry the exact
    # same timestamp on the freshly built model.
    now = datetime.utcnow()

    model = ServiceConnectorResponseModel(
        id=id,
        created=now,
        updated=now,
        # Reuse the specification fetched above instead of calling
        # get_type() a second time.
        connector_type=spec,
        name=name,
        description=description,
        user=user,
        workspace=workspace,
        is_shared=is_shared,
        auth_method=self.auth_method,
        expires_at=self.expires_at,
        expiration_seconds=self.expiration_seconds,
        labels=labels or {},
    )

    # Validate the connector configuration.
    model.validate_and_configure_resources(
        connector_type=spec,
        resource_types=self.resource_type,
        resource_id=self.resource_id,
        configuration=self.config.non_secret_values,
        secrets=self.config.secret_values,  # type: ignore[arg-type]
    )

    return model
validate_runtime_args(self, resource_type, resource_id=None, require_resource_type=False, require_resource_id=False, **kwargs)

Validate the runtime arguments against the connector configuration.

Validate that the supplied runtime arguments are compatible with the connector configuration and its specification. This includes validating that the resource type and resource ID are compatible with the connector configuration and its capabilities.

Parameters:

Name Type Description Default
resource_type Optional[str]

The type of the resource supplied at runtime by the connector's consumer. Must be the same as the resource type that the connector is configured to access, unless the connector is configured to access any resource type.

required
resource_id Optional[str]

The ID of the resource requested by the connector's consumer. Can be different than the resource ID that the connector is configured to access, e.g. if it is not in the canonical form.

None
require_resource_type bool

Whether the resource type is required.

False
require_resource_id bool

Whether the resource ID is required.

False
kwargs Any

Additional runtime arguments.

{}

Returns:

Type Description
Tuple[Optional[str], Optional[str]]

The validated resource type and resource ID.

Exceptions:

Type Description
ValueError

If the runtime arguments are not valid.

Source code in zenml/service_connectors/service_connector.py
def validate_runtime_args(
    self,
    resource_type: Optional[str],
    resource_id: Optional[str] = None,
    require_resource_type: bool = False,
    require_resource_id: bool = False,
    **kwargs: Any,
) -> Tuple[Optional[str], Optional[str]]:
    """Check runtime arguments against the connector configuration.

    Ensures that the resource type and resource ID supplied at runtime are
    compatible with the connector's configuration and with the
    specification of its connector type.

    Args:
        resource_type: The resource type supplied at runtime by the
            connector's consumer. Unless the connector is configured to
            access any resource type, it must equal the resource type that
            the connector is configured to access.
        resource_id: The ID of the resource requested by the connector's
            consumer. It may differ from the resource ID that the connector
            is configured to access, e.g. if it is not in the canonical
            form.
        require_resource_type: Whether the resource type is required.
        require_resource_id: Whether the resource ID is required.
        kwargs: Additional runtime arguments.

    Returns:
        The validated resource type and resource ID.

    Raises:
        ValueError: If the runtime arguments are not valid.
    """
    type_conflict = (
        self.resource_type
        and resource_type
        and (self.resource_type != resource_type)
    )
    if type_conflict:
        raise ValueError(
            f"the connector is configured to provide access to a "
            f"'{self.resource_type}' resource type, but a different "
            f"resource type was requested: '{resource_type}'."
        )

    # Runtime values take precedence; the configured values fill the gaps.
    effective_type = resource_type or self.resource_type
    effective_id = resource_id or self.resource_id

    if require_resource_type and not effective_type:
        raise ValueError(
            "the connector is configured to provide access to multiple "
            "resource types. A resource type must be specified when "
            "requesting access to a resource."
        )

    try:
        # Look up the resource specification matching the connector's
        # authentication method and the effective resource type.
        _, resource_spec = self.get_type().find_resource_specifications(
            self.auth_method,
            effective_type,
        )
    except (KeyError, ValueError) as e:
        raise ValueError(
            f"connector configuration is not valid: {e}"
        ) from e

    if not (effective_type and resource_spec):
        # Without a resource type there is nothing to validate a resource
        # ID against.
        if effective_id:
            raise ValueError(
                "the connector is configured to provide access to multiple "
                "resource types, but only a resource name was specified. A "
                "resource type must also be specified when "
                "requesting access to a resource."
            )

        return effective_type, effective_id

    # Validate the resource ID and convert it to its canonical form. For
    # resource types that do not support instances, a default resource ID
    # is returned when none is specified.
    canonical_id = self._validate_resource_id(
        resource_type=effective_type,
        resource_id=effective_id,
    )

    if canonical_id:
        if self.resource_id and self.resource_id != canonical_id:
            raise ValueError(
                f"the connector is configured to provide access to a "
                f"single {resource_spec.name} resource with a "
                f"resource name of '{self.resource_id}', but a "
                f"different resource name was requested: "
                f"'{canonical_id}'."
            )
    elif require_resource_id and not self.resource_id:
        raise ValueError(
            f"the connector is configured to provide access to "
            f"multiple {resource_spec.name} resources. A resource name "
            "must be specified when requesting access to a resource."
        )

    return effective_type, canonical_id
verify(self, resource_type=None, resource_id=None, list_resources=True)

Verify and optionally list all the resources that the connector can access.

This method uses the connector's configuration to verify that it can authenticate and access the indicated resource(s).

If list_resources is set, the list of resources that the connector can access, scoped to the supplied resource type and resource ID is included in the response, otherwise the connector only verifies that it can globally authenticate and doesn't verify or return resource information (i.e. the resource_ids fields in the response are empty).

Parameters:

Name Type Description Default
resource_type Optional[str]

The type of the resource to verify. If the connector instance is already configured with a resource type, this argument must be the same as the one configured if supplied.

None
resource_id Optional[str]

The ID of a particular resource instance to check whether the connector can access. If the connector instance is already configured with a resource ID that is not the same or equivalent to the one requested, a ValueError exception is raised.

None
list_resources bool

Whether to list the resources that the connector can access.

True

Returns:

Type Description
ServiceConnectorResourcesModel

A list of resources that the connector can access.

Exceptions:

Type Description
ValueError

If the arguments or the connector configuration are not valid.

Source code in zenml/service_connectors/service_connector.py
def verify(
    self,
    resource_type: Optional[str] = None,
    resource_id: Optional[str] = None,
    list_resources: bool = True,
) -> ServiceConnectorResourcesModel:
    """Verify and optionally list all the resources that the connector can access.

    This method uses the connector's configuration to verify that it can
    authenticate and access the indicated resource(s).

    If `list_resources` is set, the list of resources that the connector can
    access, scoped to the supplied resource type and resource ID is included
    in the response, otherwise the connector only verifies that it can
    globally authenticate and doesn't verify or return resource information
    (i.e. the `resource_ids` fields in the response are empty).

    Authorization failures, unexpected errors and expired credentials are
    not raised: they are recorded as errors in the returned model instead.

    Args:
        resource_type: The type of the resource to verify. If the connector
            instance is already configured with a resource type, this
            argument must be the same as the one configured if supplied.
        resource_id: The ID of a particular resource instance to check
            whether the connector can access. If the connector instance is
            already configured with a resource ID that is not the same or
            equivalent to the one requested, a `ValueError` exception is
            raised.
        list_resources: Whether to list the resources that the connector can
            access.

    Returns:
        A resources model with one entry per verified resource type, holding
        either the accessible resource IDs or the error that was
        encountered while verifying that resource type.

    Raises:
        ValueError: If the arguments or the connector configuration are
            not valid.
    """
    spec = self.get_type()

    resources = ServiceConnectorResourcesModel(
        connector_type=spec,
        id=self.id,
        name=self.name,
    )

    name_msg = f" '{self.name}'" if self.name else ""

    resource_types = self.supported_resource_types
    if resource_type:
        if resource_type not in resource_types:
            raise ValueError(
                f"connector{name_msg} does not support resource type: "
                f"'{resource_type}'. Supported resource types are: "
                f"{', '.join(resource_types)}"
            )
        resource_types = [resource_type]

    # Pre-populate the list of resources with entries corresponding to the
    # supported resource types and scoped to the supplied resource type
    resources.resources = [
        ServiceConnectorTypedResourcesModel(
            resource_type=rt,
        )
        for rt in resource_types
    ]

    if self.has_expired():
        error = "the connector's authentication credentials have expired."
        # Log the error in the resources object
        resources.set_error(error)
        return resources

    verify_resource_types: List[Optional[str]] = []
    verify_resource_id = None
    if not list_resources and not resource_id:
        # If we're not listing resources, we're only verifying that the
        # connector can authenticate globally (i.e. without supplying a
        # resource type or resource ID). This is the same as if no resource
        # type or resource ID was supplied. The only exception is when the
        # verification scope is already set to a single resource ID, in
        # which case we still perform the verification on that resource ID.
        verify_resource_types = [None]
        verify_resource_id = None
    else:
        if len(resource_types) > 1:
            # This forces the connector to verify that it can authenticate
            # globally (i.e. without supplying a resource type or resource
            # ID) before listing individual resources in the case of
            # multi-type service connectors.
            verify_resource_types = [None]

        verify_resource_types.extend(resource_types)
        # A resource ID is only meaningful when exactly one resource type
        # is being verified.
        if len(verify_resource_types) == 1:
            verify_resource_id = resource_id

    # Here we go through each resource type and attempt to verify and list
    # the resources that the connector can access for that resource type.
    # This list may start with a `None` resource type, which indicates that
    # the connector should verify that it can authenticate globally.
    for resource_type in verify_resource_types:
        try:
            resource_type, resource_id = self.validate_runtime_args(
                resource_type=resource_type,
                resource_id=verify_resource_id,
                require_resource_type=False,
                require_resource_id=False,
            )

            resource_ids = self._verify(
                resource_type=resource_type,
                resource_id=resource_id,
            )
        except ValueError as exc:
            # Keep the original exception as the cause so that the full
            # traceback remains available to callers.
            raise ValueError(
                f"The connector configuration is incomplete or invalid: "
                f"{exc}",
            ) from exc
        except AuthorizationException as exc:
            error = f"connector{name_msg} authorization failure: {exc}"
            # Log an exception if debug logging is enabled
            if logger.isEnabledFor(logging.DEBUG):
                logger.exception(error)
            else:
                logger.warning(error)

            # Log the error in the resources object
            resources.set_error(error, resource_type=resource_type)
            if resource_type:
                continue
            else:
                # We stop on a global failure
                break
        except Exception as exc:
            error = (
                f"connector{name_msg} verification failed with "
                f"unexpected error: {exc}"
            )
            # Log an exception if debug logging is enabled
            if logger.isEnabledFor(logging.DEBUG):
                logger.exception(error)
            else:
                logger.warning(error)
            # The detailed message is only logged; the resources object
            # receives a generic message instead.
            error = (
                "an unexpected error occurred while verifying the "
                "connector."
            )
            # Log the error in the resources object
            resources.set_error(error, resource_type=resource_type)
            if resource_type:
                continue
            else:
                # We stop on a global failure
                break

        if not resource_type:
            # If a resource type is not provided as argument, we don't
            # expect any resources to be listed
            continue

        resource_type_spec = spec.resource_type_dict[resource_type]

        if resource_id:
            # A single resource was requested, so we expect a single
            # resource to be listed
            if [resource_id] != resource_ids:
                logger.error(
                    f"a different resource ID '{resource_ids}' was "
                    f"returned than the one requested: {resource_id}. "
                    f"This is likely a bug in the {self.__class__} "
                    "connector implementation."
                )
            resources.set_resource_ids(resource_type, [resource_id])
        elif not resource_ids:
            # If no resources were listed, signal this as an error that the
            # connector cannot access any resources.
            error = (
                f"connector{name_msg} didn't list any "
                f"{resource_type_spec.name} resources. This is likely "
                "caused by the connector credentials not being valid or "
                "not having sufficient permissions to list or access "
                "resources of this type. Please check the connector "
                "configuration and its credentials and try again."
            )
            logger.debug(error)
            resources.set_error(error, resource_type=resource_type)
        else:
            resources.set_resource_ids(resource_type, resource_ids)

    return resources

ServiceConnectorMeta (ModelMetaclass)

Metaclass responsible for automatically registering ServiceConnector classes.

Source code in zenml/service_connectors/service_connector.py
class ServiceConnectorMeta(ModelMetaclass):
    """Metaclass that registers ServiceConnector subclasses when they are created."""

    def __new__(
        mcs, name: str, bases: Tuple[Type[Any], ...], dct: Dict[str, Any]
    ) -> "ServiceConnectorMeta":
        """Build a new ServiceConnector class and add its type to the registry.

        Args:
            name: The name of the class.
            bases: The base classes of the class.
            dct: The dictionary of the class.

        Returns:
            The newly created class.
        """
        new_class = super().__new__(mcs, name, bases, dct)
        connector_class = cast(Type["ServiceConnector"], new_class)

        # Every class except the `ServiceConnector` base class itself is
        # registered with the global registry.
        if name != "ServiceConnector":
            # Imported here rather than at module level.
            from zenml.service_connectors.service_connector_registry import (
                service_connector_registry,
            )

            connector_type_spec = connector_class.get_type()
            service_connector_registry.register_service_connector_type(
                connector_type_spec
            )

        return connector_class
__new__(mcs, name, bases, dct) special staticmethod

Creates a new ServiceConnector class and registers it.

Parameters:

Name Type Description Default
name str

The name of the class.

required
bases Tuple[Type[Any], ...]

The base classes of the class.

required
dct Dict[str, Any]

The dictionary of the class.

required

Returns:

Type Description
ServiceConnectorMeta

The ServiceConnectorMeta class.

Source code in zenml/service_connectors/service_connector.py
def __new__(
    mcs, name: str, bases: Tuple[Type[Any], ...], dct: Dict[str, Any]
) -> "ServiceConnectorMeta":
    """Build a new ServiceConnector class and add its type to the registry.

    Args:
        name: The name of the class.
        bases: The base classes of the class.
        dct: The dictionary of the class.

    Returns:
        The newly created class.
    """
    new_class = super().__new__(mcs, name, bases, dct)
    connector_class = cast(Type["ServiceConnector"], new_class)

    # Every class except the `ServiceConnector` base class itself is
    # registered with the global registry.
    if name != "ServiceConnector":
        # Imported here rather than at module level.
        from zenml.service_connectors.service_connector_registry import (
            service_connector_registry,
        )

        connector_type_spec = connector_class.get_type()
        service_connector_registry.register_service_connector_type(
            connector_type_spec
        )

    return connector_class

service_connector_registry

Implementation of a service connector registry.

ServiceConnectorRegistry

Service connector registry.

Source code in zenml/service_connectors/service_connector_registry.py
class ServiceConnectorRegistry:
    """Service connector registry.

    Keeps a mapping from connector type identifiers to their
    `ServiceConnectorTypeModel` and lazily imports the built-in connector
    modules the first time the registry is queried.
    """

    def __init__(self) -> None:
        """Initialize the service connector registry."""
        # Maps connector type identifiers to their type models.
        self.service_connector_types: Dict[str, ServiceConnectorTypeModel] = {}
        # Set once the built-in connector imports have been attempted.
        self.initialized = False

    def register_service_connector_type(
        self,
        service_connector_type: ServiceConnectorTypeModel,
        overwrite: bool = False,
    ) -> None:
        """Registers a service connector type.

        An already registered type is left untouched unless `overwrite` is
        set.

        Args:
            service_connector_type: Service connector type.
            overwrite: Whether to overwrite an existing service connector type.
        """
        if (
            service_connector_type.connector_type
            not in self.service_connector_types
            or overwrite
        ):
            self.service_connector_types[
                service_connector_type.connector_type
            ] = service_connector_type
            logger.debug(
                "Registered service connector type "
                f"{service_connector_type.connector_type}."
            )
        else:
            logger.debug(
                f"Found existing service connector for type "
                f"{service_connector_type.connector_type}: Skipping registration."
            )

    def get_service_connector_type(
        self,
        connector_type: str,
    ) -> ServiceConnectorTypeModel:
        """Get a service connector type by its connector type identifier.

        Args:
            connector_type: The service connector type identifier.

        Returns:
            A copy of the service connector type that was registered for the
            given connector type identifier.

        Raises:
            KeyError: If no service connector was registered for the given type
                identifier.
        """
        self.register_builtin_service_connectors()

        if connector_type not in self.service_connector_types:
            raise KeyError(
                f"Service connector type {connector_type} is not available. "
                f"Please make sure the corresponding packages and/or ZenML "
                f"integration are installed and try again."
            )
        return self.service_connector_types[connector_type].copy()

    def __getitem__(self, key: str) -> ServiceConnectorTypeModel:
        """Get a service connector type by its connector type identifier.

        Args:
            key: The service connector type identifier.

        Returns:
            A service connector type that was registered for the given
            service connector type identifier.
        """
        return self.get_service_connector_type(key)

    def is_registered(self, connector_type: str) -> bool:
        """Returns whether a service connector is registered for the given type identifier.

        Args:
            connector_type: The service connector type identifier.

        Returns:
            True if a service connector is registered for the given type
            identifier, False otherwise.
        """
        self.register_builtin_service_connectors()
        return connector_type in self.service_connector_types

    def list_service_connector_types(
        self,
        connector_type: Optional[str] = None,
        resource_type: Optional[str] = None,
        auth_method: Optional[str] = None,
    ) -> List[ServiceConnectorTypeModel]:
        """Find one or more service connector types that match the given criteria.

        A filter that is left as `None` matches every connector type.

        Args:
            connector_type: Filter by service connector type identifier.
            resource_type: Filter by a resource type that the connector can
                be used to give access to.
            auth_method: Filter by an authentication method that the connector
                uses to authenticate with the resource provider.

        Returns:
            A list of copies of the service connector type models that match
            the given criteria.
        """
        self.register_builtin_service_connectors()

        matches: List[ServiceConnectorTypeModel] = []
        for service_connector_type in self.service_connector_types.values():
            if (
                (
                    connector_type is None
                    or connector_type == service_connector_type.connector_type
                )
                and (
                    resource_type is None
                    or resource_type
                    in service_connector_type.resource_type_dict
                )
                and (
                    auth_method is None
                    or auth_method in service_connector_type.auth_method_dict
                )
            ):
                matches.append(service_connector_type.copy())

        return matches

    def instantiate_connector(
        self,
        model: ServiceConnectorBaseModel,
    ) -> "ServiceConnector":
        """Validate a service connector model and create an instance from it.

        Args:
            model: The service connector model to validate and instantiate.

        Returns:
            A service connector instance.

        Raises:
            NotImplementedError: If no service connector is registered for the
                given type identifier.
            ValueError: If the service connector model is not valid.
        """
        try:
            service_connector_type = self.get_service_connector_type(
                model.type
            )
        except KeyError as e:
            # Chain the lookup failure so the root cause stays visible.
            raise NotImplementedError(
                f"Service connector type {model.type} is not available "
                "locally. Please make sure the corresponding packages and/or "
                "ZenML integration are installed and try again."
            ) from e

        assert service_connector_type.connector_class is not None

        try:
            return service_connector_type.connector_class.from_model(model)
        except ValueError as e:
            # Chain the validation failure so the root cause stays visible.
            raise ValueError(
                f"The service connector configuration is not valid: {e}"
            ) from e

    def register_builtin_service_connectors(self) -> None:
        """Registers the default built-in service connectors.

        Each import is attempted independently; a failed import is logged as
        a warning and does not prevent the remaining imports.
        """
        # Only register built-in service connectors once
        if self.initialized:
            return

        # The flag is set before the imports, so failed imports are not
        # retried on later calls.
        self.initialized = True

        try:
            from zenml.integrations.aws.service_connectors.aws_service_connector import (  # noqa
                AWSServiceConnector,
            )
        except ImportError as e:
            logger.warning(f"Could not import AWS service connector: {e}.")

        try:
            from zenml.integrations.gcp.service_connectors.gcp_service_connector import (  # noqa
                GCPServiceConnector,
            )
        except ImportError as e:
            logger.warning(f"Could not import GCP service connector: {e}.")

        try:
            from zenml.integrations.azure.service_connectors.azure_service_connector import (  # noqa
                AzureServiceConnector,
            )
        except ImportError as e:
            logger.warning(f"Could not import Azure service connector: {e}.")

        try:
            from zenml.integrations.kubernetes.service_connectors.kubernetes_service_connector import (  # noqa
                KubernetesServiceConnector,
            )
        except ImportError as e:
            logger.warning(
                f"Could not import Kubernetes service connector: {e}."
            )

        try:
            from zenml.service_connectors.docker_service_connector import (  # noqa
                DockerServiceConnector,
            )
        except ImportError as e:
            logger.warning(f"Could not import Docker service connector: {e}.")
__getitem__(self, key) special

Get a service connector type by its connector type identifier.

Parameters:

Name Type Description Default
key str

The service connector type identifier.

required

Returns:

Type Description
ServiceConnectorTypeModel

A service connector type that was registered for the given service connector type identifier.

Source code in zenml/service_connectors/service_connector_registry.py
def __getitem__(self, key: str) -> ServiceConnectorTypeModel:
    """Look up a service connector type using subscript syntax.

    Delegates to `get_service_connector_type`.

    Args:
        key: The service connector type identifier.

    Returns:
        The service connector type registered under the given identifier.
    """
    connector_type_model = self.get_service_connector_type(key)
    return connector_type_model
__init__(self) special

Initialize the service connector registry.

Source code in zenml/service_connectors/service_connector_registry.py
def __init__(self) -> None:
    """Create an empty registry with the initialized flag cleared."""
    # Starts out False; no connector types are registered yet.
    self.initialized = False
    # Maps connector type identifiers to their type models.
    self.service_connector_types: Dict[str, ServiceConnectorTypeModel] = {}
get_service_connector_type(self, connector_type)

Get a service connector type by its connector type identifier.

Parameters:

Name Type Description Default
connector_type str

The service connector type identifier.

required

Returns:

Type Description
ServiceConnectorTypeModel

A service connector type that was registered for the given connector type identifier.

Exceptions:

Type Description
KeyError

If no service connector was registered for the given type identifier.

Source code in zenml/service_connectors/service_connector_registry.py
def get_service_connector_type(
    self,
    connector_type: str,
) -> ServiceConnectorTypeModel:
    """Return a copy of the connector type registered under an identifier.

    Args:
        connector_type: The service connector type identifier.

    Returns:
        A copy of the service connector type registered for the given
        connector type identifier.

    Raises:
        KeyError: If no service connector was registered for the given type
            identifier.
    """
    # Make sure the built-in connectors had a chance to register first.
    self.register_builtin_service_connectors()

    known_types = self.service_connector_types
    if connector_type in known_types:
        return known_types[connector_type].copy()

    raise KeyError(
        f"Service connector type {connector_type} is not available. "
        f"Please make sure the corresponding packages and/or ZenML "
        f"integration are installed and try again."
    )
instantiate_connector(self, model)

Validate a service connector model and create an instance from it.

Parameters:

Name Type Description Default
model ServiceConnectorBaseModel

The service connector model to validate and instantiate.

required

Returns:

Type Description
ServiceConnector

A service connector instance.

Exceptions:

Type Description
NotImplementedError

If no service connector is registered for the given type identifier.

ValueError

If the service connector model is not valid.

Source code in zenml/service_connectors/service_connector_registry.py
def instantiate_connector(
    self,
    model: ServiceConnectorBaseModel,
) -> "ServiceConnector":
    """Validate a service connector model and create an instance from it.

    Args:
        model: The service connector model to validate and instantiate.

    Returns:
        A service connector instance.

    Raises:
        NotImplementedError: If no service connector is registered for the
            given type identifier.
        ValueError: If the service connector model is not valid.
    """
    try:
        service_connector_type = self.get_service_connector_type(
            model.type
        )
    except KeyError as e:
        # Chain the lookup failure so the root cause stays visible.
        raise NotImplementedError(
            f"Service connector type {model.type} is not available "
            "locally. Please make sure the corresponding packages and/or "
            "ZenML integration are installed and try again."
        ) from e

    assert service_connector_type.connector_class is not None

    try:
        return service_connector_type.connector_class.from_model(model)
    except ValueError as e:
        # Chain the validation failure so the root cause stays visible.
        raise ValueError(
            f"The service connector configuration is not valid: {e}"
        ) from e
is_registered(self, connector_type)

Returns whether a service connector is registered for the given type identifier.

Parameters:

Name Type Description Default
connector_type str

The service connector type identifier.

required

Returns:

Type Description
bool

True if a service connector is registered for the given type identifier, False otherwise.

Source code in zenml/service_connectors/service_connector_registry.py
def is_registered(self, connector_type: str) -> bool:
    """Check whether a connector type identifier is known to the registry.

    Args:
        connector_type: The service connector type identifier.

    Returns:
        True if a service connector is registered for the given type
        identifier, False otherwise.
    """
    # Make sure the built-in connectors had a chance to register first.
    self.register_builtin_service_connectors()
    known_types = self.service_connector_types
    return connector_type in known_types
list_service_connector_types(self, connector_type=None, resource_type=None, auth_method=None)

Find one or more service connector types that match the given criteria.

Parameters:

Name Type Description Default
connector_type Optional[str]

Filter by service connector type identifier.

None
resource_type Optional[str]

Filter by a resource type that the connector can be used to give access to.

None
auth_method Optional[str]

Filter by an authentication method that the connector uses to authenticate with the resource provider.

None

Returns:

Type Description
List[zenml.models.service_connector_models.ServiceConnectorTypeModel]

A list of service connector type models that match the given criteria.

Source code in zenml/service_connectors/service_connector_registry.py
def list_service_connector_types(
    self,
    connector_type: Optional[str] = None,
    resource_type: Optional[str] = None,
    auth_method: Optional[str] = None,
) -> List[ServiceConnectorTypeModel]:
    """Return copies of the registered connector types matching all filters.

    A filter that is left as `None` matches every connector type.

    Args:
        connector_type: Filter by service connector type identifier.
        resource_type: Filter by a resource type that the connector can
            be used to give access to.
        auth_method: Filter by an authentication method that the connector
            uses to authenticate with the resource provider.

    Returns:
        A list of service connector type models that match the given
        criteria.
    """
    # Make sure the built-in connectors had a chance to register first.
    self.register_builtin_service_connectors()

    matches: List[ServiceConnectorTypeModel] = []
    for candidate in self.service_connector_types.values():
        # Each supplied filter rejects the candidate on a mismatch.
        if (
            connector_type is not None
            and connector_type != candidate.connector_type
        ):
            continue
        if (
            resource_type is not None
            and resource_type not in candidate.resource_type_dict
        ):
            continue
        if (
            auth_method is not None
            and auth_method not in candidate.auth_method_dict
        ):
            continue
        matches.append(candidate.copy())

    return matches
register_builtin_service_connectors(self)

Registers the default built-in service connectors.

Source code in zenml/service_connectors/service_connector_registry.py
def register_builtin_service_connectors(self) -> None:
    """Import the built-in service connector modules, at most once.

    Each import is attempted independently; a failed import is logged as a
    warning and does not prevent the remaining imports.
    """
    if not self.initialized:
        # The flag is set before the imports, so failed imports are not
        # retried on later calls.
        self.initialized = True

        try:
            from zenml.integrations.aws.service_connectors.aws_service_connector import (  # noqa
                AWSServiceConnector,
            )
        except ImportError as import_error:
            logger.warning(
                f"Could not import AWS service connector: {import_error}."
            )

        try:
            from zenml.integrations.gcp.service_connectors.gcp_service_connector import (  # noqa
                GCPServiceConnector,
            )
        except ImportError as import_error:
            logger.warning(
                f"Could not import GCP service connector: {import_error}."
            )

        try:
            from zenml.integrations.azure.service_connectors.azure_service_connector import (  # noqa
                AzureServiceConnector,
            )
        except ImportError as import_error:
            logger.warning(
                f"Could not import Azure service connector: {import_error}."
            )

        try:
            from zenml.integrations.kubernetes.service_connectors.kubernetes_service_connector import (  # noqa
                KubernetesServiceConnector,
            )
        except ImportError as import_error:
            logger.warning(
                f"Could not import Kubernetes service connector: {import_error}."
            )

        try:
            from zenml.service_connectors.docker_service_connector import (  # noqa
                DockerServiceConnector,
            )
        except ImportError as import_error:
            logger.warning(
                f"Could not import Docker service connector: {import_error}."
            )
register_service_connector_type(self, service_connector_type, overwrite=False)

Registers a service connector type.

Parameters:

Name Type Description Default
service_connector_type ServiceConnectorTypeModel

Service connector type.

required
overwrite bool

Whether to overwrite an existing service connector type.

False
Source code in zenml/service_connectors/service_connector_registry.py
def register_service_connector_type(
    self,
    service_connector_type: ServiceConnectorTypeModel,
    overwrite: bool = False,
) -> None:
    """Add a service connector type to the registry.

    An already registered type is left untouched unless `overwrite` is set.

    Args:
        service_connector_type: Service connector type.
        overwrite: Whether to overwrite an existing service connector type.
    """
    type_id = service_connector_type.connector_type

    # Keep the existing entry unless the caller asked to replace it.
    if type_id in self.service_connector_types and not overwrite:
        logger.debug(
            f"Found existing service connector for type "
            f"{type_id}: Skipping registration."
        )
        return

    self.service_connector_types[type_id] = service_connector_type
    logger.debug("Registered service connector type " f"{type_id}.")