diff --git a/python/hsfs/core/delta_engine.py b/python/hsfs/core/delta_engine.py index 92edc91cf3..7defbb6f10 100644 --- a/python/hsfs/core/delta_engine.py +++ b/python/hsfs/core/delta_engine.py @@ -172,6 +172,8 @@ def _generate_merge_query(self, source_alias, updates_alias): @staticmethod def _get_last_commit_metadata(spark_context, base_path): fg_source_table = DeltaTable.forPath(spark_context, base_path) + + # Get info about the latest commit last_commit = fg_source_table.history(1).first().asDict() version = last_commit["version"] commit_timestamp = util.convert_event_time_to_timestamp( @@ -180,6 +182,12 @@ def _get_last_commit_metadata(spark_context, base_path): commit_date_string = util.get_hudi_datestr_from_timestamp(commit_timestamp) operation_metrics = last_commit["operationMetrics"] + # Get info about the oldest remaining commit + oldest_commit = fg_source_table.history().orderBy("version").first().asDict() + oldest_commit_timestamp = util.convert_event_time_to_timestamp( + oldest_commit["timestamp"] + ) + if version == 0: fg_commit = feature_group_commit.FeatureGroupCommit( commitid=None, @@ -188,7 +196,7 @@ def _get_last_commit_metadata(spark_context, base_path): rows_inserted=operation_metrics["numOutputRows"], rows_updated=0, rows_deleted=0, - last_active_commit_time=commit_timestamp, + last_active_commit_time=oldest_commit_timestamp, ) else: fg_commit = feature_group_commit.FeatureGroupCommit( @@ -198,7 +206,7 @@ def _get_last_commit_metadata(spark_context, base_path): rows_inserted=operation_metrics["numTargetRowsInserted"], rows_updated=operation_metrics["numTargetRowsUpdated"], rows_deleted=operation_metrics["numTargetRowsDeleted"], - last_active_commit_time=commit_timestamp, + last_active_commit_time=oldest_commit_timestamp, ) return fg_commit