From 32831986a155f17c95257d38cb056c1082af428d Mon Sep 17 00:00:00 2001 From: davitbzh <44586065+davitbzh@users.noreply.github.com> Date: Fri, 16 Feb 2024 10:50:51 +0100 Subject: [PATCH] [FSTORE-1181] Helper columns should return all columns if they have different names across feature groups (#1203) Helper columns should return all columns if they have different names across feature groups --- python/hsfs/core/feature_view_engine.py | 39 +++++++++++++++++++++++-- 1 file changed, 37 insertions(+), 2 deletions(-) diff --git a/python/hsfs/core/feature_view_engine.py b/python/hsfs/core/feature_view_engine.py index 7d3fddca41..010468269e 100644 --- a/python/hsfs/core/feature_view_engine.py +++ b/python/hsfs/core/feature_view_engine.py @@ -322,7 +322,7 @@ def get_training_data( with_primary_keys=primary_keys, primary_keys=self._get_primary_keys_from_query(feature_view_obj.query), with_event_time=event_time, - event_time=[feature_view_obj.query._left_feature_group.event_time], + event_time=self._get_eventtimes_from_query(feature_view_obj.query), with_training_helper_columns=training_helper_columns, training_helper_columns=feature_view_obj.training_helper_columns, feature_view_features=[ @@ -708,6 +708,12 @@ def get_batch_data( ): self._check_feature_group_accessibility(feature_view_obj) + # check if primary_keys/event_time are ambiguous + if primary_keys: + self._get_primary_keys_from_query(feature_view_obj.query) + if event_time: + self._get_eventtimes_from_query(feature_view_obj.query) + feature_dataframe = self.get_batch_query( feature_view_obj, start_time, @@ -814,10 +820,39 @@ def _get_primary_keys_from_query(self, fv_query_obj): for _join in fv_query_obj._joins: fv_pks.update( [ - feature.name + self._check_if_exists(feature.name, fv_pks) + if _join.prefix is None + else _join.prefix + feature.name for feature in _join.query._left_feature_group.features if feature.primary ] ) return list(fv_pks) + + def _get_eventtimes_from_query(self, fv_query_obj): + fv_events = set() + if fv_query_obj._left_feature_group.event_time: + fv_events.update([fv_query_obj._left_feature_group.event_time]) + for _join in fv_query_obj._joins: + if _join.query._left_feature_group.event_time: + fv_events.update( + [ + self._check_if_exists( + _join.query._left_feature_group.event_time, fv_events + ) + if _join.prefix is None + else _join.prefix + _join.query._left_feature_group.event_time + ] + ) + + return list(fv_events) + + def _check_if_exists(self, f_name, f_set): + if f_name in f_set: + raise FeatureStoreException( + f"Provided feature {f_name} is ambiguous and exists in more than one feature groups." + "To avoid this error specify prefix in the join." + ) + else: + return f_name