S3
zenml.integrations.s3
special
Initialization of the S3 integration.
The S3 integration allows the use of cloud artifact stores and file operations on S3 buckets.
S3Integration (Integration)
Definition of S3 integration for ZenML.
Source code in zenml/integrations/s3/__init__.py
class S3Integration(Integration):
"""Definition of S3 integration for ZenML."""
NAME = S3
REQUIREMENTS = ["s3fs==2022.3.0"]
@classmethod
def flavors(cls) -> List[Type[Flavor]]:
"""Declare the stack component flavors for the s3 integration.
Returns:
List of stack component flavors for this integration.
"""
from zenml.integrations.s3.flavors import S3ArtifactStoreFlavor
return [S3ArtifactStoreFlavor]
flavors()
classmethod
Declare the stack component flavors for the s3 integration.
Returns:
Type | Description |
---|---|
List[Type[zenml.stack.flavor.Flavor]] |
List of stack component flavors for this integration. |
Source code in zenml/integrations/s3/__init__.py
@classmethod
def flavors(cls) -> List[Type[Flavor]]:
"""Declare the stack component flavors for the s3 integration.
Returns:
List of stack component flavors for this integration.
"""
from zenml.integrations.s3.flavors import S3ArtifactStoreFlavor
return [S3ArtifactStoreFlavor]
artifact_stores
special
Initialization of the S3 Artifact Store.
s3_artifact_store
Implementation of the S3 Artifact Store.
S3ArtifactStore (BaseArtifactStore, AuthenticationMixin)
Artifact Store for S3 based artifacts.
Source code in zenml/integrations/s3/artifact_stores/s3_artifact_store.py
class S3ArtifactStore(BaseArtifactStore, AuthenticationMixin):
"""Artifact Store for S3 based artifacts."""
_filesystem: Optional[s3fs.S3FileSystem] = None
@property
def config(self) -> S3ArtifactStoreConfig:
"""Get the config of this artifact store.
Returns:
The config of this artifact store.
"""
return cast(S3ArtifactStoreConfig, self._config)
def _get_credentials(
self,
) -> Tuple[Optional[str], Optional[str], Optional[str]]:
"""Gets authentication credentials.
If an authentication secret is configured, the secret values are
returned. Otherwise, we fall back to the plain text component
attributes.
Returns:
Tuple (key, secret, token) of credentials used to authenticate with
the S3 filesystem.
"""
secret = self.get_authentication_secret(
expected_schema_type=AWSSecretSchema
)
if secret:
return (
secret.aws_access_key_id,
secret.aws_secret_access_key,
secret.aws_session_token,
)
else:
return self.config.key, self.config.secret, self.config.token
@property
def filesystem(self) -> s3fs.S3FileSystem:
"""The s3 filesystem to access this artifact store.
Returns:
The s3 filesystem.
"""
if not self._filesystem:
key, secret, token = self._get_credentials()
self._filesystem = s3fs.S3FileSystem(
key=key,
secret=secret,
token=token,
client_kwargs=self.config.client_kwargs,
config_kwargs=self.config.config_kwargs,
s3_additional_kwargs=self.config.s3_additional_kwargs,
)
return self._filesystem
def open(self, path: PathType, mode: str = "r") -> Any:
"""Open a file at the given path.
Args:
path: Path of the file to open.
mode: Mode in which to open the file. Currently, only
'rb' and 'wb' to read and write binary files are supported.
Returns:
A file-like object.
"""
return self.filesystem.open(path=path, mode=mode)
def copyfile(
self, src: PathType, dst: PathType, overwrite: bool = False
) -> None:
"""Copy a file.
Args:
src: The path to copy from.
dst: The path to copy to.
overwrite: If a file already exists at the destination, this
method will overwrite it if overwrite=`True` and
raise a FileExistsError otherwise.
Raises:
FileExistsError: If a file already exists at the destination
and overwrite is not set to `True`.
"""
if not overwrite and self.filesystem.exists(dst):
raise FileExistsError(
f"Unable to copy to destination '{convert_to_str(dst)}', "
f"file already exists. Set `overwrite=True` to copy anyway."
)
# TODO [ENG-151]: Check if it works with overwrite=True or if we need to
# manually remove it first
self.filesystem.copy(path1=src, path2=dst)
def exists(self, path: PathType) -> bool:
"""Check whether a path exists.
Args:
path: The path to check.
Returns:
True if the path exists, False otherwise.
"""
return self.filesystem.exists(path=path) # type: ignore[no-any-return]
def glob(self, pattern: PathType) -> List[PathType]:
"""Return all paths that match the given glob pattern.
The glob pattern may include:
- '*' to match any number of characters
- '?' to match a single character
- '[...]' to match one of the characters inside the brackets
- '**' as the full name of a path component to match to search
in subdirectories of any depth (e.g. '/some_dir/**/some_file)
Args:
pattern: The glob pattern to match, see details above.
Returns:
A list of paths that match the given glob pattern.
"""
return [f"s3://{path}" for path in self.filesystem.glob(path=pattern)]
def isdir(self, path: PathType) -> bool:
"""Check whether a path is a directory.
Args:
path: The path to check.
Returns:
True if the path is a directory, False otherwise.
"""
return self.filesystem.isdir(path=path) # type: ignore[no-any-return]
def listdir(self, path: PathType) -> List[PathType]:
"""Return a list of files in a directory.
Args:
path: The path to list.
Returns:
A list of paths that are files in the given directory.
"""
# remove s3 prefix if given, so we can remove the directory later as
# this method is expected to only return filenames
path = convert_to_str(path)
if path.startswith("s3://"):
path = path[5:]
def _extract_basename(file_dict: Dict[str, Any]) -> str:
"""Extracts the basename from a file info dict returned by the S3 filesystem.
Args:
file_dict: A file info dict returned by the S3 filesystem.
Returns:
The basename of the file.
"""
file_path = cast(str, file_dict["Key"])
base_name = file_path[len(path) :]
return base_name.lstrip("/")
return [
_extract_basename(dict_)
for dict_ in self.filesystem.listdir(path=path)
# s3fs.listdir also returns the root directory, so we filter
# it out here
if _extract_basename(dict_)
]
def makedirs(self, path: PathType) -> None:
"""Create a directory at the given path.
If needed also create missing parent directories.
Args:
path: The path to create.
"""
self.filesystem.makedirs(path=path, exist_ok=True)
def mkdir(self, path: PathType) -> None:
"""Create a directory at the given path.
Args:
path: The path to create.
"""
self.filesystem.makedir(path=path)
def remove(self, path: PathType) -> None:
"""Remove the file at the given path.
Args:
path: The path of the file to remove.
"""
self.filesystem.rm_file(path=path)
def rename(
self, src: PathType, dst: PathType, overwrite: bool = False
) -> None:
"""Rename source file to destination file.
Args:
src: The path of the file to rename.
dst: The path to rename the source file to.
overwrite: If a file already exists at the destination, this
method will overwrite it if overwrite=`True` and
raise a FileExistsError otherwise.
Raises:
FileExistsError: If a file already exists at the destination
and overwrite is not set to `True`.
"""
if not overwrite and self.filesystem.exists(dst):
raise FileExistsError(
f"Unable to rename file to '{convert_to_str(dst)}', "
f"file already exists. Set `overwrite=True` to rename anyway."
)
# TODO [ENG-152]: Check if it works with overwrite=True or if we need
# to manually remove it first
self.filesystem.rename(path1=src, path2=dst)
def rmtree(self, path: PathType) -> None:
"""Remove the given directory.
Args:
path: The path of the directory to remove.
"""
self.filesystem.delete(path=path, recursive=True)
def stat(self, path: PathType) -> Dict[str, Any]:
"""Return stat info for the given path.
Args:
path: The path to get stat info for.
Returns:
A dictionary containing the stat info.
"""
return self.filesystem.stat(path=path) # type: ignore[no-any-return]
def walk(
self,
top: PathType,
topdown: bool = True,
onerror: Optional[Callable[..., None]] = None,
) -> Iterable[Tuple[PathType, List[PathType], List[PathType]]]:
"""Return an iterator that walks the contents of the given directory.
Args:
top: Path of directory to walk.
topdown: Unused argument to conform to interface.
onerror: Unused argument to conform to interface.
Yields:
An Iterable of Tuples, each of which contain the path of the current
directory path, a list of directories inside the current directory
and a list of files inside the current directory.
"""
# TODO [ENG-153]: Additional params
for directory, subdirectories, files in self.filesystem.walk(path=top):
yield f"s3://{directory}", subdirectories, files
config: S3ArtifactStoreConfig
property
readonly
Get the config of this artifact store.
Returns:
Type | Description |
---|---|
S3ArtifactStoreConfig |
The config of this artifact store. |
filesystem: S3FileSystem
property
readonly
The s3 filesystem to access this artifact store.
Returns:
Type | Description |
---|---|
S3FileSystem |
The s3 filesystem. |
copyfile(self, src, dst, overwrite=False)
Copy a file.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
src |
Union[bytes, str] |
The path to copy from. |
required |
dst |
Union[bytes, str] |
The path to copy to. |
required |
overwrite |
bool |
If a file already exists at the destination, this
method will overwrite it if overwrite= |
False |
Exceptions:
Type | Description |
---|---|
FileExistsError |
If a file already exists at the destination
and overwrite is not set to |
Source code in zenml/integrations/s3/artifact_stores/s3_artifact_store.py
def copyfile(
self, src: PathType, dst: PathType, overwrite: bool = False
) -> None:
"""Copy a file.
Args:
src: The path to copy from.
dst: The path to copy to.
overwrite: If a file already exists at the destination, this
method will overwrite it if overwrite=`True` and
raise a FileExistsError otherwise.
Raises:
FileExistsError: If a file already exists at the destination
and overwrite is not set to `True`.
"""
if not overwrite and self.filesystem.exists(dst):
raise FileExistsError(
f"Unable to copy to destination '{convert_to_str(dst)}', "
f"file already exists. Set `overwrite=True` to copy anyway."
)
# TODO [ENG-151]: Check if it works with overwrite=True or if we need to
# manually remove it first
self.filesystem.copy(path1=src, path2=dst)
exists(self, path)
Check whether a path exists.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path |
Union[bytes, str] |
The path to check. |
required |
Returns:
Type | Description |
---|---|
bool |
True if the path exists, False otherwise. |
Source code in zenml/integrations/s3/artifact_stores/s3_artifact_store.py
def exists(self, path: PathType) -> bool:
"""Check whether a path exists.
Args:
path: The path to check.
Returns:
True if the path exists, False otherwise.
"""
return self.filesystem.exists(path=path) # type: ignore[no-any-return]
glob(self, pattern)
Return all paths that match the given glob pattern.
The glob pattern may include: - '' to match any number of characters - '?' to match a single character - '[...]' to match one of the characters inside the brackets - '' as the full name of a path component to match to search in subdirectories of any depth (e.g. '/some_dir/*/some_file)
Parameters:
Name | Type | Description | Default |
---|---|---|---|
pattern |
Union[bytes, str] |
The glob pattern to match, see details above. |
required |
Returns:
Type | Description |
---|---|
List[Union[bytes, str]] |
A list of paths that match the given glob pattern. |
Source code in zenml/integrations/s3/artifact_stores/s3_artifact_store.py
def glob(self, pattern: PathType) -> List[PathType]:
"""Return all paths that match the given glob pattern.
The glob pattern may include:
- '*' to match any number of characters
- '?' to match a single character
- '[...]' to match one of the characters inside the brackets
- '**' as the full name of a path component to match to search
in subdirectories of any depth (e.g. '/some_dir/**/some_file)
Args:
pattern: The glob pattern to match, see details above.
Returns:
A list of paths that match the given glob pattern.
"""
return [f"s3://{path}" for path in self.filesystem.glob(path=pattern)]
isdir(self, path)
Check whether a path is a directory.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path |
Union[bytes, str] |
The path to check. |
required |
Returns:
Type | Description |
---|---|
bool |
True if the path is a directory, False otherwise. |
Source code in zenml/integrations/s3/artifact_stores/s3_artifact_store.py
def isdir(self, path: PathType) -> bool:
"""Check whether a path is a directory.
Args:
path: The path to check.
Returns:
True if the path is a directory, False otherwise.
"""
return self.filesystem.isdir(path=path) # type: ignore[no-any-return]
listdir(self, path)
Return a list of files in a directory.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path |
Union[bytes, str] |
The path to list. |
required |
Returns:
Type | Description |
---|---|
List[Union[bytes, str]] |
A list of paths that are files in the given directory. |
Source code in zenml/integrations/s3/artifact_stores/s3_artifact_store.py
def listdir(self, path: PathType) -> List[PathType]:
"""Return a list of files in a directory.
Args:
path: The path to list.
Returns:
A list of paths that are files in the given directory.
"""
# remove s3 prefix if given, so we can remove the directory later as
# this method is expected to only return filenames
path = convert_to_str(path)
if path.startswith("s3://"):
path = path[5:]
def _extract_basename(file_dict: Dict[str, Any]) -> str:
"""Extracts the basename from a file info dict returned by the S3 filesystem.
Args:
file_dict: A file info dict returned by the S3 filesystem.
Returns:
The basename of the file.
"""
file_path = cast(str, file_dict["Key"])
base_name = file_path[len(path) :]
return base_name.lstrip("/")
return [
_extract_basename(dict_)
for dict_ in self.filesystem.listdir(path=path)
# s3fs.listdir also returns the root directory, so we filter
# it out here
if _extract_basename(dict_)
]
makedirs(self, path)
Create a directory at the given path.
If needed also create missing parent directories.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path |
Union[bytes, str] |
The path to create. |
required |
Source code in zenml/integrations/s3/artifact_stores/s3_artifact_store.py
def makedirs(self, path: PathType) -> None:
"""Create a directory at the given path.
If needed also create missing parent directories.
Args:
path: The path to create.
"""
self.filesystem.makedirs(path=path, exist_ok=True)
mkdir(self, path)
Create a directory at the given path.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path |
Union[bytes, str] |
The path to create. |
required |
Source code in zenml/integrations/s3/artifact_stores/s3_artifact_store.py
def mkdir(self, path: PathType) -> None:
"""Create a directory at the given path.
Args:
path: The path to create.
"""
self.filesystem.makedir(path=path)
open(self, path, mode='r')
Open a file at the given path.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path |
Union[bytes, str] |
Path of the file to open. |
required |
mode |
str |
Mode in which to open the file. Currently, only 'rb' and 'wb' to read and write binary files are supported. |
'r' |
Returns:
Type | Description |
---|---|
Any |
A file-like object. |
Source code in zenml/integrations/s3/artifact_stores/s3_artifact_store.py
def open(self, path: PathType, mode: str = "r") -> Any:
"""Open a file at the given path.
Args:
path: Path of the file to open.
mode: Mode in which to open the file. Currently, only
'rb' and 'wb' to read and write binary files are supported.
Returns:
A file-like object.
"""
return self.filesystem.open(path=path, mode=mode)
remove(self, path)
Remove the file at the given path.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path |
Union[bytes, str] |
The path of the file to remove. |
required |
Source code in zenml/integrations/s3/artifact_stores/s3_artifact_store.py
def remove(self, path: PathType) -> None:
"""Remove the file at the given path.
Args:
path: The path of the file to remove.
"""
self.filesystem.rm_file(path=path)
rename(self, src, dst, overwrite=False)
Rename source file to destination file.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
src |
Union[bytes, str] |
The path of the file to rename. |
required |
dst |
Union[bytes, str] |
The path to rename the source file to. |
required |
overwrite |
bool |
If a file already exists at the destination, this
method will overwrite it if overwrite= |
False |
Exceptions:
Type | Description |
---|---|
FileExistsError |
If a file already exists at the destination
and overwrite is not set to |
Source code in zenml/integrations/s3/artifact_stores/s3_artifact_store.py
def rename(
self, src: PathType, dst: PathType, overwrite: bool = False
) -> None:
"""Rename source file to destination file.
Args:
src: The path of the file to rename.
dst: The path to rename the source file to.
overwrite: If a file already exists at the destination, this
method will overwrite it if overwrite=`True` and
raise a FileExistsError otherwise.
Raises:
FileExistsError: If a file already exists at the destination
and overwrite is not set to `True`.
"""
if not overwrite and self.filesystem.exists(dst):
raise FileExistsError(
f"Unable to rename file to '{convert_to_str(dst)}', "
f"file already exists. Set `overwrite=True` to rename anyway."
)
# TODO [ENG-152]: Check if it works with overwrite=True or if we need
# to manually remove it first
self.filesystem.rename(path1=src, path2=dst)
rmtree(self, path)
Remove the given directory.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path |
Union[bytes, str] |
The path of the directory to remove. |
required |
Source code in zenml/integrations/s3/artifact_stores/s3_artifact_store.py
def rmtree(self, path: PathType) -> None:
"""Remove the given directory.
Args:
path: The path of the directory to remove.
"""
self.filesystem.delete(path=path, recursive=True)
stat(self, path)
Return stat info for the given path.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path |
Union[bytes, str] |
The path to get stat info for. |
required |
Returns:
Type | Description |
---|---|
Dict[str, Any] |
A dictionary containing the stat info. |
Source code in zenml/integrations/s3/artifact_stores/s3_artifact_store.py
def stat(self, path: PathType) -> Dict[str, Any]:
"""Return stat info for the given path.
Args:
path: The path to get stat info for.
Returns:
A dictionary containing the stat info.
"""
return self.filesystem.stat(path=path) # type: ignore[no-any-return]
walk(self, top, topdown=True, onerror=None)
Return an iterator that walks the contents of the given directory.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
top |
Union[bytes, str] |
Path of directory to walk. |
required |
topdown |
bool |
Unused argument to conform to interface. |
True |
onerror |
Optional[Callable[..., NoneType]] |
Unused argument to conform to interface. |
None |
Yields:
Type | Description |
---|---|
Iterable[Tuple[Union[bytes, str], List[Union[bytes, str]], List[Union[bytes, str]]]] |
An Iterable of Tuples, each of which contain the path of the current directory path, a list of directories inside the current directory and a list of files inside the current directory. |
Source code in zenml/integrations/s3/artifact_stores/s3_artifact_store.py
def walk(
self,
top: PathType,
topdown: bool = True,
onerror: Optional[Callable[..., None]] = None,
) -> Iterable[Tuple[PathType, List[PathType], List[PathType]]]:
"""Return an iterator that walks the contents of the given directory.
Args:
top: Path of directory to walk.
topdown: Unused argument to conform to interface.
onerror: Unused argument to conform to interface.
Yields:
An Iterable of Tuples, each of which contain the path of the current
directory path, a list of directories inside the current directory
and a list of files inside the current directory.
"""
# TODO [ENG-153]: Additional params
for directory, subdirectories, files in self.filesystem.walk(path=top):
yield f"s3://{directory}", subdirectories, files
flavors
special
Amazon S3 integration flavors.
s3_artifact_store_flavor
Amazon S3 artifact store flavor.
S3ArtifactStoreConfig (BaseArtifactStoreConfig, AuthenticationConfigMixin)
pydantic-model
Configuration for the S3 Artifact Store.
All attributes of this class except path
will be passed to the
s3fs.S3FileSystem
initialization. See
here for more information on how
to use those configuration options to connect to any S3-compatible storage.
When you want to register an S3ArtifactStore from the CLI and need to pass
client_kwargs
, config_kwargs
or s3_additional_kwargs
, you should pass
them as a json string:
zenml artifact-store register my_s3_store --flavor=s3 --path=s3://my_bucket --client_kwargs='{"endpoint_url": "http://my-s3-endpoint"}'
Source code in zenml/integrations/s3/flavors/s3_artifact_store_flavor.py
class S3ArtifactStoreConfig(BaseArtifactStoreConfig, AuthenticationConfigMixin):
"""Configuration for the S3 Artifact Store.
All attributes of this class except `path` will be passed to the
`s3fs.S3FileSystem` initialization. See
[here](https://s3fs.readthedocs.io/en/latest/) for more information on how
to use those configuration options to connect to any S3-compatible storage.
When you want to register an S3ArtifactStore from the CLI and need to pass
`client_kwargs`, `config_kwargs` or `s3_additional_kwargs`, you should pass
them as a json string:
```
zenml artifact-store register my_s3_store --flavor=s3 \
--path=s3://my_bucket --client_kwargs='{"endpoint_url": "http://my-s3-endpoint"}'
```
"""
SUPPORTED_SCHEMES: ClassVar[Set[str]] = {"s3://"}
key: Optional[str] = SecretField()
secret: Optional[str] = SecretField()
token: Optional[str] = SecretField()
client_kwargs: Optional[Dict[str, Any]] = None
config_kwargs: Optional[Dict[str, Any]] = None
s3_additional_kwargs: Optional[Dict[str, Any]] = None
@validator(
"client_kwargs", "config_kwargs", "s3_additional_kwargs", pre=True
)
def _convert_json_string(
cls, value: Union[None, str, Dict[str, Any]]
) -> Optional[Dict[str, Any]]:
"""Converts potential JSON strings passed via the CLI to dictionaries.
Args:
value: The value to convert.
Returns:
The converted value.
Raises:
TypeError: If the value is not a `str`, `Dict` or `None`.
ValueError: If the value is an invalid json string or a json string
that does not decode into a dictionary.
"""
if isinstance(value, str):
try:
dict_ = json.loads(value)
except json.JSONDecodeError as e:
raise ValueError(f"Invalid json string '{value}'") from e
if not isinstance(dict_, Dict):
raise ValueError(
f"Json string '{value}' did not decode into a dictionary."
)
return dict_
elif isinstance(value, Dict) or value is None:
return value
else:
raise TypeError(f"{value} is not a json string or a dictionary.")
S3ArtifactStoreFlavor (BaseArtifactStoreFlavor)
Flavor of the S3 artifact store.
Source code in zenml/integrations/s3/flavors/s3_artifact_store_flavor.py
class S3ArtifactStoreFlavor(BaseArtifactStoreFlavor):
"""Flavor of the S3 artifact store."""
@property
def name(self) -> str:
"""Name of the flavor.
Returns:
The name of the flavor.
"""
return S3_ARTIFACT_STORE_FLAVOR
@property
def config_class(self) -> Type[S3ArtifactStoreConfig]:
"""The config class of the flavor.
Returns:
The config class of the flavor.
"""
return S3ArtifactStoreConfig
@property
def implementation_class(self) -> Type["S3ArtifactStore"]:
"""Implementation class for this flavor.
Returns:
The implementation class for this flavor.
"""
from zenml.integrations.s3.artifact_stores import S3ArtifactStore
return S3ArtifactStore
config_class: Type[zenml.integrations.s3.flavors.s3_artifact_store_flavor.S3ArtifactStoreConfig]
property
readonly
The config class of the flavor.
Returns:
Type | Description |
---|---|
Type[zenml.integrations.s3.flavors.s3_artifact_store_flavor.S3ArtifactStoreConfig] |
The config class of the flavor. |
implementation_class: Type[S3ArtifactStore]
property
readonly
Implementation class for this flavor.
Returns:
Type | Description |
---|---|
Type[S3ArtifactStore] |
The implementation class for this flavor. |
name: str
property
readonly
Name of the flavor.
Returns:
Type | Description |
---|---|
str |
The name of the flavor. |