Skip to content

Commit

Permalink
[FSTORE-1181] Helper columns should return all columns if they have d…
Browse files Browse the repository at this point in the history
…ifferent names across feature groups (#1203)

Helper columns should return all columns if they have different names across feature groups
  • Loading branch information
davitbzh authored Feb 16, 2024
1 parent 53bdddd commit 3283198
Showing 1 changed file with 37 additions and 2 deletions.
39 changes: 37 additions & 2 deletions python/hsfs/core/feature_view_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,7 @@ def get_training_data(
with_primary_keys=primary_keys,
primary_keys=self._get_primary_keys_from_query(feature_view_obj.query),
with_event_time=event_time,
event_time=[feature_view_obj.query._left_feature_group.event_time],
event_time=self._get_eventtimes_from_query(feature_view_obj.query),
with_training_helper_columns=training_helper_columns,
training_helper_columns=feature_view_obj.training_helper_columns,
feature_view_features=[
Expand Down Expand Up @@ -708,6 +708,12 @@ def get_batch_data(
):
self._check_feature_group_accessibility(feature_view_obj)

# check if primary_keys/event_time are ambiguous
if primary_keys:
self._get_primary_keys_from_query(feature_view_obj.query)
if event_time:
self._get_eventtimes_from_query(feature_view_obj.query)

feature_dataframe = self.get_batch_query(
feature_view_obj,
start_time,
Expand Down Expand Up @@ -814,10 +820,39 @@ def _get_primary_keys_from_query(self, fv_query_obj):
for _join in fv_query_obj._joins:
fv_pks.update(
[
feature.name
self._check_if_exists(feature.name, fv_pks)
if _join.prefix is None
else _join.prefix + feature.name
for feature in _join.query._left_feature_group.features
if feature.primary
]
)

return list(fv_pks)

def _get_eventtimes_from_query(self, fv_query_obj):
fv_events = set()
if fv_query_obj._left_feature_group.event_time:
fv_events.update([fv_query_obj._left_feature_group.event_time])
for _join in fv_query_obj._joins:
if _join.query._left_feature_group.event_time:
fv_events.update(
[
self._check_if_exists(
_join.query._left_feature_group.event_time, fv_events
)
if _join.prefix is None
else _join.prefix + _join.query._left_feature_group.event_time
]
)

return list(fv_events)

def _check_if_exists(self, f_name, f_set):
if f_name in f_set:
raise FeatureStoreException(
f"Provided feature {f_name} is ambiguous and exists in more than one feature groups."
"To avoid this error specify prefix in the join."
)
else:
return f_name

0 comments on commit 3283198

Please sign in to comment.