Skip to content

Feast

zenml.integrations.feast special

Initialization for Feast integration.

The Feast integration offers a way to connect to a Feast Feature Store. ZenML implements a dedicated stack component that you can access as part of your ZenML steps in the usual ways.

FeastIntegration (Integration)

Definition of Feast integration for ZenML.

Source code in zenml/integrations/feast/__init__.py
class FeastIntegration(Integration):
    """Integration descriptor that registers Feast with ZenML."""

    # Identifier under which this integration is known.
    NAME = FEAST
    # Requirement specifiers associated with this integration.
    REQUIREMENTS = ["feast[redis]~=0.26.0", "redis-server>=6.0.9"]

    @classmethod
    def flavors(cls) -> List[Type[Flavor]]:
        """Enumerate the flavor classes provided by the Feast integration.

        The flavor module is imported when this method is called, not when
        the enclosing module is loaded.

        Returns:
            A single-element list holding `FeastFeatureStoreFlavor`.
        """
        from zenml.integrations.feast.flavors import FeastFeatureStoreFlavor

        feast_flavors: List[Type[Flavor]] = [FeastFeatureStoreFlavor]
        return feast_flavors

flavors() classmethod

Declare the stack component flavors for the Feast integration.

Returns:

Type Description
List[Type[zenml.stack.flavor.Flavor]]

List of stack component flavors for this integration.

Source code in zenml/integrations/feast/__init__.py
@classmethod
def flavors(cls) -> List[Type[Flavor]]:
    """Enumerate the flavor classes provided by the Feast integration.

    The flavor module is imported when this method is called, not when the
    enclosing module is loaded.

    Returns:
        A single-element list holding `FeastFeatureStoreFlavor`.
    """
    from zenml.integrations.feast.flavors import FeastFeatureStoreFlavor

    feast_flavors: List[Type[Flavor]] = [FeastFeatureStoreFlavor]
    return feast_flavors

feature_stores special

Feast Feature Store integration for ZenML.

Feature stores allow data teams to serve data via an offline store and an online low-latency store, with data kept in sync between the two. They also offer a centralized registry where features (and feature schemas) are stored for use within a team or wider organization. Feature stores are a relatively recent addition to commonly-used machine learning stacks. Feast is a leading open-source feature store, first developed by Gojek in collaboration with Google.

feast_feature_store

Implementation of the Feast Feature Store for ZenML.

FeastFeatureStore (BaseFeatureStore)

Class to interact with the Feast feature store.

Source code in zenml/integrations/feast/feature_stores/feast_feature_store.py
class FeastFeatureStore(BaseFeatureStore):
    """Class to interact with the Feast feature store.

    Every accessor builds a fresh Feast `FeatureStore` from the configured
    repository path. The accessors that document a `ConnectionError` first
    ping the configured Redis instance; the remaining ones perform no such
    check.
    """

    @property
    def config(self) -> FeastFeatureStoreConfig:
        """Returns the `FeastFeatureStoreConfig` config.

        Returns:
            The configuration.
        """
        return cast(FeastFeatureStoreConfig, self._config)

    def _load_feature_store(self) -> FeatureStore:
        """Builds a Feast client for the configured repository.

        Returns:
            A new `FeatureStore` created with `repo_path` set to
            `config.feast_repo`.
        """
        return FeatureStore(repo_path=self.config.feast_repo)

    def _validate_connection(self) -> None:
        """Validates the connection to the feature store.

        Pings Redis at `config.online_host`:`config.online_port` and always
        releases the probe client's connections afterwards.

        Raises:
            ConnectionError: If the online component (Redis) is not available.
                The raised type is `redis.exceptions.ConnectionError`.
        """
        client = redis.Redis(
            host=self.config.online_host, port=self.config.online_port
        )
        try:
            client.ping()
        except redis.exceptions.ConnectionError as e:
            raise redis.exceptions.ConnectionError(
                "Could not connect to feature store's online component. "
                "Please make sure that Redis is running."
            ) from e
        finally:
            # The client exists only for this probe; drop its pooled
            # connections instead of leaving the socket open until the
            # object happens to be garbage collected.
            client.connection_pool.disconnect()

    def get_historical_features(
        self,
        entity_df: Union[pd.DataFrame, str],
        features: List[str],
        full_feature_names: bool = False,
    ) -> pd.DataFrame:
        """Returns the historical features for training or batch scoring.

        This method does not ping the online store (Redis) before querying.

        Args:
            entity_df: The entity DataFrame or entity name.
            features: The features to retrieve.
            full_feature_names: Whether to return the full feature names.

        Returns:
            The historical features as a Pandas DataFrame.
        """
        fs = self._load_feature_store()

        return fs.get_historical_features(
            entity_df=entity_df,
            features=features,
            full_feature_names=full_feature_names,
        ).to_df()

    def get_online_features(
        self,
        entity_rows: List[Dict[str, Any]],
        features: List[str],
        full_feature_names: bool = False,
    ) -> Dict[str, Any]:
        """Returns the latest online feature data.

        Args:
            entity_rows: The entity rows to retrieve.
            features: The features to retrieve.
            full_feature_names: Whether to return the full feature names.

        Raise:
            ConnectionError: If the online component (Redis) is not available.

        Returns:
            The latest online feature data as a dictionary.
        """
        self._validate_connection()
        fs = self._load_feature_store()

        return fs.get_online_features(  # type: ignore[no-any-return]
            entity_rows=entity_rows,
            features=features,
            full_feature_names=full_feature_names,
        ).to_dict()

    def get_data_sources(self) -> List[str]:
        """Returns the data sources' names.

        Raise:
            ConnectionError: If the online component (Redis) is not available.

        Returns:
            The data sources' names.
        """
        self._validate_connection()
        fs = self._load_feature_store()
        return [source.name for source in fs.list_data_sources()]

    def get_entities(self) -> List[str]:
        """Returns the entity names.

        Raise:
            ConnectionError: If the online component (Redis) is not available.

        Returns:
            The entity names.
        """
        self._validate_connection()
        fs = self._load_feature_store()
        return [entity.name for entity in fs.list_entities()]

    def get_feature_services(self) -> List[str]:
        """Returns the feature service names.

        Raise:
            ConnectionError: If the online component (Redis) is not available.

        Returns:
            The feature service names.
        """
        self._validate_connection()
        fs = self._load_feature_store()
        return [service.name for service in fs.list_feature_services()]

    def get_feature_views(self) -> List[str]:
        """Returns the feature view names.

        Raise:
            ConnectionError: If the online component (Redis) is not available.

        Returns:
            The feature view names.
        """
        self._validate_connection()
        fs = self._load_feature_store()
        return [view.name for view in fs.list_feature_views()]

    def get_project(self) -> str:
        """Returns the project name.

        This method does not ping the online store (Redis) before reading
        the project.

        Returns:
            The project name.
        """
        fs = self._load_feature_store()
        return str(fs.project)

    def get_registry(self) -> Registry:
        """Returns the feature store registry.

        This method does not ping the online store (Redis) before reading
        the registry.

        Returns:
            The registry.
        """
        fs: FeatureStore = self._load_feature_store()
        return fs.registry

    def get_feast_version(self) -> str:
        """Returns the version of Feast used.

        This method does not ping the online store (Redis) before reading
        the version.

        Returns:
            The version of Feast currently being used.
        """
        fs = self._load_feature_store()
        return str(fs.version())
config: FeastFeatureStoreConfig property readonly

Returns the FeastFeatureStoreConfig config.

Returns:

Type Description
FeastFeatureStoreConfig

The configuration.

get_data_sources(self)

Returns the data sources' names.

Exceptions:

Type Description
ConnectionError

If the online component (Redis) is not available.

Returns:

Type Description
List[str]

The data sources' names.

Source code in zenml/integrations/feast/feature_stores/feast_feature_store.py
def get_data_sources(self) -> List[str]:
    """Lists the name of every data source registered in the feature store.

    The Redis connection is checked first, then a `FeatureStore` is built
    from the configured repository path and queried.

    Raise:
        ConnectionError: If the online component (Redis) is not available.

    Returns:
        The data sources' names.
    """
    self._validate_connection()
    store = FeatureStore(repo_path=self.config.feast_repo)
    sources = store.list_data_sources()
    return [source.name for source in sources]
get_entities(self)

Returns the entity names.

Exceptions:

Type Description
ConnectionError

If the online component (Redis) is not available.

Returns:

Type Description
List[str]

The entity names.

Source code in zenml/integrations/feast/feature_stores/feast_feature_store.py
def get_entities(self) -> List[str]:
    """Lists the name of every entity registered in the feature store.

    The Redis connection is checked first, then a `FeatureStore` is built
    from the configured repository path and queried.

    Raise:
        ConnectionError: If the online component (Redis) is not available.

    Returns:
        The entity names.
    """
    self._validate_connection()
    store = FeatureStore(repo_path=self.config.feast_repo)
    entities = store.list_entities()
    return [entity.name for entity in entities]
get_feast_version(self)

Returns the version of Feast used.

Exceptions:

Type Description
ConnectionError

If the online component (Redis) is not available.

Returns:

Type Description
str

The version of Feast currently being used.

Source code in zenml/integrations/feast/feature_stores/feast_feature_store.py
def get_feast_version(self) -> str:
    """Reports which Feast version the feature store is running.

    A `FeatureStore` is built from the configured repository path and its
    `version()` result is converted to a string. No explicit Redis
    connectivity check is performed here.

    Returns:
        The version of Feast currently being used.
    """
    store = FeatureStore(repo_path=self.config.feast_repo)
    feast_version = str(store.version())
    return feast_version
get_feature_services(self)

Returns the feature service names.

Exceptions:

Type Description
ConnectionError

If the online component (Redis) is not available.

Returns:

Type Description
List[str]

The feature service names.

Source code in zenml/integrations/feast/feature_stores/feast_feature_store.py
def get_feature_services(self) -> List[str]:
    """Lists the name of every feature service in the feature store.

    The Redis connection is checked first, then a `FeatureStore` is built
    from the configured repository path and queried.

    Raise:
        ConnectionError: If the online component (Redis) is not available.

    Returns:
        The feature service names.
    """
    self._validate_connection()
    store = FeatureStore(repo_path=self.config.feast_repo)
    services = store.list_feature_services()
    return [service.name for service in services]
get_feature_views(self)

Returns the feature view names.

Exceptions:

Type Description
ConnectionError

If the online component (Redis) is not available.

Returns:

Type Description
List[str]

The feature view names.

Source code in zenml/integrations/feast/feature_stores/feast_feature_store.py
def get_feature_views(self) -> List[str]:
    """Lists the name of every feature view in the feature store.

    The Redis connection is checked first, then a `FeatureStore` is built
    from the configured repository path and queried.

    Raise:
        ConnectionError: If the online component (Redis) is not available.

    Returns:
        The feature view names.
    """
    self._validate_connection()
    store = FeatureStore(repo_path=self.config.feast_repo)
    views = store.list_feature_views()
    return [view.name for view in views]
get_historical_features(self, entity_df, features, full_feature_names=False)

Returns the historical features for training or batch scoring.

Parameters:

Name Type Description Default
entity_df Union[pandas.core.frame.DataFrame, str]

The entity DataFrame or entity name.

required
features List[str]

The features to retrieve.

required
full_feature_names bool

Whether to return the full feature names.

False

Exceptions:

Type Description
ConnectionError

If the online component (Redis) is not available.

Returns:

Type Description
DataFrame

The historical features as a Pandas DataFrame.

Source code in zenml/integrations/feast/feature_stores/feast_feature_store.py
def get_historical_features(
    self,
    entity_df: Union[pd.DataFrame, str],
    features: List[str],
    full_feature_names: bool = False,
) -> pd.DataFrame:
    """Fetches historical feature values for training or batch scoring.

    A `FeatureStore` is built from the configured repository path, queried,
    and the job result is materialized via `to_df()`. No explicit Redis
    connectivity check is performed here.

    Args:
        entity_df: The entity DataFrame or entity name.
        features: The features to retrieve.
        full_feature_names: Whether to return the full feature names.

    Returns:
        The historical features as a Pandas DataFrame.
    """
    store = FeatureStore(repo_path=self.config.feast_repo)
    retrieval_job = store.get_historical_features(
        entity_df=entity_df,
        features=features,
        full_feature_names=full_feature_names,
    )
    return retrieval_job.to_df()
get_online_features(self, entity_rows, features, full_feature_names=False)

Returns the latest online feature data.

Parameters:

Name Type Description Default
entity_rows List[Dict[str, Any]]

The entity rows to retrieve.

required
features List[str]

The features to retrieve.

required
full_feature_names bool

Whether to return the full feature names.

False

Exceptions:

Type Description
ConnectionError

If the online component (Redis) is not available.

Returns:

Type Description
Dict[str, Any]

The latest online feature data as a dictionary.

Source code in zenml/integrations/feast/feature_stores/feast_feature_store.py
def get_online_features(
    self,
    entity_rows: List[Dict[str, Any]],
    features: List[str],
    full_feature_names: bool = False,
) -> Dict[str, Any]:
    """Fetches the most recent online feature values.

    The Redis connection is checked first, then a `FeatureStore` is built
    from the configured repository path, queried, and the response is
    converted with `to_dict()`.

    Args:
        entity_rows: The entity rows to retrieve.
        features: The features to retrieve.
        full_feature_names: Whether to return the full feature names.

    Raise:
        ConnectionError: If the online component (Redis) is not available.

    Returns:
        The latest online feature data as a dictionary.
    """
    self._validate_connection()
    store = FeatureStore(repo_path=self.config.feast_repo)
    response = store.get_online_features(
        entity_rows=entity_rows,
        features=features,
        full_feature_names=full_feature_names,
    )
    return response.to_dict()  # type: ignore[no-any-return]
get_project(self)

Returns the project name.

Exceptions:

Type Description
ConnectionError

If the online component (Redis) is not available.

Returns:

Type Description
str

The project name.

Source code in zenml/integrations/feast/feature_stores/feast_feature_store.py
def get_project(self) -> str:
    """Reports the project name of the feature store.

    A `FeatureStore` is built from the configured repository path and its
    `project` attribute is converted to a string. No explicit Redis
    connectivity check is performed here.

    Returns:
        The project name.
    """
    store = FeatureStore(repo_path=self.config.feast_repo)
    project_name = str(store.project)
    return project_name
get_registry(self)

Returns the feature store registry.

Exceptions:

Type Description
ConnectionError

If the online component (Redis) is not available.

Returns:

Type Description
Registry

The registry.

Source code in zenml/integrations/feast/feature_stores/feast_feature_store.py
def get_registry(self) -> Registry:
    """Exposes the registry object of the feature store.

    A `FeatureStore` is built from the configured repository path and its
    `registry` attribute is returned as-is. No explicit Redis connectivity
    check is performed here.

    Returns:
        The registry.
    """
    store: FeatureStore = FeatureStore(repo_path=self.config.feast_repo)
    registry = store.registry
    return registry

flavors special

Feast integration flavors.

feast_feature_store_flavor

Feast feature store flavor.

FeastFeatureStoreConfig (BaseFeatureStoreConfig) pydantic-model

Config for Feast feature store.

Source code in zenml/integrations/feast/flavors/feast_feature_store_flavor.py
class FeastFeatureStoreConfig(BaseFeatureStoreConfig):
    """Configuration values for the Feast feature store component."""

    # Host of the online store; defaults to the local machine.
    online_host: str = "localhost"
    # Port of the online store.
    online_port: int = 6379
    # Feast repository location; required, no default.
    feast_repo: str

    @property
    def is_local(self) -> bool:
        """Tells whether this component is bound to the local host.

        The answer decides whether the stack component may be shared with
        other users or is usable only on the machine it was configured on.

        Returns:
            True when `online_host` is "localhost" or "127.0.0.1", False
            otherwise.
        """
        loopback_hosts = ("localhost", "127.0.0.1")
        return self.online_host in loopback_hosts
is_local: bool property readonly

Checks if this stack component is running locally.

This designation is used to determine if the stack component can be shared with other users or if it is only usable on the local host.

Returns:

Type Description
bool

True if this config is for a local component, False otherwise.

FeastFeatureStoreFlavor (BaseFeatureStoreFlavor)

Feast Feature store flavor.

Source code in zenml/integrations/feast/flavors/feast_feature_store_flavor.py
class FeastFeatureStoreFlavor(BaseFeatureStoreFlavor):
    """Feast Feature store flavor."""

    @property
    def name(self) -> str:
        """Name of the flavor.

        Returns:
            The name of the flavor.
        """
        return FEAST_FEATURE_STORE_FLAVOR

    @property
    def config_class(self) -> Type[FeastFeatureStoreConfig]:
        """Returns FeastFeatureStoreConfig config class.

        Returns:
            The config class.
        """
        return FeastFeatureStoreConfig

    @property
    def implementation_class(self) -> Type["FeastFeatureStore"]:
        """Implementation class for this flavor.

        The implementation module is imported when the property is read,
        not when this module is loaded.

        Returns:
            The implementation class.
        """
        from zenml.integrations.feast.feature_stores import FeastFeatureStore

        return FeastFeatureStore
config_class: Type[zenml.integrations.feast.flavors.feast_feature_store_flavor.FeastFeatureStoreConfig] property readonly

Returns FeastFeatureStoreConfig config class.

Returns:

Type Description
Type[zenml.integrations.feast.flavors.feast_feature_store_flavor.FeastFeatureStoreConfig]

The config class.

implementation_class: Type[FeastFeatureStore] property readonly

Implementation class for this flavor.

Returns:

Type Description
Type[FeastFeatureStore]

The implementation class.

name: str property readonly

Name of the flavor.

Returns:

Type Description
str

The name of the flavor.