From 3bcaebc83522317febfce486b64275e5e57fc69c Mon Sep 17 00:00:00 2001 From: Dhruv Arya Date: Mon, 11 Mar 2024 14:18:10 -0700 Subject: [PATCH] Rename lastCommitTimestamp to lastCommitFileModificationTimestamp --- .../scala/org/apache/spark/sql/delta/Snapshot.scala | 3 ++- .../apache/spark/sql/delta/SnapshotManagement.scala | 12 +++++++----- .../org/apache/spark/sql/delta/DeltaLogSuite.scala | 2 +- .../spark/sql/delta/InCommitTimestampSuite.scala | 2 +- .../spark/sql/delta/OptimisticTransactionSuite.scala | 4 ++-- 5 files changed, 13 insertions(+), 10 deletions(-) diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/Snapshot.scala b/spark/src/main/scala/org/apache/spark/sql/delta/Snapshot.scala index fded328f95b..073c3cc3aa3 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/Snapshot.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/Snapshot.scala @@ -105,7 +105,8 @@ class Snapshot( * is retrieved from the CommitInfo of the latest commit which * can result in an IO operation. */ - def timestamp: Long = getInCommitTimestampOpt.getOrElse(logSegment.lastCommitTimestamp) + def timestamp: Long = + getInCommitTimestampOpt.getOrElse(logSegment.lastCommitFileModificationTimestamp) /** * Returns the inCommitTimestamp if ICT is enabled, otherwise returns None. diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/SnapshotManagement.scala b/spark/src/main/scala/org/apache/spark/sql/delta/SnapshotManagement.scala index bba0b9eee27..c9fe365fec2 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/SnapshotManagement.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/SnapshotManagement.scala @@ -1136,7 +1136,7 @@ object SnapshotManagement { oldLogSegment.copy( version = committedVersion, deltas = oldLogSegment.deltas :+ commitFileStatus, - lastCommitTimestamp = commitFileStatus.getModificationTime) + lastCommitFileModificationTimestamp = commitFileStatus.getModificationTime) } } @@ -1186,7 +1186,7 @@ object SerializableFileStatus { * @param version The Snapshot version to generate * @param deltas The delta commit files (.json) to read * @param checkpointProvider provider to give information about Checkpoint files. - * @param lastCommitTimestamp The "unadjusted" file modification timestamp of the + * @param lastCommitFileModificationTimestamp The "unadjusted" file modification timestamp of the * last commit within this segment. By unadjusted, we mean that the commit timestamps may * not necessarily be monotonically increasing for the commits within this segment. */ @@ -1195,9 +1195,10 @@ case class LogSegment( version: Long, deltas: Seq[FileStatus], checkpointProvider: UninitializedCheckpointProvider, - lastCommitTimestamp: Long) { + lastCommitFileModificationTimestamp: Long) { - override def hashCode(): Int = logPath.hashCode() * 31 + (lastCommitTimestamp % 10000).toInt + override def hashCode(): Int = + logPath.hashCode() * 31 + (lastCommitFileModificationTimestamp % 10000).toInt /** * An efficient way to check if a cached Snapshot's contents actually correspond to a new @@ -1206,7 +1207,8 @@ case class LogSegment( override def equals(obj: Any): Boolean = { obj match { case other: LogSegment => - version == other.version && lastCommitTimestamp == other.lastCommitTimestamp && + version == other.version && + lastCommitFileModificationTimestamp == other.lastCommitFileModificationTimestamp && logPath == other.logPath && checkpointProvider.version == other.checkpointProvider.version case _ => false } diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaLogSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaLogSuite.scala index 92220d878a2..d2859799a19 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaLogSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaLogSuite.scala @@ -480,7 +480,7 @@ class DeltaLogSuite extends QueryTest // Store these for later usage val actions = deltaLog.snapshot.stateDS.collect() - val commitTimestamp = deltaLog.snapshot.logSegment.lastCommitTimestamp + val commitTimestamp = deltaLog.snapshot.logSegment.lastCommitFileModificationTimestamp checkAnswer( spark.read.format("delta").load(path), diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/InCommitTimestampSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/InCommitTimestampSuite.scala index 3bd0f0a569b..7ddea2ace0e 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/InCommitTimestampSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/InCommitTimestampSuite.scala @@ -75,7 +75,7 @@ class InCommitTimestampSuite // File timestamp should be the same as snapshot.getTimestamp when inCommitTimestamp is not // enabled assert( - ver1Snapshot.logSegment.lastCommitTimestamp == ver1Snapshot.timestamp) + ver1Snapshot.logSegment.lastCommitFileModificationTimestamp == ver1Snapshot.timestamp) spark.sql(s"ALTER TABLE delta.`${tempDir.getAbsolutePath}`" + s"SET TBLPROPERTIES ('${DeltaConfigs.IN_COMMIT_TIMESTAMPS_ENABLED.key}' = 'true')") diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/OptimisticTransactionSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/OptimisticTransactionSuite.scala index 9a031c157e9..4ad0c521b29 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/OptimisticTransactionSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/OptimisticTransactionSuite.scala @@ -432,7 +432,7 @@ class OptimisticTransactionSuite // preCommitLogSegment should not get updated until a commit is triggered assert(testTxn.preCommitLogSegment.version == 1) - assert(testTxn.preCommitLogSegment.lastCommitTimestamp < testTxnStartTs) + assert(testTxn.preCommitLogSegment.lastCommitFileModificationTimestamp < testTxnStartTs) assert(testTxn.preCommitLogSegment.deltas.size == 2) assert(testTxn.preCommitLogSegment.checkpointProvider.isEmpty) @@ -440,7 +440,7 @@ class OptimisticTransactionSuite // preCommitLogSegment should get updated to the version right before the txn commits assert(testTxn.preCommitLogSegment.version == 12) - assert(testTxn.preCommitLogSegment.lastCommitTimestamp < testTxnEndTs) + assert(testTxn.preCommitLogSegment.lastCommitFileModificationTimestamp < testTxnEndTs) assert(testTxn.preCommitLogSegment.deltas.size == 2) assert(testTxn.preCommitLogSegment.checkpointProvider.version == 10) }