Skip to content

Commit

Permalink
Parallelize cpu-intensive processing
Browse files Browse the repository at this point in the history
This allows the loader to better utilize all cpu available on a larger
instance. Parallelism is configured by a new config parameter
`cpuParallelismFraction`. The actual parallelism is chosen dynamically
based on the number of available CPU, so the default value should be
appropriate for all sized VMs.
  • Loading branch information
istreeter committed Nov 16, 2024
1 parent e65e9d8 commit e4a3e07
Show file tree
Hide file tree
Showing 11 changed files with 71 additions and 44 deletions.
5 changes: 5 additions & 0 deletions config/config.azure.reference.hocon
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,11 @@
"uploadConcurrency": 1
}

# -- Controls how the app splits the workload into concurrent batches which can be run in parallel.
# -- E.g. If there are 4 available processors, and cpuParallelismFraction = 0.75, then we process 3 batches concurrently.
# -- Adjusting this value can cause the app to use more or less of the available CPU.
"cpuParallelismFraction": 0.75

# Retry configuration for Snowflake operation failures
"retries": {

Expand Down
5 changes: 5 additions & 0 deletions config/config.kinesis.reference.hocon
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,11 @@
"uploadConcurrency": 1
}

# -- Controls how the app splits the workload into concurrent batches which can be run in parallel.
# -- E.g. If there are 4 available processors, and cpuParallelismFraction = 0.75, then we process 3 batches concurrently.
# -- Adjusting this value can cause the app to use more or less of the available CPU.
"cpuParallelismFraction": 0.75

# Retry configuration for Snowflake operation failures
"retries": {

Expand Down
5 changes: 5 additions & 0 deletions config/config.pubsub.reference.hocon
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,11 @@
"uploadConcurrency": 1
}

# -- Controls how the app splits the workload into concurrent batches which can be run in parallel.
# -- E.g. If there are 4 available processors, and cpuParallelismFraction = 0.75, then we process 3 batches concurrently.
# -- Adjusting this value can cause the app to use more or less of the available CPU.
"cpuParallelismFraction": 0.75

# Retry configuration for Snowflake operation failures
"retries": {

Expand Down
1 change: 1 addition & 0 deletions modules/core/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
"maxDelay": "1 second"
"uploadConcurrency": 3
}
"cpuParallelismFraction": 0.75

"retries": {
"setupErrors": {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ case class Config[+Source, +Sink](
input: Source,
output: Config.Output[Sink],
batching: Config.Batching,
cpuParallelismFraction: BigDecimal,
retries: Config.Retries,
skipSchemas: List[SchemaCriterion],
telemetry: Telemetry.Config,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,16 @@ import com.snowplowanalytics.snowplow.snowflake.processing.{Channel, TableManage
import com.snowplowanalytics.snowplow.sources.SourceAndAck
import org.http4s.client.Client

/**
* Resources and runtime-derived configuration needed for processing events
*
* @param cpuParallelism
* The processing Pipe involves several steps, some of which are cpu-intensive. We run
* cpu-intensive steps in parallel, so that on big instances we can take advantage of all cores.
* For each of those cpu-intensive steps, `cpuParallelism` controls the parallelism of that step.
*
* Other params are self-explanatory
*/
case class Environment[F[_]](
appInfo: AppInfo,
source: SourceAndAck[F],
Expand All @@ -29,6 +39,7 @@ case class Environment[F[_]](
metrics: Metrics[F],
appHealth: AppHealth.Interface[F, Alert, RuntimeService],
batching: Config.Batching,
cpuParallelism: Int,
schemasToSkip: List[SchemaCriterion],
badRowMaxSize: Int
)
Expand All @@ -54,17 +65,30 @@ object Environment {
tableManager <- Resource.eval(TableManager.make(config.output.good, appHealth, config.retries))
channelOpener <- Channel.opener(config.output.good, config.batching, config.retries, appHealth)
channelProvider <- Channel.provider(channelOpener, config.retries, appHealth)
cpuParallelism = chooseCpuParallelism(config)
} yield Environment(
appInfo = appInfo,
source = sourceAndAck,
badSink = badSink,
httpClient = httpClient,
tableManager = tableManager,
channel = channelProvider,
metrics = metrics,
appHealth = appHealth,
batching = config.batching,
schemasToSkip = config.skipSchemas,
badRowMaxSize = config.output.bad.maxRecordSize
appInfo = appInfo,
source = sourceAndAck,
badSink = badSink,
httpClient = httpClient,
tableManager = tableManager,
channel = channelProvider,
metrics = metrics,
appHealth = appHealth,
batching = config.batching,
cpuParallelism = cpuParallelism,
schemasToSkip = config.skipSchemas,
badRowMaxSize = config.output.bad.maxRecordSize
)

/**
* See the description of `cpuParallelism` on the [[Environment]] class
*
* For bigger instances (more cores) we want more parallelism, so that cpu-intensive steps can
* take advantage of all the cores.
*/
private def chooseCpuParallelism(config: Config[Any, Any]): Int =
(Runtime.getRuntime.availableProcessors * config.cpuParallelismFraction)
.setScale(0, BigDecimal.RoundingMode.UP)
.toInt
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,14 +47,6 @@ object Processing {

/** Model used between stages of the processing pipeline */

private case class ParsedBatch(
events: List[Event],
parseFailures: List[BadRow],
countBytes: Long,
countItems: Int,
token: Unique.Token
)

private case class TransformedBatch(
events: List[EventWithTransform],
parseFailures: List[BadRow],
Expand Down Expand Up @@ -128,8 +120,7 @@ object Processing {
val badProcessor = BadRowProcessor(env.appInfo.name, env.appInfo.version)

in.through(setLatency(env.metrics))
.through(parseBytes(badProcessor))
.through(transform(badProcessor, env.schemasToSkip))
.through(parseAndTransform(env, badProcessor))
.through(BatchUp.withTimeout(env.batching.maxBytes, env.batching.maxDelay))
.through(writeToSnowflake(env, badProcessor))
.through(sendFailedEvents(env, badProcessor))
Expand All @@ -152,8 +143,8 @@ object Processing {
}

/** Parse raw bytes into Event using analytics sdk */
private def parseBytes[F[_]: Sync](badProcessor: BadRowProcessor): Pipe[F, TokenedEvents, ParsedBatch] =
_.evalMap { case TokenedEvents(chunk, token, _) =>
private def parseAndTransform[F[_]: Async](env: Environment[F], badProcessor: BadRowProcessor): Pipe[F, TokenedEvents, TransformedBatch] =
_.parEvalMap(env.cpuParallelism) { case TokenedEvents(chunk, token, _) =>
for {
numBytes <- Sync[F].delay(Foldable[Chunk].sumBytes(chunk))
(badRows, events) <- Foldable[Chunk].traverseSeparateUnordered(chunk) { bytes =>
Expand All @@ -164,29 +155,20 @@ object Processing {
}
}
}
} yield ParsedBatch(events, badRows, numBytes, chunk.size, token)
}

/** Transform the Event into values compatible with the snowflake ingest sdk */
private def transform[F[_]: Sync](
badProcessor: BadRowProcessor,
schemasToSkip: List[SchemaCriterion]
): Pipe[F, ParsedBatch, TransformedBatch] =
_.evalMap { batch =>
Sync[F].realTimeInstant.flatMap { now =>
val loadTstamp = SnowflakeCaster.timestampValue(now)
transformBatch[F](badProcessor, loadTstamp, batch, schemasToSkip)
}
now <- Sync[F].realTimeInstant
loadTstamp = SnowflakeCaster.timestampValue(now)
(transformBad, transformed) <- transformBatch(badProcessor, loadTstamp, events, env.schemasToSkip)
} yield TransformedBatch(transformed, transformBad, badRows, numBytes, chunk.size, token)
}

private def transformBatch[F[_]: Sync](
badProcessor: BadRowProcessor,
loadTstamp: OffsetDateTime,
batch: ParsedBatch,
events: List[Event],
schemasToSkip: List[SchemaCriterion]
): F[TransformedBatch] =
): F[(List[BadRow], List[EventWithTransform])] =
Foldable[List]
.traverseSeparateUnordered(batch.events) { event =>
.traverseSeparateUnordered(events) { event =>
Sync[F].delay {
Transform
.transformEventUnstructured[AnyRef](badProcessor, SnowflakeCaster, SnowflakeJsonFolder, event, schemasToSkip)
Expand All @@ -201,9 +183,6 @@ object Processing {
}
}
}
.map { case (transformFailures, eventsWithTransforms) =>
TransformedBatch(eventsWithTransforms, batch.parseFailures, transformFailures, batch.countBytes, batch.countItems, batch.token)
}

private def writeToSnowflake[F[_]: Async](
env: Environment[F],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,9 @@ object MockEnvironment {
maxDelay = 10.seconds,
uploadConcurrency = 1
),
schemasToSkip = List.empty,
badRowMaxSize = 1000000
cpuParallelism = 2,
schemasToSkip = List.empty,
badRowMaxSize = 1000000
)
MockEnvironment(state, env)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ object KafkaConfigSpec {
maxDelay = 1.second,
uploadConcurrency = 3
),
cpuParallelismFraction = BigDecimal(0.75),
retries = Config.Retries(
setupErrors = Retrying.Config.ForSetup(delay = 30.seconds),
transientErrors = Retrying.Config.ForTransient(delay = 1.second, attempts = 5)
Expand Down Expand Up @@ -189,6 +190,7 @@ object KafkaConfigSpec {
maxDelay = 1.second,
uploadConcurrency = 1
),
cpuParallelismFraction = BigDecimal(0.75),
retries = Config.Retries(
setupErrors = Retrying.Config.ForSetup(delay = 30.seconds),
transientErrors = Retrying.Config.ForTransient(delay = 1.second, attempts = 5)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ object KinesisConfigSpec {
maxDelay = 1.second,
uploadConcurrency = 3
),
cpuParallelismFraction = BigDecimal(0.75),
retries = Config.Retries(
setupErrors = Retrying.Config.ForSetup(delay = 30.seconds),
transientErrors = Retrying.Config.ForTransient(delay = 1.second, attempts = 5)
Expand Down Expand Up @@ -180,6 +181,7 @@ object KinesisConfigSpec {
maxDelay = 1.second,
uploadConcurrency = 1
),
cpuParallelismFraction = BigDecimal(0.75),
retries = Config.Retries(
setupErrors = Retrying.Config.ForSetup(delay = 30.seconds),
transientErrors = Retrying.Config.ForTransient(delay = 1.second, attempts = 5)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ object PubsubConfigSpec {
maxDelay = 1.second,
uploadConcurrency = 3
),
cpuParallelismFraction = BigDecimal(0.75),
retries = Config.Retries(
setupErrors = Retrying.Config.ForSetup(delay = 30.seconds),
transientErrors = Retrying.Config.ForTransient(delay = 1.second, attempts = 5)
Expand Down Expand Up @@ -179,6 +180,7 @@ object PubsubConfigSpec {
maxDelay = 1.second,
uploadConcurrency = 1
),
cpuParallelismFraction = BigDecimal(0.75),
retries = Config.Retries(
setupErrors = Retrying.Config.ForSetup(delay = 30.seconds),
transientErrors = Retrying.Config.ForTransient(delay = 1.second, attempts = 5)
Expand Down

0 comments on commit e4a3e07

Please sign in to comment.