Skip to content

Commit

Permalink
init
Browse files Browse the repository at this point in the history
  • Loading branch information
bubriks committed Sep 26, 2024
1 parent 029def9 commit 006c5b5
Showing 1 changed file with 10 additions and 2 deletions.
12 changes: 10 additions & 2 deletions python/hsfs/core/delta_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,8 @@ def _generate_merge_query(self, source_alias, updates_alias):
@staticmethod
def _get_last_commit_metadata(spark_context, base_path):
fg_source_table = DeltaTable.forPath(spark_context, base_path)

# Get info about the latest commit
last_commit = fg_source_table.history(1).first().asDict()
version = last_commit["version"]
commit_timestamp = util.convert_event_time_to_timestamp(
Expand All @@ -180,6 +182,12 @@ def _get_last_commit_metadata(spark_context, base_path):
commit_date_string = util.get_hudi_datestr_from_timestamp(commit_timestamp)
operation_metrics = last_commit["operationMetrics"]

# Get info about the oldest remaining commit
oldest_commit = fg_source_table.history().orderBy("version").first().asDict()
oldest_commit_timestamp = util.convert_event_time_to_timestamp(
oldest_commit["timestamp"]
)

if version == 0:
fg_commit = feature_group_commit.FeatureGroupCommit(
commitid=None,
Expand All @@ -188,7 +196,7 @@ def _get_last_commit_metadata(spark_context, base_path):
rows_inserted=operation_metrics["numOutputRows"],
rows_updated=0,
rows_deleted=0,
last_active_commit_time=commit_timestamp,
last_active_commit_time=oldest_commit_timestamp,
)
else:
fg_commit = feature_group_commit.FeatureGroupCommit(
Expand All @@ -198,7 +206,7 @@ def _get_last_commit_metadata(spark_context, base_path):
rows_inserted=operation_metrics["numTargetRowsInserted"],
rows_updated=operation_metrics["numTargetRowsUpdated"],
rows_deleted=operation_metrics["numTargetRowsDeleted"],
last_active_commit_time=commit_timestamp,
last_active_commit_time=oldest_commit_timestamp,
)

return fg_commit

0 comments on commit 006c5b5

Please sign in to comment.