-
Notifications
You must be signed in to change notification settings - Fork 80
Description
When using Spark with external resources such as a database, a fairly common pattern is to share the database client between tasks so that its connection pool is shared too. Otherwise, with a large number of tasks/threads, the database connections are exhausted, which leads to problems when scaling.
This raises some complications when using such an object: because every task receives its own serialized copy, the object must implement some kind of singleton that is shared between the threads that receive those serialized copies.
Any ideas on how to do this with MacWire? Is there a pattern that can be used?
A simple example:
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.scalatest.funspec.AnyFunSpec
import org.scalatest.matchers.must.Matchers.{be, convertToAnyMustWrapper}
/** End-to-end check that [[Module]] runs its Spark job on a local master. */
class ModuleWithSparkSpec extends AnyFunSpec {
  it("runs module with spark") {
    val parallelism = 4
    val elements = parallelism * 3
    val module = new Module {
      override lazy val connectionString: String = ""
      override lazy val sparkConf: SparkConf =
        new SparkConf().setAppName("Test").setMaster(s"local[$parallelism]")
    }
    try {
      // Each element is inserted exactly once; the printed thread ids and instance hash codes
      // show how many distinct Database instances the job actually used.
      module.run(elements) must be(elements.toLong)
    } finally {
      // Runner obtains its session via getOrCreate and never stops it. Stop it here so the local
      // SparkContext (and this test's config) does not leak into later tests in the same JVM.
      SparkSession.getDefaultSession.foreach(_.stop())
    }
  }
}
/** Drives a Spark job that inserts each element of `0 until count` through a [[Database]].
  *
  * The client is shipped to executors as a Spark broadcast variable instead of being captured
  * directly in the task closure. A directly captured object is serialized into every task, so
  * every task deserializes its own client (and its own connection pool). A broadcast value is
  * cached by each executor and reused by all tasks running there, so tasks on the same executor
  * share one client.
  *
  * NOTE(review): executor-side copies of the client are never closed by this code; if
  * [[Database]] holds real connections, their lifecycle still has to be managed.
  *
  * @param sparkConf configuration used to obtain (or create) the SparkSession
  * @param database  client used to insert every element
  */
class Runner(val sparkConf: SparkConf, val database: Database) extends Serializable {

  /** Inserts the integers `0 until count` in parallel and returns how many were processed.
    *
    * @param count number of elements to insert
    * @return the number of elements processed, i.e. `count` (0 when `count <= 0`)
    */
  def run(count: Int): Long = {
    val sparkContext = SparkSession
      .builder()
      .config(sparkConf)
      .getOrCreate()
      .sparkContext
    // Keep the broadcast handle in a local so the closure below captures only the handle, not
    // `this` (capturing `this` would serialize `database` into every task again).
    val sharedDatabase = sparkContext.broadcast(database)
    sparkContext
      .parallelize(0 until count)
      .map { n => sharedDatabase.value.insert(n) }
      .count()
  }
}
/** MacWire-wired object graph for the Spark insert job.
  *
  * Every dependency is a `lazy val`, so the graph is built on first use and individual pieces
  * can be overridden by a concrete module (the spec overrides `connectionString` and `sparkConf`).
  */
trait Module extends Serializable {
  import com.softwaremill.macwire._

  // Defaults are empty values; concrete modules override them.
  protected lazy val connectionString: String = ""
  protected lazy val sparkConf: SparkConf = new SparkConf().setAppName("").setMaster("")

  // NOTE(review): wire[Database] creates a single driver-side instance. How many copies exist on
  // executors depends on how Runner ships it to tasks; a plain closure capture gives every task
  // its own deserialized copy.
  protected lazy val database: Database = wire[Database]
  protected lazy val runner: Runner = wire[Runner]

  /** Runs the job for `count` elements through the wired [[Runner]]. */
  def run(count: Int): Long = runner.run(count)
}
/** Stub database client whose `insert` only logs which thread and which instance handled it.
  *
  * Including the instance hash code in the log line makes it visible how many distinct copies of
  * the client were used by a job.
  *
  * @param connectionString connection string for the client; unused by this stub
  */
class Database(connectionString: String) extends Serializable with AutoCloseable {

  /** Logs the insertion of `n` with the current thread id and this instance's hash code. */
  def insert(n: Int): Unit = {
    val threadId = Thread.currentThread().getId
    val instanceHash = hashCode()
    println(s"Insert $n on thread id = $threadId, instance hash code = $instanceHash")
  }

  /** No-op: this stub holds no resources. */
  override def close(): Unit = ()
}
So the idea would be to have something instead of (or alongside) `wire` that makes it use a single instance. I was thinking of implementing a shared-singleton Scope that picks the instance from a concurrent collection. Would this be the best way to do it?
// Proposed usage (not implemented yet): `sharedSingleton` would be the shared-singleton scope
// described in the question, returning one Database instance shared across tasks.
protected lazy val database: Database = sharedSingleton(wire[Database])