Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FSTORE-1181] Helper columns should return all columns if they have different names across feature groups #1203

Merged
merged 11 commits into from
Feb 16, 2024
39 changes: 37 additions & 2 deletions python/hsfs/core/feature_view_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,7 @@ def get_training_data(
with_primary_keys=primary_keys,
primary_keys=self._get_primary_keys_from_query(feature_view_obj.query),
with_event_time=event_time,
event_time=[feature_view_obj.query._left_feature_group.event_time],
event_time=self._get_eventtimes_from_query(feature_view_obj.query),
with_training_helper_columns=training_helper_columns,
training_helper_columns=feature_view_obj.training_helper_columns,
feature_view_features=[
Expand Down Expand Up @@ -708,6 +708,12 @@ def get_batch_data(
):
self._check_feature_group_accessibility(feature_view_obj)

# check if primary_keys/event_time are ambiguous
if primary_keys:
self._get_primary_keys_from_query(feature_view_obj.query)
if event_time:
self._get_eventtimes_from_query(feature_view_obj.query)

feature_dataframe = self.get_batch_query(
feature_view_obj,
start_time,
Expand Down Expand Up @@ -814,10 +820,39 @@ def _get_primary_keys_from_query(self, fv_query_obj):
for _join in fv_query_obj._joins:
fv_pks.update(
[
feature.name
self._check_if_exists(feature.name, fv_pks)
if _join.prefix is None
else _join.prefix + feature.name
for feature in _join.query._left_feature_group.features
if feature.primary
]
)

return list(fv_pks)

def _get_eventtimes_from_query(self, fv_query_obj):
fv_events = set()
if fv_query_obj._left_feature_group.event_time:
fv_events.update([fv_query_obj._left_feature_group.event_time])
for _join in fv_query_obj._joins:
if _join.query._left_feature_group.event_time:
fv_events.update(
[
self._check_if_exists(
_join.query._left_feature_group.event_time, fv_events
)
if _join.prefix is None
else _join.prefix + _join.query._left_feature_group.event_time
]
)

return list(fv_events)

def _check_if_exists(self, f_name, f_set):
if f_name in f_set:
raise FeatureStoreException(
f"Provided feature {f_name} is ambiguous and exists in more than one feature groups."
"To avoid this error specify prefix in the join."
)
else:
return f_name
Loading