Commit

Check all elements to determine whether multiple feature vectors are provided, and adapt the save functions for feature groups accordingly.
manu-sj committed Jul 12, 2024
1 parent ed8a6f3 commit 459d097
Showing 9 changed files with 174 additions and 203 deletions.
60 changes: 53 additions & 7 deletions python/hsfs/core/feature_group_engine.py
@@ -15,8 +15,9 @@
from __future__ import annotations

import warnings
from typing import List

from hsfs import engine, util
from hsfs import engine, feature, util
from hsfs import feature_group as fg
from hsfs.client import exceptions
from hsfs.core import delta_engine, feature_group_base_engine, hudi_engine
@@ -30,6 +31,40 @@ def __init__(self, feature_store_id):
# cache online feature store connector
self._online_conn = None

def _update_feature_group_schema_on_demand_transformations(
self, feature_group: fg.FeatureGroup, features: List[feature.Feature]
):
"""
Function to update feature group schema based on the on demand transformation available in the feature group.
# Arguments:
feature_group: fg.FeatureGroup. The feature group for which the schema has to be updated.
features: List[feature.Feature]. List of features currently in the feature group
# Returns:
Updated list of features. That has on-demand features and removes dropped features.
"""
if not feature_group.transformation_functions:
return features
else:
transformed_features = []
dropped_features = []
for tf in feature_group.transformation_functions:
transformed_features.append(
feature.Feature(
tf.hopsworks_udf.output_column_names[0],
tf.hopsworks_udf.return_types[0],
on_demand=True,
)
)
if tf.hopsworks_udf.dropped_features:
dropped_features.extend(tf.hopsworks_udf.dropped_features)
updated_schema = []

for feat in features:
if feat.name not in dropped_features:
updated_schema.append(feat)
return updated_schema + transformed_features

def save(
self,
feature_group,
@@ -40,6 +75,11 @@
dataframe_features = engine.get_instance().parse_schema_feature_group(
feature_dataframe, feature_group.time_travel_format
)
dataframe_features = (
self._update_feature_group_schema_on_demand_transformations(
feature_group=feature_group, features=dataframe_features
)
)
util.validate_embedding_feature_type(
feature_group.embedding_index, dataframe_features
)
@@ -88,9 +128,12 @@ def insert(
validation_options: dict = None,
):
dataframe_features = engine.get_instance().parse_schema_feature_group(
feature_dataframe,
feature_group.time_travel_format,
feature_group.transformation_functions,
feature_dataframe, feature_group.time_travel_format
)
dataframe_features = (
self._update_feature_group_schema_on_demand_transformations(
feature_group=feature_group, features=dataframe_features
)
)
util.validate_embedding_feature_type(
feature_group.embedding_index, dataframe_features
@@ -283,9 +326,12 @@ def insert_stream(
)

dataframe_features = engine.get_instance().parse_schema_feature_group(
dataframe,
feature_group.time_travel_format,
feature_group.transformation_functions,
dataframe, feature_group.time_travel_format
)
dataframe_features = (
self._update_feature_group_schema_on_demand_transformations(
feature_group=feature_group, features=dataframe_features
)
)
util.validate_embedding_feature_type(
feature_group.embedding_index, dataframe_features
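The helper above centralizes schema handling that save, insert, and insert_stream previously delegated to the engines. A minimal, runnable sketch of its logic, using hypothetical stand-in classes rather than the real hsfs Feature and transformation-function objects:

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class Feature:  # stand-in for hsfs.feature.Feature
    name: str
    type: str
    on_demand: bool = False


@dataclass
class Udf:  # stand-in for a transformation function's hopsworks_udf
    output_column_names: List[str]
    return_types: List[str]
    dropped_features: Optional[List[str]] = None


def update_schema(features: List[Feature], udfs: List[Udf]) -> List[Feature]:
    if not udfs:
        return features
    transformed, dropped = [], []
    for udf in udfs:
        # Each on-demand transformation contributes one output column.
        transformed.append(
            Feature(udf.output_column_names[0], udf.return_types[0], on_demand=True)
        )
        if udf.dropped_features:
            dropped.extend(udf.dropped_features)
    # Keep every original feature that was not explicitly dropped,
    # then append the on-demand output columns.
    return [f for f in features if f.name not in dropped] + transformed


schema = [Feature("col1", "string"), Feature("col2", "string")]
udfs = [Udf(["plus_one"], ["bigint"], dropped_features=["col2"])]
print([f.name for f in update_schema(schema, udfs)])  # ['col1', 'plus_one']
```

Because all three write paths now funnel the dataframe-derived schema through this one method, the engine-specific parse_schema_feature_group implementations below can shed their transformation logic entirely.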
6 changes: 4 additions & 2 deletions python/hsfs/core/vector_server.py
@@ -577,7 +577,9 @@ def _check_feature_vectors_type_and_convert_to_dict(
feature_vectors = feature_vectors.to_dict(orient="records")

elif isinstance(feature_vectors, list) and feature_vectors:
if isinstance(feature_vectors[0], list):
if all(
isinstance(feature_vector, list) for feature_vector in feature_vectors
):
return_type = "list"
feature_vectors = [
self.get_untransformed_features_map(feature_vector)
@@ -610,7 +612,7 @@ def transform(
if not self._model_dependent_transformation_functions:
warnings.warn(
"Feature view does not have any attached model-dependent transformations. Returning input feature vectors.",
stacklevel=1,
stacklevel=0,
)
return feature_vectors

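The change to _check_feature_vectors_type_and_convert_to_dict is easiest to see in isolation. A hedged, standalone sketch of the old and new batch detection (the helper names here are hypothetical):

```python
# Old check: only the first element decided whether the input was a batch.
def is_batch_old(feature_vectors):
    return (
        isinstance(feature_vectors, list)
        and bool(feature_vectors)
        and isinstance(feature_vectors[0], list)
    )


# New check: every element must be a list before the input counts as a batch.
def is_batch_new(feature_vectors):
    return (
        isinstance(feature_vectors, list)
        and bool(feature_vectors)
        and all(isinstance(v, list) for v in feature_vectors)
    )


mixed = [["a", 1], 2]  # a single vector whose first feature happens to be a list
print(is_batch_old(mixed))  # True  -> previously mis-handled as a batch of two vectors
print(is_batch_new(mixed))  # False -> now treated as one feature vector
```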
22 changes: 2 additions & 20 deletions python/hsfs/engine/python.py
@@ -781,9 +781,6 @@ def parse_schema_feature_group(
self,
dataframe: Union[pd.DataFrame, pl.DataFrame],
time_travel_format: Optional[str] = None,
transformation_functions: Optional[
List[transformation_function.TransformationFunction]
] = None,
) -> List[feature.Feature]:
if isinstance(dataframe, pd.DataFrame):
arrow_schema = pa.Schema.from_pandas(dataframe, preserve_index=False)
@@ -792,20 +789,6 @@
):
arrow_schema = dataframe.to_arrow().schema
features = []
transformed_features = []
dropped_features = []

if transformation_functions:
for tf in transformation_functions:
transformed_features.append(
feature.Feature(
tf.hopsworks_udf.output_column_names[0],
tf.hopsworks_udf.return_types[0],
on_demand=True,
)
)
if tf.hopsworks_udf.dropped_features:
dropped_features.extend(tf.hopsworks_udf.dropped_features)
for feat_name in arrow_schema.names:
name = util.autofix_feature_name(feat_name)
try:
Expand All @@ -814,10 +797,9 @@ def parse_schema_feature_group(
)
except ValueError as e:
raise FeatureStoreException(f"Feature '{name}': {str(e)}") from e
if name not in dropped_features:
features.append(feature.Feature(name, converted_type))
features.append(feature.Feature(name, converted_type))

return features + transformed_features
return features

def parse_schema_training_dataset(
self, dataframe: Union[pd.DataFrame, pl.DataFrame]
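With the transformation handling removed, the Python engine's parse_schema_feature_group reduces to Arrow-based column mapping. A simplified, runnable approximation (the real method converts Arrow types to Hopsworks offline types and sanitizes names via util.autofix_feature_name; str(field.type) and .lower() stand in for those here):

```python
import pandas as pd
import pyarrow as pa

df = pd.DataFrame({"Col1": ["a", "b"], "col2": [1, 2]})
arrow_schema = pa.Schema.from_pandas(df, preserve_index=False)

# One feature per dataframe column; on-demand columns are appended later by
# _update_feature_group_schema_on_demand_transformations, not here.
features = [(field.name.lower(), str(field.type)) for field in arrow_schema]
print(features)  # [('col1', 'string'), ('col2', 'int64')]
```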
28 changes: 5 additions & 23 deletions python/hsfs/engine/spark.py
@@ -1150,25 +1150,8 @@ def parse_schema_feature_group(
self,
dataframe,
time_travel_format=None,
transformation_functions: Optional[
List[transformation_function.TransformationFunction]
] = None,
):
features = []
transformed_features = []
dropped_features = []

if transformation_functions:
for tf in transformation_functions:
transformed_features.append(
feature.Feature(
tf.hopsworks_udf.output_column_names[0],
tf.hopsworks_udf.return_types[0],
on_demand=True,
)
)
if tf.hopsworks_udf.dropped_features:
dropped_features.extend(tf.hopsworks_udf.dropped_features)

using_hudi = time_travel_format == "HUDI"
for feat in dataframe.schema:
@@ -1179,13 +1162,12 @@
)
except ValueError as e:
raise FeatureStoreException(f"Feature '{feat.name}': {str(e)}") from e
if name not in dropped_features:
features.append(
feature.Feature(
name, converted_type, feat.metadata.get("description", None)
)
features.append(
feature.Feature(
name, converted_type, feat.metadata.get("description", None)
)
return features + transformed_features
)
return features

def parse_schema_training_dataset(self, dataframe):
return [
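The Spark engine gets the same simplification. A minimal sketch of the per-column loop that remains, assuming a local Spark session is available (simpleString() stands in for the Hudi-aware type conversion in the real method):

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[1]").getOrCreate()
df = spark.createDataFrame([("a", 1)], ["col1", "col2"])

for field in df.schema:
    # field.metadata may carry a column description, as the diff above shows.
    print(field.name, field.dataType.simpleString(), field.metadata.get("description"))
```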
5 changes: 5 additions & 0 deletions python/hsfs/feature_group.py
@@ -2570,6 +2570,11 @@ def save(
# This is consistent with the behavior of the insert method where the feature list wins over the
# dataframe structure
self._features = self._features if len(self._features) > 0 else features

self._features = self._feature_group_engine._update_feature_group_schema_on_demand_transformations(
self, self._features
)

self._feature_group_engine.save_feature_group_metadata(
self, None, write_options or {}
)
29 changes: 20 additions & 9 deletions python/hsfs/hopsworks_udf.py
@@ -115,17 +115,28 @@ class HopsworksUdf:
The class uses the metadata to dynamically generate user-defined functions based on the
engine it is executed in.
Arguments
---------
func : `Union[Callable, str]`. The transformation function object or the source code of the transformation function.
return_types : `Union[List[type], type, List[str], str]`. A Python type or a list of Python types denoting the data types of the columns output from the transformation function.
name : `Optional[str]`. Name of the transformation function.
transformation_features : `Optional[List[TransformationFeature]]`. A list of `TransformationFeature` objects that map the features used for transformation to their corresponding statistics argument names, if any.
transformation_function_argument_names : `Optional[List[TransformationFeature]]`. The argument names of the transformation function.
dropped_argument_names : `Optional[List[str]]`. The arguments to be dropped from the final DataFrame after the transformation functions are applied.
dropped_feature_names : `Optional[List[str]]`. The feature names corresponding to the argument names that are dropped.
feature_name_prefix : `Optional[str]` = None. Prefix, if any, used in the feature view.
Attributes
----------
function_name (str) : Name of the UDF
udf_type (UDFType): Type of the UDF can be either \"model dependent\" or \"on-demand\".
return_types (List[str]): The data types of the columns returned from the UDF.
transformation_features (List[str]) : List of feature names to which the transformation function would be applied.
output_column_names (List[str]): Column names of the DataFrame returned after application of the transformation function.
dropped_features (List[str]): List of features that will be dropped after the UDF is applied.
transformation_statistics (Dict[str, FeatureDescriptiveStatistics]): Dictionary that maps the statistics_argument name in the function to the actual statistics variable.
statistics_required (bool) : True if statistics is required for any of the parameters of the UDF.
statistics_features (List[str]) : List of feature names that requires statistics.
function_name `str` : Name of the UDF.
udf_type `UDFType` : Type of the UDF; either \"model dependent\" or \"on-demand\".
return_types `List[str]` : The data types of the columns returned from the UDF.
transformation_features `List[str]` : List of feature names to which the transformation function is applied.
output_column_names `List[str]` : Column names of the DataFrame returned after application of the transformation function.
dropped_features `List[str]` : List of features that will be dropped after the UDF is applied.
transformation_statistics `Dict[str, FeatureDescriptiveStatistics]` : Dictionary mapping each statistics argument name in the function to the actual statistics variable.
statistics_required `bool` : True if statistics are required for any of the parameters of the UDF.
statistics_features `List[str]` : List of feature names that require statistics.
"""

# Mapping for converting python types to spark types - required for creating pandas UDF's.
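For reference, this is how a transformation function carrying the metadata documented above is declared; the pattern mirrors the new tests below, where drop marks an input feature for removal from the final DataFrame:

```python
from hsfs.hopsworks_udf import udf


@udf(int, drop="feature")
def plus_one(feature):
    # The output column is named after the function and replaces the dropped input.
    return feature + 1
```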
84 changes: 84 additions & 0 deletions python/tests/core/test_feature_group_engine.py
@@ -18,6 +18,7 @@
from hsfs import feature, feature_group, feature_group_commit, validation_report
from hsfs.client import exceptions
from hsfs.core import feature_group_engine
from hsfs.hopsworks_udf import udf


class TestFeatureGroupEngine:
@@ -1356,3 +1357,86 @@ def test_save_feature_group_metadata_write_options(self, mocker):
] == "Feature Group created successfully, explore it at \n{}".format(
feature_group_url
)

def test_update_feature_group_schema_on_demand_transformations(self, mocker):
# Arrange
feature_store_id = 99

mocker.patch("hsfs.engine.get_type")
mocker.patch("hsfs.engine.get_instance")
mocker.patch(
"hsfs.core.feature_group_engine.FeatureGroupEngine.save_feature_group_metadata"
)
mocker.patch("hsfs.core.great_expectation_engine.GreatExpectationEngine")

@udf(int)
def test(feature):
return feature + 1

fg_engine = feature_group_engine.FeatureGroupEngine(
feature_store_id=feature_store_id
)

fg = feature_group.FeatureGroup(
name="test",
version=1,
featurestore_id=feature_store_id,
primary_key=[],
partition_key=[],
id=10,
transformation_functions=[test("col2")],
)
f = feature.Feature(name="col1", type="str")
f1 = feature.Feature(name="col2", type="str")

# Act
result = fg_engine._update_feature_group_schema_on_demand_transformations(
feature_group=fg, features=[f, f1]
)

# Assert
assert len(result) == 3
assert result[0].name == "col1"
assert result[1].name == "col2"
assert result[2].name == "test"

def test_update_feature_group_schema_on_demand_transformations_drop(self, mocker):
# Arrange
feature_store_id = 99

mocker.patch("hsfs.engine.get_type")
mocker.patch("hsfs.engine.get_instance")
mocker.patch(
"hsfs.core.feature_group_engine.FeatureGroupEngine.save_feature_group_metadata"
)
mocker.patch("hsfs.core.great_expectation_engine.GreatExpectationEngine")

@udf(int, drop="feature")
def test(feature):
return feature + 1

fg_engine = feature_group_engine.FeatureGroupEngine(
feature_store_id=feature_store_id
)

fg = feature_group.FeatureGroup(
name="test",
version=1,
featurestore_id=feature_store_id,
primary_key=[],
partition_key=[],
id=10,
transformation_functions=[test("col2")],
)
f = feature.Feature(name="col1", type="str")
f1 = feature.Feature(name="col2", type="str")

# Act
result = fg_engine._update_feature_group_schema_on_demand_transformations(
feature_group=fg, features=[f, f1]
)

# Assert
assert len(result) == 2
assert result[0].name == "col1"
assert result[1].name == "test"
