Commit c462da4

Make better use of available CPU on larger VMs (#57)
These changes allow the loader to better utilize all the CPU available on a larger instance.

**1. CPU-intensive parsing/transforming is now parallelized.** Parallelism is configured by a new config parameter `cpuParallelismFactor`. The actual parallelism is chosen dynamically based on the number of available CPUs, so the default value should be appropriate for VMs of all sizes.

**2. We now open a new Snowflake ingest client per channel.** Note that the Snowflake SDK recommends re-using a single Client per VM and opening multiple Channels on the same Client, so here we are going against the recommendation. We justify it because it gives the loader better visibility of when the client's Future completes, signifying a complete write to Snowflake.

**3. Upload parallelism is chosen dynamically.** Larger VMs benefit from higher upload parallelism, in order to keep up with the faster rate of batches produced by the CPU-intensive tasks. Parallelism is configured by a new parameter `uploadParallelismFactor`, which gets multiplied by the number of available CPUs. The default value should be appropriate for VMs of all sizes.

These new settings have been tested on pods ranging from 0.6 to 8 available CPUs.
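To make the arithmetic concrete, below is a minimal self-contained sketch of how the effective parallelism is derived from these factors. The `ParallelismSketch` object and the hard-coded default factors are illustrative assumptions; the actual calculation is the `multiplyByCpuAndRoundUp` helper added to `Environment.scala` in this commit.

```scala
// Illustrative sketch only: multiply the configured factor by the number of available
// CPUs and round up, so even a pod with a fractional CPU share (e.g. 0.6) still gets a
// parallelism of at least 1.
object ParallelismSketch {

  def multiplyByCpuAndRoundUp(factor: BigDecimal): Int =
    (factor * Runtime.getRuntime.availableProcessors)
      .setScale(0, BigDecimal.RoundingMode.UP)
      .toInt

  def main(args: Array[String]): Unit = {
    // With the new defaults on a 4-CPU VM:
    //   cpuParallelism    = ceil(4 * 0.75) = 3
    //   uploadParallelism = ceil(4 * 2.5)  = 10
    println(s"cpuParallelism    = ${multiplyByCpuAndRoundUp(BigDecimal("0.75"))}")
    println(s"uploadParallelism = ${multiplyByCpuAndRoundUp(BigDecimal("2.5"))}")
  }
}
```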
1 parent e65e9d8 commit c462da4

File tree

13 files changed: +201 −110


config/config.azure.reference.hocon

+12 −3

```diff
@@ -42,7 +42,9 @@
     # -- name to use for the events table.
     "table": "events"
 
-    # -- name to use for the snowflake channel.
+    # -- Prefix to use for the snowflake channels.
+    # -- The full name will be suffixed with a number, e.g. `snowplow-1`
+    # -- The prefix must be unique per loader VM
     "channel": "snowplow"
 
     # -- Timeouts used for JDBC operations
@@ -75,10 +77,17 @@
     # - Events are emitted to Snowflake for a maximum of this duration, even if the `maxBytes` size has not been reached
     "maxDelay": "1 second"
 
-    # - How many batches can we send simultaneously over the network to Snowflake.
-    "uploadConcurrency": 1
+    # - Controls how many batches can we send simultaneously over the network to Snowflake.
+    # -- E.g. If there are 4 available processors, and uploadParallelismFactor = 2.5, then we send up to 10 batches in parallel
+    # -- Adjusting this value can cause the app to use more or less of the available CPU.
+    "uploadParallelismFactor": 2.5
   }
 
+  # -- Controls how the app splits the workload into concurrent batches which can be run in parallel.
+  # -- E.g. If there are 4 available processors, and cpuParallelismFactor = 0.75, then we process 3 batches concurrently.
+  # -- Adjusting this value can cause the app to use more or less of the available CPU.
+  "cpuParallelismFactor": 0.75
+
   # Retry configuration for Snowflake operation failures
   "retries": {
 
```

config/config.kinesis.reference.hocon

+12 −3

```diff
@@ -56,7 +56,9 @@
     # -- name to use for the events table.
     "table": "events"
 
-    # -- name to use for the snowflake channel.
+    # -- Prefix to use for the snowflake channels.
+    # -- The full name will be suffixed with a number, e.g. `snowplow-1`
+    # -- The prefix must be unique per loader VM
     "channel": "snowplow"
 
     # -- Timeouts used for JDBC operations
@@ -92,10 +94,17 @@
     # - Events are emitted to Snowflake for a maximum of this duration, even if the `maxBytes` size has not been reached
     "maxDelay": "1 second"
 
-    # - How many batches can we send simultaneously over the network to Snowflake.
-    "uploadConcurrency": 1
+    # - Controls how many batches can we send simultaneously over the network to Snowflake.
+    # -- E.g. If there are 4 available processors, and uploadParallelismFactor = 2.5, then we send up to 10 batches in parallel
+    # -- Adjusting this value can cause the app to use more or less of the available CPU.
+    "uploadParallelismFactor": 2.5
   }
 
+  # -- Controls how the app splits the workload into concurrent batches which can be run in parallel.
+  # -- E.g. If there are 4 available processors, and cpuParallelismFactor = 0.75, then we process 3 batches concurrently.
+  # -- Adjusting this value can cause the app to use more or less of the available CPU.
+  "cpuParallelismFactor": 0.75
+
   # Retry configuration for Snowflake operation failures
   "retries": {
 
```

config/config.pubsub.reference.hocon

+12 −3

```diff
@@ -52,7 +52,9 @@
     # -- name to use for the events table.
     "table": "events"
 
-    # -- name to use for the snowflake channel.
+    # -- Prefix to use for the snowflake channels.
+    # -- The full name will be suffixed with a number, e.g. `snowplow-1`
+    # -- The prefix must be unique per loader VM
     "channel": "snowplow"
 
     # -- Timeouts used for JDBC operations
@@ -81,10 +83,17 @@
     # - Events are emitted to Snowflake for a maximum of this duration, even if the `maxBytes` size has not been reached
     "maxDelay": "1 second"
 
-    # - How many batches can we send simultaneously over the network to Snowflake.
-    "uploadConcurrency": 1
+    # - Controls how many batches can we send simultaneously over the network to Snowflake.
+    # -- E.g. If there are 4 available processors, and uploadParallelismFactor = 2.5, then we send up to 10 batches in parallel
+    # -- Adjusting this value can cause the app to use more or less of the available CPU.
+    "uploadParallelismFactor": 2.5
   }
 
+  # -- Controls how the app splits the workload into concurrent batches which can be run in parallel.
+  # -- E.g. If there are 4 available processors, and cpuParallelismFactor = 0.75, then we process 3 batches concurrently.
+  # -- Adjusting this value can cause the app to use more or less of the available CPU.
+  "cpuParallelismFactor": 0.75
+
   # Retry configuration for Snowflake operation failures
   "retries": {
 
```

modules/core/src/main/resources/reference.conf

+2 −1

```diff
@@ -21,8 +21,9 @@
   "batching": {
     "maxBytes": 16000000
     "maxDelay": "1 second"
-    "uploadConcurrency": 3
+    "uploadParallelismFactor": 2.5
   }
+  "cpuParallelismFactor": 0.75
 
   "retries": {
     "setupErrors": {
```

modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/Config.scala

+2 −1

```diff
@@ -29,6 +29,7 @@ case class Config[+Source, +Sink](
   input: Source,
   output: Config.Output[Sink],
   batching: Config.Batching,
+  cpuParallelismFactor: BigDecimal,
   retries: Config.Retries,
   skipSchemas: List[SchemaCriterion],
   telemetry: Telemetry.Config,
@@ -69,7 +70,7 @@ object Config {
   case class Batching(
     maxBytes: Long,
     maxDelay: FiniteDuration,
-    uploadConcurrency: Int
+    uploadParallelismFactor: BigDecimal
   )
 
   case class Metrics(
```

modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/Environment.scala

+53 −14

```diff
@@ -19,16 +19,27 @@ import com.snowplowanalytics.snowplow.snowflake.processing.{Channel, TableManage
 import com.snowplowanalytics.snowplow.sources.SourceAndAck
 import org.http4s.client.Client
 
+/**
+ * Resources and runtime-derived configuration needed for processing events
+ *
+ * @param cpuParallelism
+ *   The processing Pipe involves several steps, some of which are cpu-intensive. We run
+ *   cpu-intensive steps in parallel, so that on big instances we can take advantage of all cores.
+ *   For each of those cpu-intensive steps, `cpuParallelism` controls the parallelism of that step.
+ *
+ * Other params are self-explanatory
+ */
 case class Environment[F[_]](
   appInfo: AppInfo,
   source: SourceAndAck[F],
   badSink: Sink[F],
   httpClient: Client[F],
   tableManager: TableManager[F],
-  channel: Channel.Provider[F],
+  channels: Vector[Channel.Provider[F]],
   metrics: Metrics[F],
   appHealth: AppHealth.Interface[F, Alert, RuntimeService],
   batching: Config.Batching,
+  cpuParallelism: Int,
   schemasToSkip: List[SchemaCriterion],
   badRowMaxSize: Int
 )
@@ -52,19 +63,47 @@ object Environment {
       badSink <- toSink(config.output.bad.sink).onError(_ => Resource.eval(appHealth.beUnhealthyForRuntimeService(RuntimeService.BadSink)))
       metrics <- Resource.eval(Metrics.build(config.monitoring.metrics))
       tableManager <- Resource.eval(TableManager.make(config.output.good, appHealth, config.retries))
-      channelOpener <- Channel.opener(config.output.good, config.batching, config.retries, appHealth)
-      channelProvider <- Channel.provider(channelOpener, config.retries, appHealth)
+      cpuParallelism = chooseCpuParallelism(config)
+      uploadParallelism = chooseUploadParallelism(config)
+      channelProviders <- Vector.range(0, uploadParallelism).traverse { index =>
+        for {
+          channelOpener <- Channel.opener(config.output.good, config.retries, appHealth, index)
+          channelProvider <- Channel.provider(channelOpener, config.retries, appHealth)
+        } yield channelProvider
+      }
     } yield Environment(
-      appInfo = appInfo,
-      source = sourceAndAck,
-      badSink = badSink,
-      httpClient = httpClient,
-      tableManager = tableManager,
-      channel = channelProvider,
-      metrics = metrics,
-      appHealth = appHealth,
-      batching = config.batching,
-      schemasToSkip = config.skipSchemas,
-      badRowMaxSize = config.output.bad.maxRecordSize
+        appInfo = appInfo,
+        source = sourceAndAck,
+        badSink = badSink,
+        httpClient = httpClient,
+        tableManager = tableManager,
+        channels = channelProviders,
+        metrics = metrics,
+        appHealth = appHealth,
+        batching = config.batching,
+        cpuParallelism = cpuParallelism,
+        schemasToSkip = config.skipSchemas,
+        badRowMaxSize = config.output.bad.maxRecordSize
     )
+
+  /**
+   * See the description of `cpuParallelism` on the [[Environment]] class
+   *
+   * For bigger instances (more cores) we want more parallelism, so that cpu-intensive steps can
+   * take advantage of all the cores.
+   */
+  private def chooseCpuParallelism(config: Config[Any, Any]): Int =
+    multiplyByCpuAndRoundUp(config.cpuParallelismFactor)
+
+  /**
+   * For bigger instances (more cores) we produce batches more quickly, and so need higher upload
+   * parallelism so that uploading does not become bottleneck
+   */
+  private def chooseUploadParallelism(config: Config[Any, Any]): Int =
+    multiplyByCpuAndRoundUp(config.batching.uploadParallelismFactor)
+
+  private def multiplyByCpuAndRoundUp(factor: BigDecimal): Int =
+    (Runtime.getRuntime.availableProcessors * factor)
+      .setScale(0, BigDecimal.RoundingMode.UP)
+      .toInt
 }
```
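The event-processing pipeline itself is not touched by this diff, so the following is only a rough sketch of how a value like `cpuParallelism` is typically applied in an fs2 stream; `CpuParallelismSketch` and `parseAndTransform` are assumed names for illustration, not the loader's actual code.

```scala
import cats.effect.IO
import fs2.Pipe

object CpuParallelismSketch {
  // Hypothetical sketch: bound a CPU-intensive step of the pipeline by the computed
  // parallelism. `parseAndTransform` stands in for the loader's real parsing/transforming
  // work, which is not shown in this commit.
  def cpuIntensiveStep[A, B](cpuParallelism: Int)(parseAndTransform: A => IO[B]): Pipe[IO, A, B] =
    _.parEvalMap(cpuParallelism)(parseAndTransform)
}
```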

modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/LoaderApp.scala

+8 −0

```diff
@@ -11,9 +11,12 @@
 package com.snowplowanalytics.snowplow.snowflake
 
 import cats.effect.{ExitCode, IO, Resource}
+import cats.effect.metrics.CpuStarvationWarningMetrics
 import io.circe.Decoder
 import com.monovore.decline.effect.CommandIOApp
 import com.monovore.decline.Opts
+import org.typelevel.log4cats.Logger
+import org.typelevel.log4cats.slf4j.Slf4jLogger
 
 import scala.concurrent.duration.DurationInt
 
@@ -28,6 +31,11 @@ abstract class LoaderApp[SourceConfig: Decoder, SinkConfig: Decoder](
   override def runtimeConfig =
     super.runtimeConfig.copy(cpuStarvationCheckInterval = 10.seconds)
 
+  private implicit val logger: Logger[IO] = Slf4jLogger.getLogger[IO]
+
+  override def onCpuStarvationWarn(metrics: CpuStarvationWarningMetrics): IO[Unit] =
+    Logger[IO].debug(s"Cats Effect measured responsiveness in excess of ${metrics.starvationInterval * metrics.starvationThreshold}")
+
   type SinkProvider = SinkConfig => Resource[IO, Sink[IO]]
   type SourceProvider = SourceConfig => IO[SourceAndAck[IO]]
 
```

modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/processing/Channel.scala

+13 −12

```diff
@@ -102,14 +102,14 @@ object Channel {
 
   def opener[F[_]: Async](
     config: Config.Snowflake,
-    batchingConfig: Config.Batching,
     retriesConfig: Config.Retries,
-    appHealth: AppHealth.Interface[F, Alert, RuntimeService]
+    appHealth: AppHealth.Interface[F, Alert, RuntimeService],
+    index: Int
   ): Resource[F, Opener[F]] =
     for {
-      client <- createClient(config, batchingConfig, retriesConfig, appHealth)
+      client <- createClient(config, retriesConfig, appHealth)
     } yield new Opener[F] {
-      def open: F[CloseableChannel[F]] = createChannel[F](config, client).map(impl[F])
+      def open: F[CloseableChannel[F]] = createChannel[F](config, client, index).map(impl[F])
     }
 
   def provider[F[_]: Async](
@@ -177,23 +177,25 @@ object Channel {
 
   private def createChannel[F[_]: Async](
     config: Config.Snowflake,
-    client: SnowflakeStreamingIngestClient
+    client: SnowflakeStreamingIngestClient,
+    index: Int
   ): F[SnowflakeStreamingIngestChannel] = {
+    val channelName = s"${config.channel}-$index"
     val request = OpenChannelRequest
-      .builder(config.channel)
+      .builder(channelName)
       .setDBName(config.database)
       .setSchemaName(config.schema)
       .setTableName(config.table)
       .setOnErrorOption(OpenChannelRequest.OnErrorOption.CONTINUE)
       .setDefaultTimezone(ZoneOffset.UTC)
       .build
 
-    Logger[F].info(s"Opening channel ${config.channel}") *>
+    Logger[F].info(s"Opening channel ${channelName}") *>
       Async[F].blocking(client.openChannel(request)) <*
-      Logger[F].info(s"Successfully opened channel ${config.channel}")
+      Logger[F].info(s"Successfully opened channel ${channelName}")
   }
 
-  private def channelProperties(config: Config.Snowflake, batchingConfig: Config.Batching): Properties = {
+  private def channelProperties(config: Config.Snowflake): Properties = {
     val props = new Properties()
     props.setProperty("user", config.user)
     props.setProperty("private_key", config.privateKey)
@@ -211,14 +213,13 @@ object Channel {
     props.setProperty(ParameterProvider.INSERT_THROTTLE_THRESHOLD_IN_PERCENTAGE, "0")
     props.setProperty(ParameterProvider.INSERT_THROTTLE_THRESHOLD_IN_BYTES, "0")
     props.setProperty(ParameterProvider.MAX_CHANNEL_SIZE_IN_BYTES, Long.MaxValue.toString)
-    props.setProperty(ParameterProvider.IO_TIME_CPU_RATIO, batchingConfig.uploadConcurrency.toString)
+    props.setProperty(ParameterProvider.IO_TIME_CPU_RATIO, "0")
 
     props
   }
 
   private def createClient[F[_]: Async](
     config: Config.Snowflake,
-    batchingConfig: Config.Batching,
     retriesConfig: Config.Retries,
     appHealth: AppHealth.Interface[F, Alert, RuntimeService]
   ): Resource[F, SnowflakeStreamingIngestClient] = {
@@ -228,7 +229,7 @@ object Channel {
     Sync[F].blocking {
       SnowflakeStreamingIngestClientFactory
         .builder("Snowplow_Streaming")
-        .setProperties(channelProperties(config, batchingConfig))
+        .setProperties(channelProperties(config))
         // .setParameterOverrides(Map.empty.asJava) // Not needed, as all params can also be set with Properties
         .build
     } <*
```
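For reference, a tiny illustrative sketch of the resulting channel layout (the `ChannelNamingSketch` object is an assumption, not part of the commit): one ingest client is opened per upload slot, each owning a single channel named `<prefix>-<index>`. This is also why the channel prefix must now be unique per loader VM, since two VMs sharing a prefix would derive the same channel names.

```scala
// Illustrative only: the channel names produced for a given prefix and upload parallelism,
// mirroring `s"${config.channel}-$index"` in createChannel above.
object ChannelNamingSketch {
  def channelNames(prefix: String, uploadParallelism: Int): Vector[String] =
    Vector.range(0, uploadParallelism).map(index => s"$prefix-$index")

  def main(args: Array[String]): Unit =
    // e.g. with the default prefix and uploadParallelism = 10: snowplow-0 ... snowplow-9
    channelNames("snowplow", 10).foreach(println)
}
```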
