
Commit 56146cb

Repartition by event name before writing to the lake
Previously, our Iceberg writer used the [hash write distribution mode][1], because that is the default for Iceberg. In this mode, Spark repartitions the dataframe immediately before writing to the lake. After this commit, we explicitly repartition the dataframe as part of the existing Spark task that prepares the final dataframe. This means we can change the Iceberg write distribution mode to `none`. Overall this appears to improve the time taken to write a window of events to Iceberg.

This fixes a problem we found, in which the write phase could become too slow under high load (Iceberg only): specifically, a write was taking longer than the loader's "window", causing periods of low CPU usage while the loader's processing phase waited for the write phase to catch up.

This commit also removes the config option `writerParallelismFraction`. Before this commit, making the writer parallelism too high had a downside: it led to smaller file sizes. But now that we partition by event_name, we might as well make the writer parallelism as high as reasonably possible, which also speeds up the write phase of the loader.

Note: this improvement will not help Snowplow users who have changed the partition key to something different from our default. We might want to make a follow-up change in which the loader auto-discovers the lake's partition key. For example, some users might want to partition by `app_id` instead of `event_name`.

[1]: https://iceberg.apache.org/docs/1.7.1/spark-writes/#writing-distribution-modes
1 parent 5ca2239 commit 56146cb
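
To make the change concrete, here is a minimal sketch (not the loader's actual code) of the approach described in the commit message: repartition by event_name while preparing the dataframe, cap the writer parallelism, then append to Iceberg with the write distribution mode set to none so Spark does not shuffle again at write time. The method name, parameters and table identifier below are illustrative only.

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col

// Illustrative sketch only; names and signatures are hypothetical.
def writeWindow(events: DataFrame, writerParallelism: Int): Unit = {
  val prepared = events
    .repartition(col("event_name"))  // co-locate rows with the same event_name in one Spark partition
    .coalesce(writerParallelism)     // cap the number of concurrent write tasks (and open files)

  prepared.write
    .format("iceberg")
    .mode("append")
    // The dataframe is already partitioned by event_name, so Iceberg's default hash
    // distribution (an extra shuffle immediately before the write) can be turned off.
    .option("distribution-mode", "none")
    .saveAsTable("my_catalog.my_schema.events")  // hypothetical table identifier
}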

8 files changed: +38 −31 lines changed

config/config.aws.reference.hocon

Lines changed: 6 additions & 5 deletions
@@ -127,6 +127,12 @@
 # "icebergTableProperties": {
 # "write.metadata.metrics.column.event_id": "count"
 # }
+#
+# # -- Any valid Iceberg write option
+# # -- This can be blank in most setups because the loader already sets sensible defaults.
+# "icebergWriteOptions": {
+# "write-format": "parquet"
+# }
 # }
 
 "bad": {
@@ -181,11 +187,6 @@
 # -- E.g. to change credentials provider
 "fs.s3a.aws.credentials.provider": "com.amazonaws.auth.InstanceProfileCredentialsProvider"
 }
-
-# -- Controls how many spark tasks run in parallel during writing the events to cloud storage.
-# -- E.g. If there are 8 available processors, and cpuParallelismFraction = 0.5, then we have 4 spark tasks for writing.
-# -- The default value is known to work well. Changing this setting might affect memory usage, file sizes, and/or latency.
-"writerParallelismFraction": 0.5
 }
 
 # Retry configuration for lake operation failures

config/config.azure.reference.hocon

Lines changed: 6 additions & 5 deletions
@@ -94,6 +94,12 @@
 # "icebergTableProperties": {
 # "write.metadata.metrics.column.event_id": "count"
 # }
+#
+# # -- Any valid Iceberg write option
+# # -- This can be blank in most setups because the loader already sets sensible defaults.
+# "icebergWriteOptions": {
+# "write-format": "parquet"
+# }
 # }
 
 "bad": {
@@ -145,11 +151,6 @@
 # -- E.g. to enable the spark ui for debugging:
 "spark.ui.enabled": true
 }
-
-# -- Controls how many spark tasks run in parallel during writing the events to cloud storage.
-# -- E.g. If there are 8 available processors, and cpuParallelismFraction = 0.5, then we have 4 spark tasks for writing.
-# -- The default value is known to work well. Changing this setting might affect memory usage, file sizes, and/or latency.
-"writerParallelismFraction": 0.5
 }
 
 # Retry configuration for lake operation failures

config/config.gcp.reference.hocon

Lines changed: 6 additions & 5 deletions
@@ -116,6 +116,12 @@
 # "icebergTableProperties": {
 # "write.metadata.metrics.column.event_id": "count"
 # }
+#
+# # -- Any valid Iceberg write option
+# # -- This can be blank in most setups because the loader already sets sensible defaults.
+# "icebergWriteOptions": {
+# "write-format": "parquet"
+# }
 # }
 
 "bad": {
@@ -160,11 +166,6 @@
 # -- E.g. to enable the spark ui for debugging:
 "spark.ui.enabled": true
 }
-
-# -- Controls how many spark tasks run in parallel during writing the events to cloud storage.
-# -- E.g. If there are 8 available processors, and cpuParallelismFraction = 0.5, then we have 4 spark tasks for writing.
-# -- The default value is known to work well. Changing this setting might affect memory usage, file sizes, and/or latency.
-"writerParallelismFraction": 0.5
 }
 
 # Retry configuration for lake operation failures

modules/core/src/main/resources/reference.conf

Lines changed: 7 additions & 1 deletion
@@ -41,6 +41,12 @@
 "write.metadata.metrics.column.true_tstamp": "full"
 }
 
+"icebergWriteOptions": {
+"merge-schema": "true"
+"check-ordering": "false"
+"distribution-mode": "none"
+}
+
 "hudiTableProperties": {
 "hoodie.table.name": "events"
 "hoodie.table.keygenerator.class": "org.apache.hudi.keygen.TimestampBasedKeyGenerator"
@@ -121,9 +127,9 @@
 "spark.sql.parquet.datetimeRebaseModeInWrite": "CORRECTED"
 "spark.memory.storageFraction": "0"
 "spark.databricks.delta.autoCompact.enabled": "false"
+"spark.scheduler.mode": "FAIR"
 }
 "gcpUserAgent": ${gcpUserAgent}
-"writerParallelismFraction": 0.5
 }
 
 "retries": {

modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/Config.scala

Lines changed: 3 additions & 3 deletions
@@ -74,7 +74,8 @@ object Config {
 table: String,
 catalog: IcebergCatalog,
 location: URI,
-icebergTableProperties: Map[String, String]
+icebergTableProperties: Map[String, String],
+icebergWriteOptions: Map[String, String]
 ) extends Target
 
 sealed trait IcebergCatalog
@@ -100,8 +101,7 @@
 case class Spark(
 taskRetries: Int,
 conf: Map[String, String],
-gcpUserAgent: GcpUserAgent,
-writerParallelismFraction: BigDecimal
+gcpUserAgent: GcpUserAgent
 )
 
 case class Metrics(

modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/processing/LakeWriter.scala

Lines changed: 7 additions & 9 deletions
@@ -81,7 +81,7 @@ object LakeWriter {
 }
 for {
 session <- SparkUtils.session[F](config, w, target.location)
-writerParallelism = chooseWriterParallelism(config)
+writerParallelism = chooseWriterParallelism()
 mutex1 <- Resource.eval(Mutex[F])
 mutex2 <- Resource.eval(Mutex[F])
 } yield impl(session, w, writerParallelism, mutex1, mutex2)
@@ -175,14 +175,12 @@
 }
 
 /**
-* Converts `writerParallelismFraction` into a suggested number of threads
+* Allow spark to parallelize over _most_ of the available processors for writing to the lake,
+* because this speeds up how quickly we can sink a batch.
 *
-* For bigger instances (more cores) we want more parallelism in the writer. This avoids a
-* situation where writing tasks exceed the length of a window, which causes an unbalanced use of
-* cpu.
+* But leave 1 processor always available, so that we are never blocked when trying to save one of
+* the intermediate dataframes.
 */
-private def chooseWriterParallelism(config: Config.Spark): Int =
-(Runtime.getRuntime.availableProcessors * config.writerParallelismFraction)
-.setScale(0, BigDecimal.RoundingMode.UP)
-.toInt
+private def chooseWriterParallelism(): Int =
+(Runtime.getRuntime.availableProcessors - 1).max(1)
 }

modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/processing/SparkUtils.scala

Lines changed: 2 additions & 1 deletion
@@ -18,7 +18,7 @@ import org.typelevel.log4cats.Logger
 import org.typelevel.log4cats.slf4j.Slf4jLogger
 
 import org.apache.spark.sql.{DataFrame, Row, SparkSession}
-import org.apache.spark.sql.functions.current_timestamp
+import org.apache.spark.sql.functions.{col, current_timestamp}
 import org.apache.spark.sql.types.StructType
 
 import com.snowplowanalytics.snowplow.lakes.Config
@@ -101,6 +101,7 @@ private[processing] object SparkUtils {
 spark
 .table(viewName)
 .withColumn("load_tstamp", current_timestamp())
+.repartition(col("event_name"))
 .coalesce(writerParallelism)
 .localCheckpoint()
 }

modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/tables/IcebergWriter.scala

Lines changed: 1 addition & 2 deletions
@@ -61,8 +61,7 @@ class IcebergWriter(config: Config.Iceberg) extends Writer {
 df.write
 .format("iceberg")
 .mode("append")
-.option("merge-schema", true)
-.option("check-ordering", false)
+.options(config.icebergWriteOptions)
 .saveAsTable(fqTable)
 }
 