Commit 285ac5f

event_id as salt
istreeter committed Jan 9, 2025
1 parent b1eea59 commit 285ac5f
Showing 3 changed files with 31 additions and 8 deletions.
2 changes: 1 addition & 1 deletion modules/core/src/main/resources/reference.conf
@@ -128,7 +128,7 @@
"spark.memory.storageFraction": "0"
"spark.databricks.delta.autoCompact.enabled": "false"
"spark.scheduler.mode": "FAIR"
"spark.sql.adaptive.enabled": "false"
"spark.sql.adaptive.enabled": "false" # False gives better performance on the type of shuffle done by Lake Loader
}
"gcpUserAgent": ${gcpUserAgent}
}
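
As a hedged aside, the sketch below shows how a key such as "spark.sql.adaptive.enabled" from a config map like the one above would typically reach the SparkSession builder. The object name, the sparkOptions map, and the wiring are assumptions for illustration, not the loader's actual code.

import org.apache.spark.sql.SparkSession

object ConfigSketch extends App {
  // Assumed stand-in for the "conf" block above, not the loader's actual wiring
  val sparkOptions: Map[String, String] = Map(
    "spark.sql.adaptive.enabled" -> "false" // adaptive query execution off for this shuffle pattern
  )

  // Fold every key/value pair into the builder before creating the session
  val builder = sparkOptions.foldLeft(SparkSession.builder().master("local[*]").appName("config-sketch")) {
    case (b, (k, v)) => b.config(k, v)
  }
  val session = builder.getOrCreate()

  println(session.conf.get("spark.sql.adaptive.enabled")) // prints: false
  session.stop()
}
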
@@ -13,6 +13,7 @@ package com.snowplowanalytics.snowplow.lakes.processing
import cats.implicits._
import cats.data.NonEmptyList
import cats.effect.{Async, Resource, Sync}
import cats.effect.std.Mutex
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.StructType

@@ -80,7 +81,9 @@ object LakeWriter {
}
for {
session <- SparkUtils.session[F](config, w, target.location)
} yield impl(session, w)
writerParallelism = chooseWriterParallelism()
mutex <- Resource.eval(Mutex[F])
} yield impl(session, w, writerParallelism, mutex)
}

def withHandledErrors[F[_]: Async](
@@ -124,10 +127,15 @@

/**
* Implementation of the LakeWriter
*
* The mutex is needed because we allow overlapping windows. It prevents two different windows
* from trying to run the same expensive operation at the same time.
*/
private def impl[F[_]: Sync](
spark: SparkSession,
w: Writer
w: Writer,
writerParallelism: Int,
mutex: Mutex[F]
): LakeWriter[F] = new LakeWriter[F] {
def createTable: F[Unit] =
w.prepareTable(spark)
@@ -147,8 +155,21 @@

def commit(viewName: String): F[Unit] =
for {
df <- SparkUtils.prepareFinalDataFrame(spark, viewName)
_ <- w.write(df)
df <- SparkUtils.prepareFinalDataFrame(spark, viewName, writerParallelism)
_ <- mutex.lock
.surround {
w.write(df)
}
} yield ()
}

/**
* Allow Spark to parallelize over _most_ of the available processors for writing to the lake,
* because this reduces the time taken to sink a batch.
*
* But leave 1 processor always available, so that we are never blocked when trying to save one of
* the intermediate dataframes.
*/
private def chooseWriterParallelism(): Int =
(Runtime.getRuntime.availableProcessors - 1).max(1)
}
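
For readers unfamiliar with cats-effect's Mutex, here is a self-contained sketch of the pattern introduced above: one Mutex created up front, with lock.surround serialising an expensive action that two overlapping windows would otherwise run at the same time. The names, durations, and the expensiveWrite helper are toy assumptions, not the loader's code.

import cats.effect.{IO, IOApp}
import cats.effect.std.Mutex
import cats.syntax.all._
import scala.concurrent.duration._

object MutexSketch extends IOApp.Simple {

  // Stand-in for the expensive write that must not run for two windows at once
  def expensiveWrite(window: Int): IO[Unit] =
    IO.println(s"start window $window") *> IO.sleep(500.millis) *> IO.println(s"done window $window")

  def run: IO[Unit] =
    Mutex[IO].flatMap { mutex =>
      // Two "windows" proceed concurrently, but the surrounded writes never overlap
      List(1, 2).parTraverse_ { w =>
        mutex.lock.surround(expensiveWrite(w))
      }
    }
}

Because lock is a Resource, the permit is released when the surrounded action finishes or fails, so a crashed write cannot leave the next window blocked forever.
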
@@ -18,7 +18,7 @@ import org.typelevel.log4cats.Logger
import org.typelevel.log4cats.slf4j.Slf4jLogger

import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.functions.current_timestamp
import org.apache.spark.sql.functions.{col, current_timestamp}
import org.apache.spark.sql.types.StructType

import com.snowplowanalytics.snowplow.lakes.Config
@@ -102,12 +102,14 @@ private[processing] object SparkUtils {

def prepareFinalDataFrame[F[_]: Sync](
spark: SparkSession,
viewName: String
viewName: String,
writerParallelism: Int
): F[DataFrame] =
Sync[F].delay {
spark
.table(viewName)
.sort("event_name")
.repartitionByRange(writerParallelism, col("event_name"), col("event_id"))
.sortWithinPartitions("event_name")
.withColumn("load_tstamp", current_timestamp())
}
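
To see why event_id acts as a salt here, consider the toy sketch below (assumed data and names, not the loader's code). Range-partitioning on event_name alone would place every row of a dominant event_name in a single partition; adding event_id, which is effectively random per row, as a secondary range column lets Spark split that event_name across the writerParallelism partitions, while sortWithinPartitions keeps rows of the same event_name clustered for the writer.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, spark_partition_id}

object SaltSketch extends App {
  val spark = SparkSession.builder().master("local[*]").appName("salt-sketch").getOrCreate()
  import spark.implicits._

  // 1000 rows that all share one event_name: the worst case for range-partitioning on event_name alone
  val df = Seq.fill(1000)(("page_view", java.util.UUID.randomUUID.toString))
    .toDF("event_name", "event_id")

  val partitioned = df
    .repartitionByRange(4, col("event_name"), col("event_id"))
    .sortWithinPartitions("event_name")

  // Rows spread across roughly 4 partitions despite the single event_name
  partitioned.groupBy(spark_partition_id()).count().show()

  spark.stop()
}
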
