Skip to content

Commit c3c0b1b

Browse files
istreeter authored and oguzhanunlu committed
Disable asynchronous deletes for Hudi (#94)
In #82 we added a feature to delete files asynchronously. This has worked great for Delta, which is tolerant of deletes that happen eventually rather than immediately. But it does not work well for Hudi, which deletes and then re-creates the `hoodie.properties` file. This commit disables the feature again for Hudi only.
1 parent d8bb275 commit c3c0b1b

File tree

5 files changed

+35
-4
lines changed

5 files changed

+35
-4
lines changed

modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/processing/SparkUtils.scala

+6-4
Original file line number | Diff line number | Diff line change
@@ -51,10 +51,12 @@ private[processing] object SparkUtils {
5151
Resource
5252
.make(openLogF >> buildF)(s => closeLogF >> Sync[F].blocking(s.close()))
5353
.evalTap { session =>
54-
Sync[F].delay {
55-
// Forces Spark to use `LakeLoaderFileSystem` when writing to the Lake via Hadoop
56-
LakeLoaderFileSystem.overrideHadoopFileSystemConf(targetLocation, session.sparkContext.hadoopConfiguration)
57-
}
54+
if (writer.toleratesAsyncDelete) {
55+
Sync[F].delay {
56+
// Forces Spark to use `LakeLoaderFileSystem` when writing to the Lake via Hadoop
57+
LakeLoaderFileSystem.overrideHadoopFileSystemConf(targetLocation, session.sparkContext.hadoopConfiguration)
58+
}
59+
} else Sync[F].unit
5860
}
5961
}
6062

modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/tables/DeltaWriter.scala

+7
Original file line number | Diff line number | Diff line change
@@ -104,4 +104,11 @@ class DeltaWriter(config: Config.Delta) extends Writer {
104104
}
105105
}
106106

107+
/**
108+
* Delta tolerates async deletes; in other words when we delete a file, there is no strong
109+
* requirement that the file must be deleted immediately. Delta uses unique file names and never
110+
* re-writes a file that was previously deleted
111+
*/
112+
override def toleratesAsyncDelete: Boolean = true
113+
107114
}

modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/tables/HudiWriter.scala

+9
Original file line number | Diff line number | Diff line change
@@ -85,4 +85,13 @@ class HudiWriter(config: Config.Hudi) extends Writer {
8585
.options(config.hudiWriteOptions)
8686
.save(config.location.toString)
8787
}
88+
89+
/**
90+
* Hudi cannot tolerate async deletes. When Hudi deletes a file, the file MUST be deleted
91+
* immediately.
92+
*
93+
* In particular, the `hoodie.properties` file gets deleted and re-created by Hudi, and those
94+
* steps must happen in order.
95+
*/
96+
override def toleratesAsyncDelete: Boolean = false
8897
}

modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/tables/IcebergWriter.scala

+6
Original file line number | Diff line number | Diff line change
@@ -107,4 +107,10 @@ class IcebergWriter(config: Config.Iceberg) extends Writer {
107107
}
108108
.mkString(", ")
109109

110+
/**
111+
* Iceberg tolerates async deletes; in other words when we delete a file, there is no strong
112+
* requirement that the file must be deleted immediately. Iceberg uses unique file names and never
113+
* re-writes a file that was previously deleted
114+
*/
115+
override def toleratesAsyncDelete: Boolean = true
110116
}

modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/tables/Writer.scala

+7
Original file line number | Diff line number | Diff line change
@@ -29,4 +29,11 @@ trait Writer {
2929

3030
/** Write Snowplow events into the table */
3131
def write[F[_]: Sync](df: DataFrame): F[Unit]
32+
33+
/**
34+
* Whether this lake format tolerates deletes to happen asynchronously instead of immediately
35+
*
36+
* If tolerated, then we use our customized `LakeLoaderFileSystem`.
37+
*/
38+
def toleratesAsyncDelete: Boolean
3239
}

0 commit comments

Comments
 (0)