Skip to content

Commit

Permalink
Use Spark fair scheduler pools: pool1 for local DataFrame maintenance, pool2 for table writes
Browse files Browse the repository at this point in the history
  • Loading branch information
istreeter committed Jan 7, 2025
1 parent fbd3218 commit 3e1534b
Show file tree
Hide file tree
Showing 5 changed files with 19 additions and 0 deletions.
13 changes: 13 additions & 0 deletions modules/core/src/main/resources/fairscheduler.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
<?xml version="1.0"?>
<!--
  Spark fair scheduler pool definitions (referenced via
  spark.sparkContext.setLocalProperty("spark.scheduler.pool", ...)).
  Two pools with equal weight and minShare, so jobs tagged "pool1"
  and jobs tagged "pool2" get a fair share of cluster resources
  relative to each other; jobs WITHIN each pool run FIFO.
-->
<allocations>
<!-- pool1: used for local DataFrame maintenance (create/save/drop temp views) -->
<pool name="pool1">
<schedulingMode>FIFO</schedulingMode>
<weight>1</weight>
<minShare>1</minShare>
</pool>
<!-- pool2: used by the Delta/Hudi/Iceberg writers when appending to the table -->
<pool name="pool2">
<schedulingMode>FIFO</schedulingMode>
<weight>1</weight>
<minShare>1</minShare>
</pool>
</allocations>
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ private[processing] object SparkUtils {
for {
_ <- Logger[F].debug(s"Initializing local DataFrame with name $viewName")
_ <- Sync[F].blocking {
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "pool1")
spark.emptyDataFrame.createTempView(viewName)
}
} yield ()
Expand All @@ -84,6 +85,7 @@ private[processing] object SparkUtils {
for {
_ <- Logger[F].debug(s"Saving batch of ${rows.size} events to local DataFrame $viewName")
_ <- Sync[F].blocking {
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "pool1")
spark
.createDataFrame(rows.toList.asJava, schema)
.coalesce(1)
Expand All @@ -108,6 +110,7 @@ private[processing] object SparkUtils {
/** Drops the temporary view `viewName` from the local Spark catalog.
  *
  * The actual drop is a blocking Spark call, so it is wrapped in
  * `Sync[F].blocking`; the thread-local scheduler property tags the
  * resulting Spark job with fair-scheduler pool "pool1", the same pool
  * used by the other local DataFrame maintenance operations.
  */
def dropView[F[_]: Sync](spark: SparkSession, viewName: String): F[Unit] = {
  val dropBlocking = Sync[F].blocking {
    spark.sparkContext.setLocalProperty("spark.scheduler.pool", "pool1")
    spark.catalog.dropTempView(viewName)
  }
  Logger[F].info(s"Removing Spark data frame $viewName from local disk...") >>
    dropBlocking.void
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ class DeltaWriter(config: Config.Delta) extends Writer {
Sync[F].untilDefinedM {
Sync[F]
.blocking[Option[Unit]] {
df.sparkSession.sparkContext.setLocalProperty("spark.scheduler.pool", "pool2")
df.write
.format("delta")
.mode("append")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ class HudiWriter(config: Config.Hudi) extends Writer {

override def write[F[_]: Sync](df: DataFrame): F[Unit] =
Sync[F].blocking {
df.sparkSession.sparkContext.setLocalProperty("spark.scheduler.pool", "pool2")
df.write
.format("hudi")
.mode("append")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ class IcebergWriter(config: Config.Iceberg) extends Writer {

override def write[F[_]: Sync](df: DataFrame): F[Unit] =
Sync[F].blocking {
df.sparkSession.sparkContext.setLocalProperty("spark.scheduler.pool", "pool2")
df.write
.format("iceberg")
.mode("append")
Expand Down

0 comments on commit 3e1534b

Please sign in to comment.