Repartition by event name before writing to the lake
Previously, our Iceberg writer was using the [hash write distribution mode][1]
because that is the default for Iceberg. In this mode, Spark repartitions the
dataframe immediately before writing to the lake.

After this commit, we explicitly repartition the dataframe as part of
the existing spark task for preparing the final dataframe. This means we
can change the Iceberg write distribution mode to `none`.
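
For illustration, here is a minimal sketch of the idea in Spark (not the
loader's exact code; the function name, `viewName` and `fqTable` are
placeholders): sort the batch by `event_name` ourselves, then tell Iceberg not
to repartition again on write.

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions.current_timestamp

    def writeSortedByEventName(spark: SparkSession, viewName: String, fqTable: String): Unit = {
      // Cluster rows by the table's default partition key before the write
      val df = spark
        .table(viewName)
        .sort("event_name")
        .withColumn("load_tstamp", current_timestamp())

      df.write
        .format("iceberg")
        .mode("append")
        .option("distribution-mode", "none") // skip Iceberg's own hash repartition
        .saveAsTable(fqTable)
    }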

Overall this seems to improve the time taken to write a window of events to
Iceberg. It fixes a problem we found, in which the write phase could become too
slow under high load (Iceberg only): specifically, a write was taking longer
than the loader's "window", which caused periods of low CPU usage while the
loader's processing phase waited for the write phase to catch up.

This commit also removes the config option `writerParallelismFraction`. Before
this commit, there was a downside to making the writer parallelism too high: it
led to smaller file sizes. After this commit, now that we partition by
`event_name`, we might as well make the writer parallelism as high as
reasonably possible, which also speeds up the write phase of the loader.

Note: this improvement will not help Snowplow users who have changed the
partition key to something other than our default. We might want to make a
follow-up change in which the loader auto-discovers the lake's partition key.
For example, some users might want to partition by `app_id` instead of
`event_name`.

[1]: https://iceberg.apache.org/docs/1.7.1/spark-writes/#writing-distribution-modes
istreeter committed Jan 9, 2025
1 parent 8f31f53 commit b1eea59
Showing 11 changed files with 89 additions and 103 deletions.
11 changes: 6 additions & 5 deletions config/config.aws.reference.hocon
@@ -127,6 +127,12 @@
# "icebergTableProperties": {
# "write.metadata.metrics.column.event_id": "count"
# }
#
# # -- Any valid Iceberg write option
# # -- This can be blank in most setups because the loader already sets sensible defaults.
# "icebergWriteOptions": {
# "write-format": "parquet"
# }
# }

"bad": {
@@ -181,11 +187,6 @@
# -- E.g. to change credentials provider
"fs.s3a.aws.credentials.provider": "com.amazonaws.auth.InstanceProfileCredentialsProvider"
}

# -- Controls how many spark tasks run in parallel during writing the events to cloud storage.
# -- E.g. If there are 8 available processors, and cpuParallelismFraction = 0.5, then we have 4 spark tasks for writing.
# -- The default value is known to work well. Changing this setting might affect memory usage, file sizes, and/or latency.
"writerParallelismFraction": 0.5
}

# Retry configuration for lake operation failures
11 changes: 6 additions & 5 deletions config/config.azure.reference.hocon
@@ -94,6 +94,12 @@
# "icebergTableProperties": {
# "write.metadata.metrics.column.event_id": "count"
# }
#
# # -- Any valid Iceberg write option
# # -- This can be blank in most setups because the loader already sets sensible defaults.
# "icebergWriteOptions": {
# "write-format": "parquet"
# }
# }

"bad": {
@@ -145,11 +151,6 @@
# -- E.g. to enable the spark ui for debugging:
"spark.ui.enabled": true
}

# -- Controls how many spark tasks run in parallel during writing the events to cloud storage.
# -- E.g. If there are 8 available processors, and cpuParallelismFraction = 0.5, then we have 4 spark tasks for writing.
# -- The default value is known to work well. Changing this setting might affect memory usage, file sizes, and/or latency.
"writerParallelismFraction": 0.5
}

# Retry configuration for lake operation failures
11 changes: 6 additions & 5 deletions config/config.gcp.reference.hocon
@@ -116,6 +116,12 @@
# "icebergTableProperties": {
# "write.metadata.metrics.column.event_id": "count"
# }
#
# # -- Any valid Iceberg write option
# # -- This can be blank in most setups because the loader already sets sensible defaults.
# "icebergWriteOptions": {
# "write-format": "parquet"
# }
# }

"bad": {
@@ -160,11 +166,6 @@
# -- E.g. to enable the spark ui for debugging:
"spark.ui.enabled": true
}

# -- Controls how many spark tasks run in parallel during writing the events to cloud storage.
# -- E.g. If there are 8 available processors, and cpuParallelismFraction = 0.5, then we have 4 spark tasks for writing.
# -- The default value is known to work well. Changing this setting might affect memory usage, file sizes, and/or latency.
"writerParallelismFraction": 0.5
}

# Retry configuration for lake operation failures
8 changes: 8 additions & 0 deletions modules/core/src/main/resources/fairscheduler.xml
@@ -0,0 +1,8 @@
<?xml version="1.0"?>
<allocations>
<pool name="pool1">
<schedulingMode>FIFO</schedulingMode>
<weight>1000</weight>
<minShare>1</minShare>
</pool>
</allocations>
9 changes: 8 additions & 1 deletion modules/core/src/main/resources/reference.conf
@@ -41,6 +41,12 @@
"write.metadata.metrics.column.true_tstamp": "full"
}

"icebergWriteOptions": {
"merge-schema": "true"
"check-ordering": "false"
"distribution-mode": "none"
}

"hudiTableProperties": {
"hoodie.table.name": "events"
"hoodie.table.keygenerator.class": "org.apache.hudi.keygen.TimestampBasedKeyGenerator"
@@ -121,9 +127,10 @@
"spark.sql.parquet.datetimeRebaseModeInWrite": "CORRECTED"
"spark.memory.storageFraction": "0"
"spark.databricks.delta.autoCompact.enabled": "false"
"spark.scheduler.mode": "FAIR"
"spark.sql.adaptive.enabled": "false"
}
"gcpUserAgent": ${gcpUserAgent}
"writerParallelismFraction": 0.5
}

"retries": {
@@ -74,7 +74,8 @@ object Config {
table: String,
catalog: IcebergCatalog,
location: URI,
icebergTableProperties: Map[String, String]
icebergTableProperties: Map[String, String],
icebergWriteOptions: Map[String, String]
) extends Target

sealed trait IcebergCatalog
@@ -100,8 +101,7 @@
case class Spark(
taskRetries: Int,
conf: Map[String, String],
gcpUserAgent: GcpUserAgent,
writerParallelismFraction: BigDecimal
gcpUserAgent: GcpUserAgent
)

case class Metrics(
@@ -13,7 +13,6 @@ package com.snowplowanalytics.snowplow.lakes.processing
import cats.implicits._
import cats.data.NonEmptyList
import cats.effect.{Async, Resource, Sync}
import cats.effect.std.Mutex
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.StructType

@@ -81,10 +80,7 @@ object LakeWriter {
}
for {
session <- SparkUtils.session[F](config, w, target.location)
writerParallelism = chooseWriterParallelism(config)
mutex1 <- Resource.eval(Mutex[F])
mutex2 <- Resource.eval(Mutex[F])
} yield impl(session, w, writerParallelism, mutex1, mutex2)
} yield impl(session, w)
}

def withHandledErrors[F[_]: Async](
@@ -128,23 +124,10 @@

/**
* Implementation of the LakeWriter
*
* The mutexes are needed because we allow overlapping windows. They prevent two different windows
* from trying to run the same expensive operation at the same time.
*
* @param mutextForWriting
* Makes sure there is only ever one spark job trying to write events to the lake. This is a
* IO-intensive task.
* @param mutexForUnioning
* Makes sure there is only ever one spark job trying to union smaller DataFrames into a larger
* DataFrame, immediately before writing to the lake. This is a cpu-intensive task.
*/
private def impl[F[_]: Sync](
spark: SparkSession,
w: Writer,
writerParallelism: Int,
mutexForWriting: Mutex[F],
mutexForUnioning: Mutex[F]
w: Writer
): LakeWriter[F] = new LakeWriter[F] {
def createTable: F[Unit] =
w.prepareTable(spark)
@@ -164,25 +147,8 @@

def commit(viewName: String): F[Unit] =
for {
df <- mutexForUnioning.lock.surround {
SparkUtils.prepareFinalDataFrame(spark, viewName, writerParallelism)
}
_ <- mutexForWriting.lock
.surround {
w.write(df)
}
df <- SparkUtils.prepareFinalDataFrame(spark, viewName)
_ <- w.write(df)
} yield ()
}

/**
* Converts `writerParallelismFraction` into a suggested number of threads
*
* For bigger instances (more cores) we want more parallelism in the writer. This avoids a
* situation where writing tasks exceed the length of a window, which causes an unbalanced use of
* cpu.
*/
private def chooseWriterParallelism(config: Config.Spark): Int =
(Runtime.getRuntime.availableProcessors * config.writerParallelismFraction)
.setScale(0, BigDecimal.RoundingMode.UP)
.toInt
}
@@ -81,11 +81,6 @@ object Processing {
earliestCollectorTstamp: Option[Instant]
)

private case class Transformed(
events: List[Row],
schema: StructType
)

private def eventProcessor[F[_]: Async: RegistryLookup](
env: Environment[F],
deferredTableExists: F[Unit]
@@ -126,13 +121,12 @@
.through(handleParseFailures(env, badProcessor))
.through(BatchUp.noTimeout(env.inMemBatchBytes))
.through(transformBatch(env, badProcessor, ref))
.through(sinkTransformedBatch(env, ref))

private def transformBatch[F[_]: RegistryLookup: Async](
env: Environment[F],
badProcessor: BadRowProcessor,
ref: Ref[F, WindowState]
): Pipe[F, Batched, Transformed] =
): Pipe[F, Batched, Nothing] =
_.parEvalMapUnordered(env.cpuParallelism) { case Batched(events, entities, _, earliestCollectorTstamp) =>
for {
_ <- Logger[F].debug(s"Processing batch of size ${events.size}")
@@ -141,30 +135,29 @@
_ <- rememberColumnNames(ref, nonAtomicFields.fields)
(bad, rows) <- transformToSpark[F](badProcessor, events, nonAtomicFields)
_ <- sendFailedEvents(env, badProcessor, bad)
_ <- ref.update { s =>
val updatedCollectorTstamp = chooseEarliestTstamp(earliestCollectorTstamp, s.earliestCollectorTstamp)
s.copy(numEvents = s.numEvents + rows.size, earliestCollectorTstamp = updatedCollectorTstamp)
}
} yield Transformed(rows, SparkSchema.forBatch(nonAtomicFields.fields, env.respectIgluNullability))
}
windowState <- ref.updateAndGet { s =>
val updatedCollectorTstamp = chooseEarliestTstamp(earliestCollectorTstamp, s.earliestCollectorTstamp)
s.copy(numEvents = s.numEvents + rows.size, earliestCollectorTstamp = updatedCollectorTstamp)
}
_ <- sinkTransformedBatch(env, windowState, rows, SparkSchema.forBatch(nonAtomicFields.fields, env.respectIgluNullability))
} yield ()
}.drain

private def sinkTransformedBatch[F[_]: Sync](
env: Environment[F],
ref: Ref[F, WindowState]
): Pipe[F, Transformed, Nothing] =
_.evalMap { case Transformed(rows, schema) =>
NonEmptyList.fromList(rows) match {
case Some(nel) =>
for {
windowState <- ref.get
_ <- env.lakeWriter.localAppendRows(windowState.viewName, nel, schema)
_ <- Logger[F].debug(s"Finished processing batch of size ${rows.size}")
} yield ()
case None =>
Logger[F].debug(s"An in-memory batch yielded zero good events. Nothing will be saved to local disk.")
}

}.drain
windowState: WindowState,
rows: List[Row],
schema: StructType
): F[Unit] =
NonEmptyList.fromList(rows) match {
case Some(nel) =>
for {
_ <- env.lakeWriter.localAppendRows(windowState.viewName, nel, schema)
_ <- Logger[F].debug(s"Finished processing batch of size ${rows.size}")
} yield ()
case None =>
Logger[F].debug(s"An in-memory batch yielded zero good events. Nothing will be saved to local disk.")
}

private def rememberTokens[F[_]: Functor](ref: Ref[F, WindowState]): Pipe[F, TokenedEvents, Chunk[ByteBuffer]] =
_.evalMap { case TokenedEvents(events, token) =>
@@ -70,7 +70,11 @@ private[processing] object SparkUtils {
for {
_ <- Logger[F].debug(s"Initializing local DataFrame with name $viewName")
_ <- Sync[F].blocking {
spark.emptyDataFrame.createTempView(viewName)
try {
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "pool1")
spark.emptyDataFrame.createTempView(viewName)
} finally
spark.sparkContext.setLocalProperty("spark.scheduler.pool", null)
}
} yield ()

@@ -83,31 +87,37 @@
for {
_ <- Logger[F].debug(s"Saving batch of ${rows.size} events to local DataFrame $viewName")
_ <- Sync[F].blocking {
spark
.createDataFrame(rows.toList.asJava, schema)
.coalesce(1)
.localCheckpoint()
.unionByName(spark.table(viewName), allowMissingColumns = true)
.createOrReplaceTempView(viewName)
try {
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "pool1")
spark
.createDataFrame(rows.toList.asJava, schema)
.coalesce(1)
.localCheckpoint()
.unionByName(spark.table(viewName), allowMissingColumns = true)
.createOrReplaceTempView(viewName)
} finally
spark.sparkContext.setLocalProperty("spark.scheduler.pool", null)
}
} yield ()

def prepareFinalDataFrame[F[_]: Sync](
spark: SparkSession,
viewName: String,
writerParallelism: Int
viewName: String
): F[DataFrame] =
Sync[F].blocking {
Sync[F].delay {
spark
.table(viewName)
.sort("event_name")
.withColumn("load_tstamp", current_timestamp())
.coalesce(writerParallelism)
.localCheckpoint()
}

def dropView[F[_]: Sync](spark: SparkSession, viewName: String): F[Unit] =
Logger[F].info(s"Removing Spark data frame $viewName from local disk...") >>
Sync[F].blocking {
spark.catalog.dropTempView(viewName)
try {
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "pool1")
spark.catalog.dropTempView(viewName)
} finally
spark.sparkContext.setLocalProperty("spark.scheduler.pool", null)
}.void
}
@@ -61,8 +61,7 @@ class IcebergWriter(config: Config.Iceberg) extends Writer {
df.write
.format("iceberg")
.mode("append")
.option("merge-schema", true)
.option("check-ordering", false)
.options(config.icebergWriteOptions)
.saveAsTable(fqTable)
}

6 changes: 3 additions & 3 deletions project/Dependencies.scala
@@ -15,12 +15,12 @@ object Dependencies {
object Spark {

// A version of Spark which is compatible with the current version of Iceberg and Delta
val forIcebergDelta = "3.5.3"
val forIcebergDelta = "3.5.4"
val forIcebergDeltaMinor = "3.5"

// Hudi can use a different version of Spark because we bundle a separate Docker image
// This version of Spark must be compatible with the current version of Hudi
val forHudi = "3.5.3"
val forHudi = "3.5.4"
val forHudiMinor = "3.5"
}

@@ -35,7 +35,7 @@
val delta = "3.2.1"
val hudi = "0.15.0"
val hudiAws = "1.0.0-beta2"
val iceberg = "1.6.1"
val iceberg = "1.7.1"
val hadoop = "3.4.1"
val gcsConnector = "hadoop3-2.2.25"
val hive = "3.1.3"
