diff --git a/modules/core/src/main/resources/reference.conf b/modules/core/src/main/resources/reference.conf index 33d8526..37395bf 100644 --- a/modules/core/src/main/resources/reference.conf +++ b/modules/core/src/main/resources/reference.conf @@ -128,7 +128,7 @@ "spark.memory.storageFraction": "0" "spark.databricks.delta.autoCompact.enabled": "false" "spark.scheduler.mode": "FAIR" - "spark.sql.adaptive.enabled": "false" + "spark.sql.adaptive.enabled": "false" # False gives better performance on the type of shuffle done by Lake Loader } "gcpUserAgent": ${gcpUserAgent} } diff --git a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/processing/LakeWriter.scala b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/processing/LakeWriter.scala index aef04ae..3d63488 100644 --- a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/processing/LakeWriter.scala +++ b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/processing/LakeWriter.scala @@ -13,6 +13,7 @@ package com.snowplowanalytics.snowplow.lakes.processing import cats.implicits._ import cats.data.NonEmptyList import cats.effect.{Async, Resource, Sync} +import cats.effect.std.Mutex import org.apache.spark.sql.{Row, SparkSession} import org.apache.spark.sql.types.StructType @@ -80,7 +81,9 @@ object LakeWriter { } for { session <- SparkUtils.session[F](config, w, target.location) - } yield impl(session, w) + writerParallelism = chooseWriterParallelism() + mutex <- Resource.eval(Mutex[F]) + } yield impl(session, w, writerParallelism, mutex) } def withHandledErrors[F[_]: Async]( @@ -124,10 +127,15 @@ object LakeWriter { /** * Implementation of the LakeWriter + * + * The mutex is needed because we allow overlapping windows. They prevent two different windows + * from trying to run the same expensive operation at the same time. */ private def impl[F[_]: Sync]( spark: SparkSession, - w: Writer + w: Writer, + writerParallelism: Int, + mutex: Mutex[F] ): LakeWriter[F] = new LakeWriter[F] { def createTable: F[Unit] = w.prepareTable(spark) @@ -147,8 +155,21 @@ object LakeWriter { def commit(viewName: String): F[Unit] = for { - df <- SparkUtils.prepareFinalDataFrame(spark, viewName) - _ <- w.write(df) + df <- SparkUtils.prepareFinalDataFrame(spark, viewName, writerParallelism) + _ <- mutex.lock + .surround { + w.write(df) + } } yield () } + + /** + * Allow spark to parallelize over _most_ of the available processors for writing to the lake, + * because this speeds up how quickly we can sink a batch. + * + * But leave 1 processor always available, so that we are never blocked when trying to save one of + * the intermediate dataframes. + */ + private def chooseWriterParallelism(): Int = + (Runtime.getRuntime.availableProcessors - 1).max(1) } diff --git a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/processing/SparkUtils.scala b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/processing/SparkUtils.scala index 29a971d..e362ba6 100644 --- a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/processing/SparkUtils.scala +++ b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/processing/SparkUtils.scala @@ -18,7 +18,7 @@ import org.typelevel.log4cats.Logger import org.typelevel.log4cats.slf4j.Slf4jLogger import org.apache.spark.sql.{DataFrame, Row, SparkSession} -import org.apache.spark.sql.functions.current_timestamp +import org.apache.spark.sql.functions.{col, current_timestamp} import org.apache.spark.sql.types.StructType import com.snowplowanalytics.snowplow.lakes.Config @@ -102,12 +102,14 @@ private[processing] object SparkUtils { def prepareFinalDataFrame[F[_]: Sync]( spark: SparkSession, - viewName: String + viewName: String, + writerParallelism: Int ): F[DataFrame] = Sync[F].delay { spark .table(viewName) - .sort("event_name") + .repartitionByRange(writerParallelism, col("event_name"), col("event_id")) + .sortWithinPartitions("event_name") .withColumn("load_tstamp", current_timestamp()) }