Feast
zenml.integrations.feast
special
Initialization for Feast integration.
The Feast integration offers a way to connect to a Feast Feature Store. ZenML implements a dedicated stack component that you can access as part of your ZenML steps in the usual ways.
FeastIntegration (Integration)
Definition of Feast integration for ZenML.
Source code in zenml/integrations/feast/__init__.py
class FeastIntegration(Integration):
"""Definition of Feast integration for ZenML."""
NAME = FEAST
REQUIREMENTS = ["feast[redis]~=0.26.0", "redis-server>=6.0.9"]
@classmethod
def flavors(cls) -> List[Type[Flavor]]:
"""Declare the stack component flavors for the Feast integration.
Returns:
List of stack component flavors for this integration.
"""
from zenml.integrations.feast.flavors import FeastFeatureStoreFlavor
return [FeastFeatureStoreFlavor]
flavors()
classmethod
Declare the stack component flavors for the Feast integration.
Returns:
Type | Description |
---|---|
List[Type[zenml.stack.flavor.Flavor]] |
List of stack component flavors for this integration. |
Source code in zenml/integrations/feast/__init__.py
@classmethod
def flavors(cls) -> List[Type[Flavor]]:
"""Declare the stack component flavors for the Feast integration.
Returns:
List of stack component flavors for this integration.
"""
from zenml.integrations.feast.flavors import FeastFeatureStoreFlavor
return [FeastFeatureStoreFlavor]
feature_stores
special
Feast Feature Store integration for ZenML.
Feature stores allow data teams to serve data via an offline store and an online low-latency store where data is kept in sync between the two. It also offers a centralized registry where features (and feature schemas) are stored for use within a team or wider organization. Feature stores are a relatively recent addition to commonly-used machine learning stacks. Feast is a leading open-source feature store, first developed by Gojek in collaboration with Google.
feast_feature_store
Implementation of the Feast Feature Store for ZenML.
FeastFeatureStore (BaseFeatureStore)
Class to interact with the Feast feature store.
Source code in zenml/integrations/feast/feature_stores/feast_feature_store.py
class FeastFeatureStore(BaseFeatureStore):
"""Class to interact with the Feast feature store."""
@property
def config(self) -> FeastFeatureStoreConfig:
"""Returns the `FeastFeatureStoreConfig` config.
Returns:
The configuration.
"""
return cast(FeastFeatureStoreConfig, self._config)
def _validate_connection(self) -> None:
"""Validates the connection to the feature store.
Raises:
ConnectionError: If the online component (Redis) is not available.
"""
client = redis.Redis(
host=self.config.online_host, port=self.config.online_port
)
try:
client.ping()
except redis.exceptions.ConnectionError as e:
raise redis.exceptions.ConnectionError(
"Could not connect to feature store's online component. "
"Please make sure that Redis is running."
) from e
def get_historical_features(
self,
entity_df: Union[pd.DataFrame, str],
features: List[str],
full_feature_names: bool = False,
) -> pd.DataFrame:
"""Returns the historical features for training or batch scoring.
Args:
entity_df: The entity DataFrame or entity name.
features: The features to retrieve.
full_feature_names: Whether to return the full feature names.
Raise:
ConnectionError: If the online component (Redis) is not available.
Returns:
The historical features as a Pandas DataFrame.
"""
fs = FeatureStore(repo_path=self.config.feast_repo)
return fs.get_historical_features(
entity_df=entity_df,
features=features,
full_feature_names=full_feature_names,
).to_df()
def get_online_features(
self,
entity_rows: List[Dict[str, Any]],
features: List[str],
full_feature_names: bool = False,
) -> Dict[str, Any]:
"""Returns the latest online feature data.
Args:
entity_rows: The entity rows to retrieve.
features: The features to retrieve.
full_feature_names: Whether to return the full feature names.
Raise:
ConnectionError: If the online component (Redis) is not available.
Returns:
The latest online feature data as a dictionary.
"""
self._validate_connection()
fs = FeatureStore(repo_path=self.config.feast_repo)
return fs.get_online_features( # type: ignore[no-any-return]
entity_rows=entity_rows,
features=features,
full_feature_names=full_feature_names,
).to_dict()
def get_data_sources(self) -> List[str]:
"""Returns the data sources' names.
Raise:
ConnectionError: If the online component (Redis) is not available.
Returns:
The data sources' names.
"""
self._validate_connection()
fs = FeatureStore(repo_path=self.config.feast_repo)
return [ds.name for ds in fs.list_data_sources()]
def get_entities(self) -> List[str]:
"""Returns the entity names.
Raise:
ConnectionError: If the online component (Redis) is not available.
Returns:
The entity names.
"""
self._validate_connection()
fs = FeatureStore(repo_path=self.config.feast_repo)
return [ds.name for ds in fs.list_entities()]
def get_feature_services(self) -> List[str]:
"""Returns the feature service names.
Raise:
ConnectionError: If the online component (Redis) is not available.
Returns:
The feature service names.
"""
self._validate_connection()
fs = FeatureStore(repo_path=self.config.feast_repo)
return [ds.name for ds in fs.list_feature_services()]
def get_feature_views(self) -> List[str]:
"""Returns the feature view names.
Raise:
ConnectionError: If the online component (Redis) is not available.
Returns:
The feature view names.
"""
self._validate_connection()
fs = FeatureStore(repo_path=self.config.feast_repo)
return [ds.name for ds in fs.list_feature_views()]
def get_project(self) -> str:
"""Returns the project name.
Raise:
ConnectionError: If the online component (Redis) is not available.
Returns:
The project name.
"""
fs = FeatureStore(repo_path=self.config.feast_repo)
return str(fs.project)
def get_registry(self) -> Registry:
"""Returns the feature store registry.
Raise:
ConnectionError: If the online component (Redis) is not available.
Returns:
The registry.
"""
fs: FeatureStore = FeatureStore(repo_path=self.config.feast_repo)
return fs.registry
def get_feast_version(self) -> str:
"""Returns the version of Feast used.
Raise:
ConnectionError: If the online component (Redis) is not available.
Returns:
The version of Feast currently being used.
"""
fs = FeatureStore(repo_path=self.config.feast_repo)
return str(fs.version())
config: FeastFeatureStoreConfig
property
readonly
Returns the FeastFeatureStoreConfig
config.
Returns:
Type | Description |
---|---|
FeastFeatureStoreConfig |
The configuration. |
get_data_sources(self)
Returns the data sources' names.
Exceptions:
Type | Description |
---|---|
ConnectionError |
If the online component (Redis) is not available. |
Returns:
Type | Description |
---|---|
List[str] |
The data sources' names. |
Source code in zenml/integrations/feast/feature_stores/feast_feature_store.py
def get_data_sources(self) -> List[str]:
"""Returns the data sources' names.
Raise:
ConnectionError: If the online component (Redis) is not available.
Returns:
The data sources' names.
"""
self._validate_connection()
fs = FeatureStore(repo_path=self.config.feast_repo)
return [ds.name for ds in fs.list_data_sources()]
get_entities(self)
Returns the entity names.
Exceptions:
Type | Description |
---|---|
ConnectionError |
If the online component (Redis) is not available. |
Returns:
Type | Description |
---|---|
List[str] |
The entity names. |
Source code in zenml/integrations/feast/feature_stores/feast_feature_store.py
def get_entities(self) -> List[str]:
"""Returns the entity names.
Raise:
ConnectionError: If the online component (Redis) is not available.
Returns:
The entity names.
"""
self._validate_connection()
fs = FeatureStore(repo_path=self.config.feast_repo)
return [ds.name for ds in fs.list_entities()]
get_feast_version(self)
Returns the version of Feast used.
Exceptions:
Type | Description |
---|---|
ConnectionError |
If the online component (Redis) is not available. |
Returns:
Type | Description |
---|---|
str |
The version of Feast currently being used. |
Source code in zenml/integrations/feast/feature_stores/feast_feature_store.py
def get_feast_version(self) -> str:
"""Returns the version of Feast used.
Raise:
ConnectionError: If the online component (Redis) is not available.
Returns:
The version of Feast currently being used.
"""
fs = FeatureStore(repo_path=self.config.feast_repo)
return str(fs.version())
get_feature_services(self)
Returns the feature service names.
Exceptions:
Type | Description |
---|---|
ConnectionError |
If the online component (Redis) is not available. |
Returns:
Type | Description |
---|---|
List[str] |
The feature service names. |
Source code in zenml/integrations/feast/feature_stores/feast_feature_store.py
def get_feature_services(self) -> List[str]:
"""Returns the feature service names.
Raise:
ConnectionError: If the online component (Redis) is not available.
Returns:
The feature service names.
"""
self._validate_connection()
fs = FeatureStore(repo_path=self.config.feast_repo)
return [ds.name for ds in fs.list_feature_services()]
get_feature_views(self)
Returns the feature view names.
Exceptions:
Type | Description |
---|---|
ConnectionError |
If the online component (Redis) is not available. |
Returns:
Type | Description |
---|---|
List[str] |
The feature view names. |
Source code in zenml/integrations/feast/feature_stores/feast_feature_store.py
def get_feature_views(self) -> List[str]:
"""Returns the feature view names.
Raise:
ConnectionError: If the online component (Redis) is not available.
Returns:
The feature view names.
"""
self._validate_connection()
fs = FeatureStore(repo_path=self.config.feast_repo)
return [ds.name for ds in fs.list_feature_views()]
get_historical_features(self, entity_df, features, full_feature_names=False)
Returns the historical features for training or batch scoring.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
entity_df |
Union[pandas.core.frame.DataFrame, str] |
The entity DataFrame or entity name. |
required |
features |
List[str] |
The features to retrieve. |
required |
full_feature_names |
bool |
Whether to return the full feature names. |
False |
Exceptions:
Type | Description |
---|---|
ConnectionError |
If the online component (Redis) is not available. |
Returns:
Type | Description |
---|---|
DataFrame |
The historical features as a Pandas DataFrame. |
Source code in zenml/integrations/feast/feature_stores/feast_feature_store.py
def get_historical_features(
self,
entity_df: Union[pd.DataFrame, str],
features: List[str],
full_feature_names: bool = False,
) -> pd.DataFrame:
"""Returns the historical features for training or batch scoring.
Args:
entity_df: The entity DataFrame or entity name.
features: The features to retrieve.
full_feature_names: Whether to return the full feature names.
Raise:
ConnectionError: If the online component (Redis) is not available.
Returns:
The historical features as a Pandas DataFrame.
"""
fs = FeatureStore(repo_path=self.config.feast_repo)
return fs.get_historical_features(
entity_df=entity_df,
features=features,
full_feature_names=full_feature_names,
).to_df()
get_online_features(self, entity_rows, features, full_feature_names=False)
Returns the latest online feature data.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
entity_rows |
List[Dict[str, Any]] |
The entity rows to retrieve. |
required |
features |
List[str] |
The features to retrieve. |
required |
full_feature_names |
bool |
Whether to return the full feature names. |
False |
Exceptions:
Type | Description |
---|---|
ConnectionError |
If the online component (Redis) is not available. |
Returns:
Type | Description |
---|---|
Dict[str, Any] |
The latest online feature data as a dictionary. |
Source code in zenml/integrations/feast/feature_stores/feast_feature_store.py
def get_online_features(
self,
entity_rows: List[Dict[str, Any]],
features: List[str],
full_feature_names: bool = False,
) -> Dict[str, Any]:
"""Returns the latest online feature data.
Args:
entity_rows: The entity rows to retrieve.
features: The features to retrieve.
full_feature_names: Whether to return the full feature names.
Raise:
ConnectionError: If the online component (Redis) is not available.
Returns:
The latest online feature data as a dictionary.
"""
self._validate_connection()
fs = FeatureStore(repo_path=self.config.feast_repo)
return fs.get_online_features( # type: ignore[no-any-return]
entity_rows=entity_rows,
features=features,
full_feature_names=full_feature_names,
).to_dict()
get_project(self)
Returns the project name.
Exceptions:
Type | Description |
---|---|
ConnectionError |
If the online component (Redis) is not available. |
Returns:
Type | Description |
---|---|
str |
The project name. |
Source code in zenml/integrations/feast/feature_stores/feast_feature_store.py
def get_project(self) -> str:
"""Returns the project name.
Raise:
ConnectionError: If the online component (Redis) is not available.
Returns:
The project name.
"""
fs = FeatureStore(repo_path=self.config.feast_repo)
return str(fs.project)
get_registry(self)
Returns the feature store registry.
Exceptions:
Type | Description |
---|---|
ConnectionError |
If the online component (Redis) is not available. |
Returns:
Type | Description |
---|---|
Registry |
The registry. |
Source code in zenml/integrations/feast/feature_stores/feast_feature_store.py
def get_registry(self) -> Registry:
"""Returns the feature store registry.
Raise:
ConnectionError: If the online component (Redis) is not available.
Returns:
The registry.
"""
fs: FeatureStore = FeatureStore(repo_path=self.config.feast_repo)
return fs.registry
flavors
special
Feast integration flavors.
feast_feature_store_flavor
Feast feature store flavor.
FeastFeatureStoreConfig (BaseFeatureStoreConfig)
pydantic-model
Config for Feast feature store.
Source code in zenml/integrations/feast/flavors/feast_feature_store_flavor.py
class FeastFeatureStoreConfig(BaseFeatureStoreConfig):
"""Config for Feast feature store."""
online_host: str = "localhost"
online_port: int = 6379
feast_repo: str
@property
def is_local(self) -> bool:
"""Checks if this stack component is running locally.
This designation is used to determine if the stack component can be
shared with other users or if it is only usable on the local host.
Returns:
True if this config is for a local component, False otherwise.
"""
return (
self.online_host == "localhost" or self.online_host == "127.0.0.1"
)
is_local: bool
property
readonly
Checks if this stack component is running locally.
This designation is used to determine if the stack component can be shared with other users or if it is only usable on the local host.
Returns:
Type | Description |
---|---|
bool |
True if this config is for a local component, False otherwise. |
FeastFeatureStoreFlavor (BaseFeatureStoreFlavor)
Feast Feature store flavor.
Source code in zenml/integrations/feast/flavors/feast_feature_store_flavor.py
class FeastFeatureStoreFlavor(BaseFeatureStoreFlavor):
"""Feast Feature store flavor."""
@property
def name(self) -> str:
"""Name of the flavor.
Returns:
The name of the flavor.
"""
return FEAST_FEATURE_STORE_FLAVOR
@property
def config_class(self) -> Type[FeastFeatureStoreConfig]:
"""Returns FeastFeatureStoreConfig config class.
Returns:
The config class.
"""
"""Config class for this flavor."""
return FeastFeatureStoreConfig
@property
def implementation_class(self) -> Type["FeastFeatureStore"]:
"""Implementation class for this flavor.
Returns:
The implementation class.
"""
from zenml.integrations.feast.feature_stores import FeastFeatureStore
return FeastFeatureStore
config_class: Type[zenml.integrations.feast.flavors.feast_feature_store_flavor.FeastFeatureStoreConfig]
property
readonly
Returns FeastFeatureStoreConfig config class.
Returns:
Type | Description |
---|---|
Type[zenml.integrations.feast.flavors.feast_feature_store_flavor.FeastFeatureStoreConfig] |
The config class. |
implementation_class: Type[FeastFeatureStore]
property
readonly
Implementation class for this flavor.
Returns:
Type | Description |
---|---|
Type[FeastFeatureStore] |
The implementation class. |
name: str
property
readonly
Name of the flavor.
Returns:
Type | Description |
---|---|
str |
The name of the flavor. |