
Commit 2f2482f

Sort dataframe and disable localcheckpoint of final dataframe
1 parent 56146cb commit 2f2482f

4 files changed, +30 -39 lines

modules/core/src/main/resources/reference.conf

Lines changed: 1 addition & 0 deletions
@@ -128,6 +128,7 @@
     "spark.memory.storageFraction": "0"
     "spark.databricks.delta.autoCompact.enabled": "false"
     "spark.scheduler.mode": "FAIR"
+    "spark.sql.adaptive.enabled": "false"
   }
   "gcpUserAgent": ${gcpUserAgent}
 }
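For context, entries in this block become ordinary Spark configuration defaults; disabling adaptive query execution here presumably keeps the shuffle partitioning chosen elsewhere in this commit from being coalesced at runtime. A minimal sketch of how such defaults could be applied when building a session (the SessionSketch object, defaults map and builder wiring below are illustrative, not the loader's actual code):

import org.apache.spark.sql.SparkSession

object SessionSketch {
  // Hypothetical subset of the reference.conf defaults shown above
  private val defaults: Map[String, String] = Map(
    "spark.scheduler.mode"       -> "FAIR",
    "spark.sql.adaptive.enabled" -> "false" // the new default added in this commit
  )

  def build(): SparkSession =
    defaults
      .foldLeft(SparkSession.builder().appName("sketch").master("local[*]")) {
        case (builder, (key, value)) => builder.config(key, value) // each entry is passed straight to Spark
      }
      .getOrCreate()
}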

modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/processing/LakeWriter.scala

Lines changed: 5 additions & 24 deletions
@@ -81,10 +81,8 @@ object LakeWriter {
     }
     for {
       session <- SparkUtils.session[F](config, w, target.location)
-      writerParallelism = chooseWriterParallelism()
-      mutex1 <- Resource.eval(Mutex[F])
-      mutex2 <- Resource.eval(Mutex[F])
-    } yield impl(session, w, writerParallelism, mutex1, mutex2)
+      mutex <- Resource.eval(Mutex[F])
+    } yield impl(session, w, mutex)
   }

   def withHandledErrors[F[_]: Async](
@@ -129,22 +127,17 @@
   /**
    * Implementation of the LakeWriter
    *
-   * The mutexes are needed because we allow overlapping windows. They prevent two different windows
+   * The mutex is needed because we allow overlapping windows. It prevents two different windows
    * from trying to run the same expensive operation at the same time.
    *
    * @param mutexForWriting
    *   Makes sure there is only ever one spark job trying to write events to the lake. This is an
    *   IO-intensive task.
-   * @param mutexForUnioning
-   *   Makes sure there is only ever one spark job trying to union smaller DataFrames into a larger
-   *   DataFrame, immediately before writing to the lake. This is a cpu-intensive task.
    */
   private def impl[F[_]: Sync](
     spark: SparkSession,
     w: Writer,
-    writerParallelism: Int,
-    mutexForWriting: Mutex[F],
-    mutexForUnioning: Mutex[F]
+    mutexForWriting: Mutex[F]
   ): LakeWriter[F] = new LakeWriter[F] {
     def createTable: F[Unit] =
       w.prepareTable(spark)
@@ -164,23 +157,11 @@

     def commit(viewName: String): F[Unit] =
       for {
-        df <- mutexForUnioning.lock.surround {
-          SparkUtils.prepareFinalDataFrame(spark, viewName, writerParallelism)
-        }
+        df <- SparkUtils.prepareFinalDataFrame(spark, viewName)
         _ <- mutexForWriting.lock
           .surround {
             w.write(df)
           }
       } yield ()
   }
-
-  /**
-   * Allow spark to parallelize over _most_ of the available processors for writing to the lake,
-   * because this speeds up how quickly we can sink a batch.
-   *
-   * But leave 1 processor always available, so that we are never blocked when trying to save one of
-   * the intermediate dataframes.
-   */
-  private def chooseWriterParallelism(): Int =
-    (Runtime.getRuntime.availableProcessors - 1).max(1)
 }
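After this change only the write step is guarded by a mutex. A self-contained sketch of the Mutex#lock.surround pattern that commit() relies on, assuming cats-effect 3 (the MutexSketch object, doWrite helper and timings are made up for illustration):

import cats.effect.{IO, IOApp}
import cats.effect.std.Mutex
import cats.syntax.all._
import scala.concurrent.duration._

object MutexSketch extends IOApp.Simple {
  // Stand-in for w.write(df): an IO-intensive task we never want to run twice at once
  private def doWrite(window: Int): IO[Unit] =
    IO.println(s"writing window $window") >> IO.sleep(100.millis)

  def run: IO[Unit] =
    Mutex[IO].flatMap { mutex =>
      // Two overlapping windows try to write concurrently; surround makes them take turns
      List(1, 2).parTraverse_(w => mutex.lock.surround(doWrite(w)))
    }
}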

modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/processing/SparkUtils.scala

Lines changed: 21 additions & 12 deletions
@@ -18,7 +18,7 @@ import org.typelevel.log4cats.Logger
 import org.typelevel.log4cats.slf4j.Slf4jLogger

 import org.apache.spark.sql.{DataFrame, Row, SparkSession}
-import org.apache.spark.sql.functions.{col, current_timestamp}
+import org.apache.spark.sql.functions.current_timestamp
 import org.apache.spark.sql.types.StructType

 import com.snowplowanalytics.snowplow.lakes.Config
@@ -63,7 +63,8 @@ private[processing] object SparkUtils {
   private def sparkConfigOptions(config: Config.Spark, writer: Writer): Map[String, String] = {
     val gcpUserAgentKey = "fs.gs.storage.http.headers.user-agent"
     val gcpUserAgentValue = s"${config.gcpUserAgent.productName}/lake-loader (GPN:Snowplow;)"
-    writer.sparkConfig ++ config.conf + (gcpUserAgentKey -> gcpUserAgentValue)
+    val shuffleKey = "spark.sql.shuffle.partitions"
+    writer.sparkConfig + (shuffleKey -> chooseWriterParallelism().show) ++ config.conf + (gcpUserAgentKey -> gcpUserAgentValue)
   }

   def initializeLocalDataFrame[F[_]: Sync](spark: SparkSession, viewName: String): F[Unit] =
@@ -94,21 +95,29 @@

   def prepareFinalDataFrame[F[_]: Sync](
     spark: SparkSession,
-    viewName: String,
-    writerParallelism: Int
+    viewName: String
   ): F[DataFrame] =
-    Sync[F].blocking {
-      spark
-        .table(viewName)
-        .withColumn("load_tstamp", current_timestamp())
-        .repartition(col("event_name"))
-        .coalesce(writerParallelism)
-        .localCheckpoint()
-    }
+    Logger[F].debug(s"Analyzing final DataFrame $viewName") >>
+      Sync[F].delay {
+        spark
+          .table(viewName)
+          .withColumn("load_tstamp", current_timestamp())
+          .sort("event_name")
+      } <* Logger[F].debug(s"Finished analyzing final DataFrame $viewName")

   def dropView[F[_]: Sync](spark: SparkSession, viewName: String): F[Unit] =
     Logger[F].info(s"Removing Spark data frame $viewName from local disk...") >>
       Sync[F].blocking {
         spark.catalog.dropTempView(viewName)
       }.void
+
+  /**
+   * Allow spark to parallelize over _most_ of the available processors for writing to the lake,
+   * because this speeds up how quickly we can sink a batch.
+   *
+   * But leave 1 processor always available, so that we are never blocked when trying to save one of
+   * the intermediate dataframes.
+   */
+  private def chooseWriterParallelism(): Int =
+    (Runtime.getRuntime.availableProcessors - 1).max(1)
 }
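The sort above replaces the old repartition/coalesce/localCheckpoint step: a global sort already introduces a shuffle, and with adaptive execution disabled the number of output partitions follows spark.sql.shuffle.partitions, which sparkConfigOptions now pins to chooseWriterParallelism(). A small standalone sketch of that behaviour (the SortPartitionsSketch object, local session, toy data and hard-coded value 3 are illustrative only):

import org.apache.spark.sql.SparkSession

object SortPartitionsSketch extends App {
  val spark = SparkSession.builder()
    .appName("sort-partitions-sketch")
    .master("local[*]")
    .config("spark.sql.adaptive.enabled", "false") // mirrors the new reference.conf default
    .config("spark.sql.shuffle.partitions", "3")   // stand-in for chooseWriterParallelism()
    .getOrCreate()
  import spark.implicits._

  val events = (1 to 1000).map(i => (i, s"event_$i")).toDF("id", "event_name")

  // The sort shuffles the data; with AQE off the shuffle keeps the configured partition count
  val sorted = events.sort("event_name")
  println(sorted.rdd.getNumPartitions) // typically 3 here

  spark.stop()
}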

project/Dependencies.scala

Lines changed: 3 additions & 3 deletions
@@ -15,12 +15,12 @@ object Dependencies {
   object Spark {

     // A version of Spark which is compatible with the current version of Iceberg and Delta
-    val forIcebergDelta = "3.5.3"
+    val forIcebergDelta = "3.5.4"
     val forIcebergDeltaMinor = "3.5"

     // Hudi can use a different version of Spark because we bundle a separate Docker image
     // This version of Spark must be compatible with the current version of Hudi
-    val forHudi = "3.5.3"
+    val forHudi = "3.5.4"
     val forHudiMinor = "3.5"
   }

@@ -35,7 +35,7 @@
     val delta = "3.2.1"
     val hudi = "0.15.0"
     val hudiAws = "1.0.0-beta2"
-    val iceberg = "1.6.1"
+    val iceberg = "1.7.1"
     val hadoop = "3.4.1"
     val gcsConnector = "hadoop3-2.2.25"
     val hive = "3.1.3"
