Skip to content

Commit b066aa9

Browse files
authored
[FSTORE-1411] On-Demand Transformation Functions (#1371)
* On-demand transformations
* Adding an API for computing and transforming feature vectors
* Setting request parameters to {} if None
* Checking all elements to determine whether multiple feature vectors are provided, and adapting the changes for the feature group save functions
1 parent a4efba1 commit b066aa9

38 files changed

+2370
-497
lines changed

python/hsfs/builtin_transformations.py

+5-5
Original file line numberDiff line numberDiff line change
@@ -23,26 +23,26 @@
2323
feature_statistics = TransformationStatistics("feature")
2424

2525

26-
@udf(float)
26+
@udf(float, drop=["feature"])
2727
def min_max_scaler(feature: pd.Series, statistics=feature_statistics) -> pd.Series:
2828
return (feature - statistics.feature.min) / (
2929
statistics.feature.max - statistics.feature.min
3030
)
3131

3232

33-
@udf(float)
33+
@udf(float, drop=["feature"])
3434
def standard_scaler(feature: pd.Series, statistics=feature_statistics) -> pd.Series:
3535
return (feature - statistics.feature.mean) / statistics.feature.stddev
3636

3737

38-
@udf(float)
38+
@udf(float, drop=["feature"])
3939
def robust_scaler(feature: pd.Series, statistics=feature_statistics) -> pd.Series:
4040
return (feature - statistics.feature.percentiles[49]) / (
4141
statistics.feature.percentiles[74] - statistics.feature.percentiles[24]
4242
)
4343

4444

45-
@udf(int)
45+
@udf(int, drop=["feature"])
4646
def label_encoder(feature: pd.Series, statistics=feature_statistics) -> pd.Series:
4747
unique_data = sorted(
4848
[value for value in statistics.feature.extended_statistics["unique_values"]]
@@ -53,7 +53,7 @@ def label_encoder(feature: pd.Series, statistics=feature_statistics) -> pd.Serie
5353
)
5454

5555

56-
@udf(bool)
56+
@udf(bool, drop=["feature"])
5757
def one_hot_encoder(feature: pd.Series, statistics=feature_statistics) -> pd.Series:
5858
unique_data = [
5959
value for value in statistics.feature.extended_statistics["unique_values"]

python/hsfs/core/feature_group_api.py

+13-3
Original file line numberDiff line numberDiff line change
@@ -51,13 +51,17 @@ def save(
5151
feature_group_instance.feature_store_id,
5252
"featuregroups",
5353
]
54+
query_params = {
55+
"expand": ["features", "expectationsuite", "transformationfunctions"]
56+
}
5457
headers = {"content-type": "application/json"}
5558
feature_group_object = feature_group_instance.update_from_response_json(
5659
_client._send_request(
5760
"POST",
5861
path_params,
5962
headers=headers,
6063
data=feature_group_instance.json(),
64+
query_params=query_params,
6165
),
6266
)
6367
return feature_group_object
@@ -93,7 +97,11 @@ def get(
9397
"featuregroups",
9498
name,
9599
]
96-
query_params = None if version is None else {"version": version}
100+
query_params = {
101+
"expand": ["features", "expectationsuite", "transformationfunctions"]
102+
}
103+
if version is not None:
104+
query_params["version"] = version
97105

98106
fg_objs = []
99107
# In principle unique names are enforced across fg type and this should therefore
@@ -157,8 +165,10 @@ def get_by_id(
157165
"featuregroups",
158166
feature_group_id,
159167
]
160-
161-
fg_json = _client._send_request("GET", path_params)
168+
query_params = {
169+
"expand": ["features", "expectationsuite", "transformationfunctions"]
170+
}
171+
fg_json = _client._send_request("GET", path_params, query_params)
162172
if (
163173
fg_json["type"] == FeatureGroupApi.BACKEND_FG_STREAM
164174
or fg_json["type"] == FeatureGroupApi.BACKEND_FG_BATCH

python/hsfs/core/feature_group_engine.py

+51-1
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,9 @@
1515
from __future__ import annotations
1616

1717
import warnings
18+
from typing import List
1819

19-
from hsfs import engine, util
20+
from hsfs import engine, feature, util
2021
from hsfs import feature_group as fg
2122
from hsfs.client import exceptions
2223
from hsfs.core import delta_engine, feature_group_base_engine, hudi_engine
@@ -30,6 +31,40 @@ def __init__(self, feature_store_id):
3031
# cache online feature store connector
3132
self._online_conn = None
3233

34+
def _update_feature_group_schema_on_demand_transformations(
35+
self, feature_group: fg.FeatureGroup, features: List[feature.Feature]
36+
):
37+
"""
38+
Function to update feature group schema based on the on demand transformation available in the feature group.
39+
40+
# Arguments:
41+
feature_group: fg.FeatureGroup. The feature group for which the schema has to be updated.
42+
features: List[feature.Feature]. List of features currently in the feature group
43+
# Returns:
44+
Updated list of features that includes the on-demand features and excludes the dropped features.
45+
"""
46+
if not feature_group.transformation_functions:
47+
return features
48+
else:
49+
transformed_features = []
50+
dropped_features = []
51+
for tf in feature_group.transformation_functions:
52+
transformed_features.append(
53+
feature.Feature(
54+
tf.hopsworks_udf.output_column_names[0],
55+
tf.hopsworks_udf.return_types[0],
56+
on_demand=True,
57+
)
58+
)
59+
if tf.hopsworks_udf.dropped_features:
60+
dropped_features.extend(tf.hopsworks_udf.dropped_features)
61+
updated_schema = []
62+
63+
for feat in features:
64+
if feat.name not in dropped_features:
65+
updated_schema.append(feat)
66+
return updated_schema + transformed_features
67+
3368
def save(
3469
self,
3570
feature_group,
@@ -40,6 +75,11 @@ def save(
4075
dataframe_features = engine.get_instance().parse_schema_feature_group(
4176
feature_dataframe, feature_group.time_travel_format
4277
)
78+
dataframe_features = (
79+
self._update_feature_group_schema_on_demand_transformations(
80+
feature_group=feature_group, features=dataframe_features
81+
)
82+
)
4383
util.validate_embedding_feature_type(
4484
feature_group.embedding_index, dataframe_features
4585
)
@@ -90,6 +130,11 @@ def insert(
90130
dataframe_features = engine.get_instance().parse_schema_feature_group(
91131
feature_dataframe, feature_group.time_travel_format
92132
)
133+
dataframe_features = (
134+
self._update_feature_group_schema_on_demand_transformations(
135+
feature_group=feature_group, features=dataframe_features
136+
)
137+
)
93138
util.validate_embedding_feature_type(
94139
feature_group.embedding_index, dataframe_features
95140
)
@@ -283,6 +328,11 @@ def insert_stream(
283328
dataframe_features = engine.get_instance().parse_schema_feature_group(
284329
dataframe, feature_group.time_travel_format
285330
)
331+
dataframe_features = (
332+
self._update_feature_group_schema_on_demand_transformations(
333+
feature_group=feature_group, features=dataframe_features
334+
)
335+
)
286336
util.validate_embedding_feature_type(
287337
feature_group.embedding_index, dataframe_features
288338
)

python/hsfs/core/feature_view_api.py

+11-29
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717

1818
from typing import List, Optional, Union
1919

20-
from hsfs import client, feature_view, training_dataset, transformation_function
20+
from hsfs import client, feature_view, training_dataset
2121
from hsfs.client.exceptions import RestAPIError
2222
from hsfs.constructor import query, serving_prepared_statement
2323
from hsfs.core import explicit_provenance, job, training_dataset_job_conf
@@ -46,7 +46,6 @@ class FeatureViewApi:
4646
_TRANSFORMED_lOG = "transformed"
4747
_UNTRANSFORMED_LOG = "untransformed"
4848

49-
5049
def __init__(self, feature_store_id: int) -> None:
5150
self._feature_store_id = feature_store_id
5251
self._client = client.get_instance()
@@ -214,28 +213,6 @@ def get_serving_prepared_statement(
214213
self._client._send_request("GET", path, query_params, headers=headers)
215214
)
216215

217-
def get_attached_transformation_fn(
218-
self, name: str, version: int
219-
) -> List["transformation_function.TransformationFunction"]:
220-
"""
221-
Get transformation functions attached to a feature view from the backend
222-
223-
# Arguments
224-
name `str`: Name of feature view.
225-
version `int`: Version of feature view.
226-
227-
# Returns
228-
`List[TransformationFunction]` : List of transformation functions attached to the feature view.
229-
230-
# Raises
231-
`RestAPIError`: If the feature view cannot be found from the backend.
232-
`ValueError`: If the feature group associated with the feature view cannot be found.
233-
"""
234-
path = self._base_path + [name, self._VERSION, version, self._TRANSFORMATION]
235-
return transformation_function.TransformationFunction.from_response_json(
236-
self._client._send_request("GET", path)
237-
)
238-
239216
def create_training_dataset(
240217
self,
241218
name: str,
@@ -407,7 +384,8 @@ def get_models_provenance(
407384
def enable_feature_logging(
408385
self,
409386
feature_view_name: str,
410-
feature_view_version: int,):
387+
feature_view_version: int,
388+
):
411389
_client = client.get_instance()
412390
path_params = self._base_path + [
413391
feature_view_name,
@@ -420,7 +398,8 @@ def enable_feature_logging(
420398
def pause_feature_logging(
421399
self,
422400
feature_view_name: str,
423-
feature_view_version: int,):
401+
feature_view_version: int,
402+
):
424403
_client = client.get_instance()
425404
path_params = self._base_path + [
426405
feature_view_name,
@@ -434,7 +413,8 @@ def pause_feature_logging(
434413
def resume_feature_logging(
435414
self,
436415
feature_view_name: str,
437-
feature_view_version: int,):
416+
feature_view_version: int,
417+
):
438418
_client = client.get_instance()
439419
path_params = self._base_path + [
440420
feature_view_name,
@@ -448,7 +428,8 @@ def resume_feature_logging(
448428
def materialize_feature_logging(
449429
self,
450430
feature_view_name: str,
451-
feature_view_version: int,):
431+
feature_view_version: int,
432+
):
452433
_client = client.get_instance()
453434
path_params = self._base_path + [
454435
feature_view_name,
@@ -469,7 +450,8 @@ def materialize_feature_logging(
469450
def get_feature_logging(
470451
self,
471452
feature_view_name: str,
472-
feature_view_version: int,):
453+
feature_view_version: int,
454+
):
473455
_client = client.get_instance()
474456
path_params = self._base_path + [
475457
feature_view_name,

0 commit comments

Comments
 (0)