Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Disable asynchronous deletes for Hudi #94

Merged
merged 1 commit into from
Oct 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,12 @@ private[processing] object SparkUtils {
Resource
.make(openLogF >> buildF)(s => closeLogF >> Sync[F].blocking(s.close()))
.evalTap { session =>
Sync[F].delay {
// Forces Spark to use `LakeLoaderFileSystem` when writing to the Lake via Hadoop
LakeLoaderFileSystem.overrideHadoopFileSystemConf(targetLocation, session.sparkContext.hadoopConfiguration)
}
if (writer.toleratesAsyncDelete) {
Sync[F].delay {
// Forces Spark to use `LakeLoaderFileSystem` when writing to the Lake via Hadoop
LakeLoaderFileSystem.overrideHadoopFileSystemConf(targetLocation, session.sparkContext.hadoopConfiguration)
}
} else Sync[F].unit
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,4 +104,11 @@ class DeltaWriter(config: Config.Delta) extends Writer {
}
}

/**
* Delta tolerates async deletes; in other words, when we delete a file, there is no strong
* requirement that the file must be deleted immediately. Delta uses unique file names and never
* re-writes a file that was previously deleted.
*
* @return
*   always `true`
*/
override def toleratesAsyncDelete: Boolean = true

}
Original file line number Diff line number Diff line change
Expand Up @@ -85,4 +85,13 @@ class HudiWriter(config: Config.Hudi) extends Writer {
.options(config.hudiWriteOptions)
.save(config.location.toString)
}

/**
* Hudi cannot tolerate async deletes. When Hudi deletes a file, the file MUST be deleted
* immediately.
*
* In particular, the `hoodie.properties` file gets deleted and re-created by Hudi, and those
* steps must happen in order.
*
* @return
*   always `false`
*/
override def toleratesAsyncDelete: Boolean = false
}
Original file line number Diff line number Diff line change
Expand Up @@ -107,4 +107,10 @@ class IcebergWriter(config: Config.Iceberg) extends Writer {
}
.mkString(", ")

/**
* Iceberg tolerates async deletes; in other words, when we delete a file, there is no strong
* requirement that the file must be deleted immediately. Iceberg uses unique file names and never
* re-writes a file that was previously deleted.
*
* @return
*   always `true`
*/
override def toleratesAsyncDelete: Boolean = true
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,11 @@ trait Writer {

/**
* Write Snowplow events into the table
*
* @param df
*   the data to write; presumably one row per Snowplow event (TODO confirm against callers)
* @tparam F
*   the effect type in which the write is expressed; a `Sync` instance is required
*/
def write[F[_]: Sync](df: DataFrame): F[Unit]

/**
* Whether this lake format tolerates deletes happening asynchronously instead of immediately.
*
* If tolerated, then we use our customized `LakeLoaderFileSystem`.
*
* @return
*   `true` if a deleted file need not be removed immediately, `false` if it must be
*/
def toleratesAsyncDelete: Boolean
}
Loading