diff --git a/python/hsfs/core/feature_group_engine.py b/python/hsfs/core/feature_group_engine.py
index 010810f6cc..35d7be7821 100644
--- a/python/hsfs/core/feature_group_engine.py
+++ b/python/hsfs/core/feature_group_engine.py
@@ -15,8 +15,9 @@
 from __future__ import annotations
 
 import warnings
+from typing import List
 
-from hsfs import engine, util
+from hsfs import engine, feature, util
 from hsfs import feature_group as fg
 from hsfs.client import exceptions
 from hsfs.core import delta_engine, feature_group_base_engine, hudi_engine
@@ -30,6 +31,40 @@ def __init__(self, feature_store_id):
         # cache online feature store connector
         self._online_conn = None
 
+    def _update_feature_group_schema_on_demand_transformations(
+        self, feature_group: fg.FeatureGroup, features: List[feature.Feature]
+    ):
+        """
+        Update the feature group schema based on the on-demand transformation functions attached to the feature group.
+
+        # Arguments:
+            feature_group: fg.FeatureGroup. The feature group for which the schema has to be updated.
+            features: List[feature.Feature]. List of features currently in the feature group.
+        # Returns:
+            Updated list of features that includes the on-demand features and excludes dropped features.
+        """
+        if not feature_group.transformation_functions:
+            return features
+        else:
+            transformed_features = []
+            dropped_features = []
+            for tf in feature_group.transformation_functions:
+                transformed_features.append(
+                    feature.Feature(
+                        tf.hopsworks_udf.output_column_names[0],
+                        tf.hopsworks_udf.return_types[0],
+                        on_demand=True,
+                    )
+                )
+                if tf.hopsworks_udf.dropped_features:
+                    dropped_features.extend(tf.hopsworks_udf.dropped_features)
+            updated_schema = []
+
+            for feat in features:
+                if feat.name not in dropped_features:
+                    updated_schema.append(feat)
+            return updated_schema + transformed_features
+
     def save(
         self,
         feature_group,
@@ -40,6 +75,11 @@
         dataframe_features = engine.get_instance().parse_schema_feature_group(
             feature_dataframe, feature_group.time_travel_format
         )
+        dataframe_features = (
+            self._update_feature_group_schema_on_demand_transformations(
+                feature_group=feature_group, features=dataframe_features
+            )
+        )
         util.validate_embedding_feature_type(
             feature_group.embedding_index, dataframe_features
         )
@@ -88,9 +128,12 @@ def insert(
         validation_options: dict = None,
     ):
         dataframe_features = engine.get_instance().parse_schema_feature_group(
-            feature_dataframe,
-            feature_group.time_travel_format,
-            feature_group.transformation_functions,
+            feature_dataframe, feature_group.time_travel_format
+        )
+        dataframe_features = (
+            self._update_feature_group_schema_on_demand_transformations(
+                feature_group=feature_group, features=dataframe_features
+            )
         )
         util.validate_embedding_feature_type(
             feature_group.embedding_index, dataframe_features
@@ -283,9 +326,12 @@ def insert_stream(
         )
 
         dataframe_features = engine.get_instance().parse_schema_feature_group(
-            dataframe,
-            feature_group.time_travel_format,
-            feature_group.transformation_functions,
+            dataframe, feature_group.time_travel_format
+        )
+        dataframe_features = (
+            self._update_feature_group_schema_on_demand_transformations(
+                feature_group=feature_group, features=dataframe_features
+            )
         )
         util.validate_embedding_feature_type(
             feature_group.embedding_index, dataframe_features
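
The helper's behavior is easy to see in isolation. Below is a minimal, self-contained sketch of the same schema-update logic; `SimpleFeature` and `UdfMeta` are hypothetical stand-ins for `hsfs.feature.Feature` and the Hopsworks UDF metadata, modeling only the fields the method reads (`output_column_names`, `return_types`, `dropped_features`):

```python
from dataclasses import dataclass, field
from typing import List

@dataclass
class SimpleFeature:
    name: str
    type: str
    on_demand: bool = False  # marks features produced by on-demand UDFs

@dataclass
class UdfMeta:
    output_column_names: List[str]
    return_types: List[str]
    dropped_features: List[str] = field(default_factory=list)

def update_schema(udfs: List[UdfMeta], features: List[SimpleFeature]) -> List[SimpleFeature]:
    # Mirrors _update_feature_group_schema_on_demand_transformations: append one
    # on-demand feature per UDF and remove the input features the UDFs drop.
    if not udfs:
        return features
    transformed, dropped = [], []
    for meta in udfs:
        transformed.append(
            SimpleFeature(meta.output_column_names[0], meta.return_types[0], on_demand=True)
        )
        dropped.extend(meta.dropped_features)
    return [f for f in features if f.name not in dropped] + transformed

# "col2" feeds a hypothetical on-demand UDF "plus_one" and is dropped afterwards.
schema = [SimpleFeature("col1", "string"), SimpleFeature("col2", "bigint")]
plus_one = UdfMeta(["plus_one"], ["bigint"], dropped_features=["col2"])
print([f.name for f in update_schema([plus_one], schema)])  # ['col1', 'plus_one']
```
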
diff --git a/python/hsfs/core/vector_server.py b/python/hsfs/core/vector_server.py
index 407f9aa1c4..598a9cfc07 100755
--- a/python/hsfs/core/vector_server.py
+++ b/python/hsfs/core/vector_server.py
@@ -577,7 +577,9 @@ def _check_feature_vectors_type_and_convert_to_dict(
             feature_vectors = feature_vectors.to_dict(orient="records")
 
         elif isinstance(feature_vectors, list) and feature_vectors:
-            if isinstance(feature_vectors[0], list):
+            if all(
+                isinstance(feature_vector, list) for feature_vector in feature_vectors
+            ):
                 return_type = "list"
                 feature_vectors = [
                     self.get_untransformed_features_map(feature_vector)
@@ -610,7 +612,7 @@ def transform(
         if not self._model_dependent_transformation_functions:
             warnings.warn(
                 "Feature view does not have any attached model-dependent transformations. Returning input feature vectors.",
-                stacklevel=1,
+                stacklevel=0,
             )
             return feature_vectors
 
diff --git a/python/hsfs/engine/python.py b/python/hsfs/engine/python.py
index b6524ea0e7..034a724a72 100644
--- a/python/hsfs/engine/python.py
+++ b/python/hsfs/engine/python.py
@@ -781,9 +781,6 @@ def parse_schema_feature_group(
         self,
         dataframe: Union[pd.DataFrame, pl.DataFrame],
         time_travel_format: Optional[str] = None,
-        transformation_functions: Optional[
-            List[transformation_function.TransformationFunction]
-        ] = None,
     ) -> List[feature.Feature]:
         if isinstance(dataframe, pd.DataFrame):
             arrow_schema = pa.Schema.from_pandas(dataframe, preserve_index=False)
@@ -792,20 +789,6 @@
         ):
             arrow_schema = dataframe.to_arrow().schema
         features = []
-        transformed_features = []
-        dropped_features = []
-
-        if transformation_functions:
-            for tf in transformation_functions:
-                transformed_features.append(
-                    feature.Feature(
-                        tf.hopsworks_udf.output_column_names[0],
-                        tf.hopsworks_udf.return_types[0],
-                        on_demand=True,
-                    )
-                )
-                if tf.hopsworks_udf.dropped_features:
-                    dropped_features.extend(tf.hopsworks_udf.dropped_features)
         for feat_name in arrow_schema.names:
             name = util.autofix_feature_name(feat_name)
             try:
@@ -814,10 +797,9 @@
                 )
             except ValueError as e:
                 raise FeatureStoreException(f"Feature '{name}': {str(e)}") from e
-            if name not in dropped_features:
-                features.append(feature.Feature(name, converted_type))
+            features.append(feature.Feature(name, converted_type))
 
-        return features + transformed_features
+        return features
 
     def parse_schema_training_dataset(
         self, dataframe: Union[pd.DataFrame, pl.DataFrame]
diff --git a/python/hsfs/engine/spark.py b/python/hsfs/engine/spark.py
index 619686b6f9..ae41382eac 100644
--- a/python/hsfs/engine/spark.py
+++ b/python/hsfs/engine/spark.py
@@ -1150,25 +1150,8 @@ def parse_schema_feature_group(
         self,
         dataframe,
         time_travel_format=None,
-        transformation_functions: Optional[
-            List[transformation_function.TransformationFunction]
-        ] = None,
     ):
         features = []
-        transformed_features = []
-        dropped_features = []
-
-        if transformation_functions:
-            for tf in transformation_functions:
-                transformed_features.append(
-                    feature.Feature(
-                        tf.hopsworks_udf.output_column_names[0],
-                        tf.hopsworks_udf.return_types[0],
-                        on_demand=True,
-                    )
-                )
-                if tf.hopsworks_udf.dropped_features:
-                    dropped_features.extend(tf.hopsworks_udf.dropped_features)
 
         using_hudi = time_travel_format == "HUDI"
         for feat in dataframe.schema:
@@ -1179,13 +1162,12 @@
                 )
             except ValueError as e:
                 raise FeatureStoreException(f"Feature '{feat.name}': {str(e)}") from e
-            if name not in dropped_features:
-                features.append(
-                    feature.Feature(
-                        name, converted_type, feat.metadata.get("description", None)
-                    )
+            features.append(
+                feature.Feature(
+                    name, converted_type, feat.metadata.get("description", None)
                 )
-        return features + transformed_features
+            )
+        return features
 
     def parse_schema_training_dataset(self, dataframe):
         return [
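
The `vector_server.py` change above tightens how a batch of untransformed feature vectors is detected: the old check inspected only the first element, so a mixed payload such as `[[1, 2], {"f1": 3}]` would be classified as a list of lists. A small illustration in plain Python, independent of hsfs:

```python
feature_vectors = [[1, 2], {"f1": 3, "f2": 4}]  # mixed payload

# Old check: only the head element is inspected, so mixed input slips through.
head_only = isinstance(feature_vectors[0], list)
# New check: every element must be a list, so mixed input is rejected.
all_lists = all(isinstance(v, list) for v in feature_vectors)

print(head_only, all_lists)  # True False
```
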
diff --git a/python/hsfs/feature_group.py b/python/hsfs/feature_group.py
index eb52e9e473..9efb476fe4 100644
--- a/python/hsfs/feature_group.py
+++ b/python/hsfs/feature_group.py
@@ -2570,6 +2570,11 @@ def save(
         # This is consistent with the behavior of the insert method where the feature list wins over the
         # dataframe structure
         self._features = self._features if len(self._features) > 0 else features
+
+        self._features = self._feature_group_engine._update_feature_group_schema_on_demand_transformations(
+            self, self._features
+        )
+
         self._feature_group_engine.save_feature_group_metadata(
             self, None, write_options or {}
         )
diff --git a/python/hsfs/hopsworks_udf.py b/python/hsfs/hopsworks_udf.py
index 697eb06f38..01952eb8ea 100644
--- a/python/hsfs/hopsworks_udf.py
+++ b/python/hsfs/hopsworks_udf.py
@@ -115,17 +115,28 @@ class HopsworksUdf:
     The class uses the metadata to dynamically generate user defined functions based on the engine it is executed in.
 
+    Arguments
+    ---------
+    func : `Union[Callable, str]`. The transformation function object or the source code of the transformation function.
+    return_types : `Union[List[type], type, List[str], str]`. A python type or a list of python types that denotes the data types of the columns output from the transformation functions.
+    name : `Optional[str]`. Name of the transformation function.
+    transformation_features : `Optional[List[TransformationFeature]]`. A list of `TransformationFeature` objects that map the features used for transformation to their corresponding statistics argument names, if any.
+    transformation_function_argument_names : `Optional[List[TransformationFeature]]`. The argument names of the transformation function.
+    dropped_argument_names : `Optional[List[str]]`. The arguments to be dropped from the final DataFrame after the transformation functions are applied.
+    dropped_feature_names : `Optional[List[str]]`. The feature names corresponding to the argument names that are dropped.
+    feature_name_prefix : `Optional[str]`. Prefix, if any, used in the feature view.
+
     Attributes
     ----------
-    function_name (str) : Name of the UDF
-    udf_type (UDFType): Type of the UDF can be either \"model dependent\" or \"on-demand\".
-    return_types (List[str]): The data types of the columns returned from the UDF.
-    transformation_features (List[str]) : List of feature names to which the transformation function would be applied.
-    output_column_names (List[str]): Column names of the DataFrame returned after application of the transformation function.
-    dropped_features (List[str]): List of features that will be dropped after the UDF is applied.
-    transformation_statistics (Dict[str, FeatureDescriptiveStatistics]): Dictionary that maps the statistics_argument name in the function to the actual statistics variable.
-    statistics_required (bool) : True if statistics is required for any of the parameters of the UDF.
-    statistics_features (List[str]) : List of feature names that requires statistics.
+    function_name `str` : Name of the UDF.
+    udf_type `UDFType` : Type of the UDF; can be either \"model dependent\" or \"on-demand\".
+    return_types `List[str]` : The data types of the columns returned from the UDF.
+    transformation_features `List[str]` : List of feature names to which the transformation function would be applied.
+    output_column_names `List[str]` : Column names of the DataFrame returned after application of the transformation function.
+    dropped_features `List[str]` : List of features that will be dropped after the UDF is applied.
+    transformation_statistics `Dict[str, FeatureDescriptiveStatistics]` : Dictionary that maps the statistics_argument name in the function to the actual statistics variable.
+    statistics_required `bool` : True if statistics are required for any of the parameters of the UDF.
+    statistics_features `List[str]` : List of feature names that require statistics.
     """
 
     # Mapping for converting python types to spark types - required for creating pandas UDF's.
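
Taken together with the `feature_group.py` change above, attaching an on-demand UDF now shapes the feature group schema at save time. A hedged end-to-end sketch (the `fs` handle, the feature group name, and the `plus_one` UDF are illustrative; it assumes `create_feature_group` forwards `transformation_functions` to the `FeatureGroup` constructor, as the tests below do directly):

```python
from hsfs.hopsworks_udf import udf

@udf(int, drop="feature")
def plus_one(feature):
    return feature + 1

fg = fs.create_feature_group(  # `fs`: an illustrative feature store handle
    name="sales",
    version=1,
    primary_key=["id"],
    transformation_functions=[plus_one("col2")],  # on-demand transformation on "col2"
)
fg.save(df)
# Saved schema: the on-demand feature "plus_one" is appended and the dropped
# input "col2" is removed from the feature list.
```
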
diff --git a/python/tests/core/test_feature_group_engine.py b/python/tests/core/test_feature_group_engine.py
index b6542eb46b..2847d5219d 100644
--- a/python/tests/core/test_feature_group_engine.py
+++ b/python/tests/core/test_feature_group_engine.py
@@ -18,6 +18,7 @@
 from hsfs import feature, feature_group, feature_group_commit, validation_report
 from hsfs.client import exceptions
 from hsfs.core import feature_group_engine
+from hsfs.hopsworks_udf import udf
 
 
 class TestFeatureGroupEngine:
@@ -1356,3 +1357,86 @@ def test_save_feature_group_metadata_write_options(self, mocker):
         ] == "Feature Group created successfully, explore it at \n{}".format(
             feature_group_url
         )
+
+    def test_update_feature_group_schema_on_demand_transformations(self, mocker):
+        # Arrange
+        feature_store_id = 99
+
+        mocker.patch("hsfs.engine.get_type")
+        mocker.patch("hsfs.engine.get_instance")
+        mocker.patch(
+            "hsfs.core.feature_group_engine.FeatureGroupEngine.save_feature_group_metadata"
+        )
+        mocker.patch("hsfs.core.great_expectation_engine.GreatExpectationEngine")
+
+        @udf(int)
+        def test(feature):
+            return feature + 1
+
+        fg_engine = feature_group_engine.FeatureGroupEngine(
+            feature_store_id=feature_store_id
+        )
+
+        fg = feature_group.FeatureGroup(
+            name="test",
+            version=1,
+            featurestore_id=feature_store_id,
+            primary_key=[],
+            partition_key=[],
+            id=10,
+            transformation_functions=[test("col2")],
+        )
+        f = feature.Feature(name="col1", type="str")
+        f1 = feature.Feature(name="col2", type="str")
+
+        # Act
+        result = fg_engine._update_feature_group_schema_on_demand_transformations(
+            feature_group=fg, features=[f, f1]
+        )
+
+        # Assert
+        assert len(result) == 3
+        assert result[0].name == "col1"
+        assert result[1].name == "col2"
+        assert result[2].name == "test"
+
+    def test_update_feature_group_schema_on_demand_transformations_drop(self, mocker):
+        # Arrange
+        feature_store_id = 99
+
+        mocker.patch("hsfs.engine.get_type")
+        mocker.patch("hsfs.engine.get_instance")
+        mocker.patch(
+            "hsfs.core.feature_group_engine.FeatureGroupEngine.save_feature_group_metadata"
+        )
+        mocker.patch("hsfs.core.great_expectation_engine.GreatExpectationEngine")
+
+        @udf(int, drop="feature")
+        def test(feature):
+            return feature + 1
+
+        fg_engine = feature_group_engine.FeatureGroupEngine(
+            feature_store_id=feature_store_id
+        )
+
+        fg = feature_group.FeatureGroup(
+            name="test",
+            version=1,
+            featurestore_id=feature_store_id,
+            primary_key=[],
+            partition_key=[],
+            id=10,
+            transformation_functions=[test("col2")],
+        )
+        f = feature.Feature(name="col1", type="str")
+        f1 = feature.Feature(name="col2", type="str")
+
+        # Act
+        result = fg_engine._update_feature_group_schema_on_demand_transformations(
+            feature_group=fg, features=[f, f1]
+        )
+
+        # Assert
+        assert len(result) == 2
+        assert result[0].name == "col1"
+        assert result[1].name == "test"
diff --git a/python/tests/engine/test_python.py b/python/tests/engine/test_python.py
index a000359c32..a08c6e8ec4 100644
--- a/python/tests/engine/test_python.py
+++ b/python/tests/engine/test_python.py
@@ -37,9 +37,8 @@
 from hsfs.core.constants import HAS_GREAT_EXPECTATIONS
 from hsfs.engine import python
 from hsfs.expectation_suite import ExpectationSuite
-from hsfs.hopsworks_udf import UDFType, udf
+from hsfs.hopsworks_udf import udf
 from hsfs.training_dataset_feature import TrainingDatasetFeature
-from hsfs.transformation_function import TransformationFunction
 from polars.testing import assert_frame_equal as polars_assert_frame_equal
@@ -1456,71 +1455,6 @@
         assert result[1].name == "col2"
         assert result[2].name == "date"
 
-    def test_parse_schema_feature_group_transformation_functions(self, mocker):
-        # Arrange
-        mocker.patch("hsfs.engine.python.Engine._convert_pandas_dtype_to_offline_type")
-
-        python_engine = python.Engine()
-
-        d = {"Col1": [1, 2], "col2": [3, 4]}
-        df = pd.DataFrame(data=d)
-
-        @udf(int)
-        def test(feature):
-            return feature + 1
-
-        transformation_function = TransformationFunction(
-            featurestore_id=10,
-            hopsworks_udf=test,
-            version=1,
-            transformation_type=UDFType.ON_DEMAND,
-        )
-
-        # Act
-        result = python_engine.parse_schema_feature_group(
-            dataframe=df,
-            time_travel_format=None,
-            transformation_functions=[transformation_function],
-        )
-
-        # Assert
-        assert len(result) == 3
-        assert result[0].name == "col1"
-        assert result[1].name == "col2"
-        assert result[2].name == "test"
-
-    def test_parse_schema_feature_group_transformation_functions_drop(self, mocker):
-        # Arrange
-        mocker.patch("hsfs.engine.python.Engine._convert_pandas_dtype_to_offline_type")
-
-        python_engine = python.Engine()
-
-        d = {"Col1": [1, 2], "col2": [3, 4]}
-        df = pd.DataFrame(data=d)
-
-        @udf(int, drop="feature")
-        def test(feature):
-            return feature + 1
-
-        transformation_function = TransformationFunction(
-            featurestore_id=10,
-            hopsworks_udf=test("col2"),
-            version=1,
-            transformation_type=UDFType.ON_DEMAND,
-        )
-
-        # Act
-        result = python_engine.parse_schema_feature_group(
-            dataframe=df,
-            time_travel_format=None,
-            transformation_functions=[transformation_function],
-        )
-
-        # Assert
-        assert len(result) == 2
-        assert result[0].name == "col1"
-        assert result[1].name == "test"
-
     def test_parse_schema_training_dataset(self):
         # Arrange
         python_engine = python.Engine()
diff --git a/python/tests/engine/test_spark.py b/python/tests/engine/test_spark.py
index 9b5ad75ed4..9b4ff6b2de 100644
--- a/python/tests/engine/test_spark.py
+++ b/python/tests/engine/test_spark.py
@@ -3889,81 +3889,6 @@ def test_parse_schema_feature_group(self, mocker):
         assert mock_spark_engine_convert_spark_type.call_count == 2
         assert mock_spark_engine_convert_spark_type.call_args[0][1] is False
 
-    def test_parse_schema_feature_group_transformations(self, mocker):
-        # Arrange
-        mock_spark_engine_convert_spark_type = mocker.patch(
-            "hsfs.engine.spark.Engine.convert_spark_type_to_offline_type"
-        )
-
-        spark_engine = spark.Engine()
-
-        d = {"col_0": [1, 2], "col_1": ["test_1", "test_2"]}
-        df = pd.DataFrame(data=d)
-
-        @udf(int)
-        def test(feature):
-            return feature + 1
-
-        tf_function = transformation_function.TransformationFunction(
-            featurestore_id=10,
-            hopsworks_udf=test,
-            version=1,
-            transformation_type=UDFType.ON_DEMAND,
-        )
-
-        spark_df = spark_engine._spark_session.createDataFrame(df)
-
-        # Act
-        result = spark_engine.parse_schema_feature_group(
-            dataframe=spark_df,
-            time_travel_format=None,
-            transformation_functions=[tf_function],
-        )
-
-        # Assert
-        assert result[0].name == "col_0"
-        assert result[1].name == "col_1"
-        assert result[2].name == "test"
-        assert mock_spark_engine_convert_spark_type.call_count == 2
-        assert mock_spark_engine_convert_spark_type.call_args[0][1] is False
-
-    def test_parse_schema_feature_group_transformations_dropped(self, mocker):
-        # Arrange
-        mock_spark_engine_convert_spark_type = mocker.patch(
-            "hsfs.engine.spark.Engine.convert_spark_type_to_offline_type"
-        )
-
-        spark_engine = spark.Engine()
-
-        d = {"col_0": [1, 2], "col_1": ["test_1", "test_2"]}
-        df = pd.DataFrame(data=d)
-
-        @udf(int, drop="feature")
-        def test(feature):
-            return feature + 1
-
-        tf_function = transformation_function.TransformationFunction(
-            featurestore_id=10,
-            hopsworks_udf=test("col_0"),
-            version=1,
-            transformation_type=UDFType.ON_DEMAND,
-        )
-
-        spark_df = spark_engine._spark_session.createDataFrame(df)
-
-        # Act
-        result = spark_engine.parse_schema_feature_group(
-            dataframe=spark_df,
-            time_travel_format=None,
-            transformation_functions=[tf_function],
-        )
-
-        # Assert
-        assert result[0].name == "col_1"
-        assert result[1].name == "test"
-        assert mock_spark_engine_convert_spark_type.call_count == 2
-        assert mock_spark_engine_convert_spark_type.call_args[0][1] is False
-
     def test_parse_schema_feature_group_hudi(self, mocker):
         # Arrange
         mock_spark_engine_convert_spark_type = mocker.patch(