Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Spark] Rename lastCommitTimestamp to lastCommitFileModificationTimestamp #2746

Merged
merged 1 commit into from
Mar 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,8 @@ class Snapshot(
* is retrieved from the CommitInfo of the latest commit which
* can result in an IO operation.
*/
def timestamp: Long = getInCommitTimestampOpt.getOrElse(logSegment.lastCommitTimestamp)
def timestamp: Long =
getInCommitTimestampOpt.getOrElse(logSegment.lastCommitFileModificationTimestamp)

/**
* Returns the inCommitTimestamp if ICT is enabled, otherwise returns None.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1136,7 +1136,7 @@ object SnapshotManagement {
oldLogSegment.copy(
version = committedVersion,
deltas = oldLogSegment.deltas :+ commitFileStatus,
lastCommitTimestamp = commitFileStatus.getModificationTime)
lastCommitFileModificationTimestamp = commitFileStatus.getModificationTime)
}
}

Expand Down Expand Up @@ -1186,7 +1186,7 @@ object SerializableFileStatus {
* @param version The Snapshot version to generate
* @param deltas The delta commit files (.json) to read
* @param checkpointProvider provider to give information about Checkpoint files.
* @param lastCommitTimestamp The "unadjusted" file modification timestamp of the
* @param lastCommitFileModificationTimestamp The "unadjusted" file modification timestamp of the
* last commit within this segment. By unadjusted, we mean that the commit timestamps may
* not necessarily be monotonically increasing for the commits within this segment.
*/
Expand All @@ -1195,9 +1195,10 @@ case class LogSegment(
version: Long,
deltas: Seq[FileStatus],
checkpointProvider: UninitializedCheckpointProvider,
lastCommitTimestamp: Long) {
lastCommitFileModificationTimestamp: Long) {

override def hashCode(): Int = logPath.hashCode() * 31 + (lastCommitTimestamp % 10000).toInt
override def hashCode(): Int =
logPath.hashCode() * 31 + (lastCommitFileModificationTimestamp % 10000).toInt

/**
* An efficient way to check if a cached Snapshot's contents actually correspond to a new
Expand All @@ -1206,7 +1207,8 @@ case class LogSegment(
override def equals(obj: Any): Boolean = {
obj match {
case other: LogSegment =>
version == other.version && lastCommitTimestamp == other.lastCommitTimestamp &&
version == other.version &&
lastCommitFileModificationTimestamp == other.lastCommitFileModificationTimestamp &&
logPath == other.logPath && checkpointProvider.version == other.checkpointProvider.version
case _ => false
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -480,7 +480,7 @@ class DeltaLogSuite extends QueryTest

// Store these for later usage
val actions = deltaLog.snapshot.stateDS.collect()
val commitTimestamp = deltaLog.snapshot.logSegment.lastCommitTimestamp
val commitTimestamp = deltaLog.snapshot.logSegment.lastCommitFileModificationTimestamp

checkAnswer(
spark.read.format("delta").load(path),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ class InCommitTimestampSuite
// File timestamp should be the same as snapshot.getTimestamp when inCommitTimestamp is not
// enabled
assert(
ver1Snapshot.logSegment.lastCommitTimestamp == ver1Snapshot.timestamp)
ver1Snapshot.logSegment.lastCommitFileModificationTimestamp == ver1Snapshot.timestamp)

spark.sql(s"ALTER TABLE delta.`${tempDir.getAbsolutePath}`" +
s"SET TBLPROPERTIES ('${DeltaConfigs.IN_COMMIT_TIMESTAMPS_ENABLED.key}' = 'true')")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -432,15 +432,15 @@ class OptimisticTransactionSuite

// preCommitLogSegment should not get updated until a commit is triggered
assert(testTxn.preCommitLogSegment.version == 1)
assert(testTxn.preCommitLogSegment.lastCommitTimestamp < testTxnStartTs)
assert(testTxn.preCommitLogSegment.lastCommitFileModificationTimestamp < testTxnStartTs)
assert(testTxn.preCommitLogSegment.deltas.size == 2)
assert(testTxn.preCommitLogSegment.checkpointProvider.isEmpty)

testTxn.commit(Seq.empty, ManualUpdate)

// preCommitLogSegment should get updated to the version right before the txn commits
assert(testTxn.preCommitLogSegment.version == 12)
assert(testTxn.preCommitLogSegment.lastCommitTimestamp < testTxnEndTs)
assert(testTxn.preCommitLogSegment.lastCommitFileModificationTimestamp < testTxnEndTs)
assert(testTxn.preCommitLogSegment.deltas.size == 2)
assert(testTxn.preCommitLogSegment.checkpointProvider.version == 10)
}
Expand Down
Loading