@@ -18,7 +18,7 @@ import org.typelevel.log4cats.Logger
 import org.typelevel.log4cats.slf4j.Slf4jLogger
 
 import org.apache.spark.sql.{DataFrame, Row, SparkSession}
-import org.apache.spark.sql.functions.{col, current_timestamp}
+import org.apache.spark.sql.functions.current_timestamp
 import org.apache.spark.sql.types.StructType
 
 import com.snowplowanalytics.snowplow.lakes.Config
@@ -63,7 +63,8 @@ private[processing] object SparkUtils {
   private def sparkConfigOptions(config: Config.Spark, writer: Writer): Map[String, String] = {
     val gcpUserAgentKey = "fs.gs.storage.http.headers.user-agent"
     val gcpUserAgentValue = s"${config.gcpUserAgent.productName}/lake-loader (GPN:Snowplow;)"
-    writer.sparkConfig ++ config.conf + (gcpUserAgentKey -> gcpUserAgentValue)
+    val shuffleKey = "spark.sql.shuffle.partitions"
+    writer.sparkConfig + (shuffleKey -> chooseWriterParallelism().show) ++ config.conf + (gcpUserAgentKey -> gcpUserAgentValue)
   }
 
   def initializeLocalDataFrame[F[_]: Sync](spark: SparkSession, viewName: String): F[Unit] =
@@ -94,21 +95,31 @@ private[processing] object SparkUtils {
 
   def prepareFinalDataFrame[F[_]: Sync](
     spark: SparkSession,
-    viewName: String,
-    writerParallelism: Int
+    viewName: String
   ): F[DataFrame] =
-    Sync[F].blocking {
-      spark
-        .table(viewName)
-        .withColumn("load_tstamp", current_timestamp())
-        .repartition(col("event_name"))
-        .coalesce(writerParallelism)
-        .localCheckpoint()
-    }
+    Logger[F].debug(s"Analyzing final DataFrame $viewName") >>
+      Sync[F].blocking {
+        val ret = spark
+          .table(viewName)
+          .withColumn("load_tstamp", current_timestamp())
+          .sort("event_name")
+        ret.queryExecution.assertAnalyzed()
+        ret
+      } <* Logger[F].debug(s"Finished analyzing final DataFrame $viewName")
 
   def dropView[F[_]: Sync](spark: SparkSession, viewName: String): F[Unit] =
     Logger[F].info(s"Removing Spark data frame $viewName from local disk...") >>
       Sync[F].blocking {
         spark.catalog.dropTempView(viewName)
       }.void
+
+  /**
+   * Allow spark to parallelize over _most_ of the available processors for writing to the lake,
+   * because this speeds up how quickly we can sink a batch.
+   *
+   * But leave 1 processor always available, so that we are never blocked when trying to save one of
+   * the intermediate dataframes.
+   */
+  private def chooseWriterParallelism(): Int =
+    (Runtime.getRuntime.availableProcessors - 1).max(1)
 }
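
The combined effect of the two additions in the second hunk is easier to see in isolation. The sketch below is not code from the PR: `writerConfig` and `userConfig` are hypothetical stand-ins for `writer.sparkConfig` and `config.conf`, and `cats.implicits._` is assumed to provide the `.show` syntax used in the diff to render the `Int` as a `String`.

```scala
// Standalone sketch of how the new sparkConfigOptions composes its map.
import cats.implicits._

object WriterParallelismSketch {

  // Same idea as chooseWriterParallelism in the diff: use most cores, keep one free.
  private def chooseWriterParallelism(): Int =
    (Runtime.getRuntime.availableProcessors - 1).max(1)

  // `writerConfig` and `userConfig` are hypothetical stand-ins for
  // writer.sparkConfig and config.conf.
  def sparkConfigOptions(writerConfig: Map[String, String], userConfig: Map[String, String]): Map[String, String] = {
    val shuffleKey = "spark.sql.shuffle.partitions"
    // Left-to-right evaluation means userConfig is concatenated last, so a
    // user-supplied spark.sql.shuffle.partitions overrides the computed default.
    writerConfig + (shuffleKey -> chooseWriterParallelism().show) ++ userConfig
  }

  def main(args: Array[String]): Unit =
    println(sparkConfigOptions(Map.empty, Map("spark.ui.enabled" -> "false")))
}
```

Because `++ config.conf` is applied after the computed shuffle key, a shuffle-partition value set explicitly in the loader config would still take precedence over the processor-derived default.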