diff --git a/config/config.azure.reference.hocon b/config/config.azure.reference.hocon index 7802eac..f59e8e9 100644 --- a/config/config.azure.reference.hocon +++ b/config/config.azure.reference.hocon @@ -79,6 +79,11 @@ "uploadConcurrency": 1 } + # -- Controls how the app splits the workload into concurrent batches which can be run in parallel. + # -- E.g. If there are 4 available processors, and cpuParallelismFraction = 0.75, then we process 3 batches concurrently. + # -- Adjusting this value can cause the app to use more or less of the available CPU. + "cpuParallelismFraction": 0.75 + # Retry configuration for Snowflake operation failures "retries": { diff --git a/config/config.kinesis.reference.hocon b/config/config.kinesis.reference.hocon index 1b04523..dad6b49 100644 --- a/config/config.kinesis.reference.hocon +++ b/config/config.kinesis.reference.hocon @@ -96,6 +96,11 @@ "uploadConcurrency": 1 } + # -- Controls how the app splits the workload into concurrent batches which can be run in parallel. + # -- E.g. If there are 4 available processors, and cpuParallelismFraction = 0.75, then we process 3 batches concurrently. + # -- Adjusting this value can cause the app to use more or less of the available CPU. + "cpuParallelismFraction": 0.75 + # Retry configuration for Snowflake operation failures "retries": { diff --git a/config/config.pubsub.reference.hocon b/config/config.pubsub.reference.hocon index 0743a4d..21cab7f 100644 --- a/config/config.pubsub.reference.hocon +++ b/config/config.pubsub.reference.hocon @@ -85,6 +85,11 @@ "uploadConcurrency": 1 } + # -- Controls how the app splits the workload into concurrent batches which can be run in parallel. + # -- E.g. If there are 4 available processors, and cpuParallelismFraction = 0.75, then we process 3 batches concurrently. + # -- Adjusting this value can cause the app to use more or less of the available CPU. + "cpuParallelismFraction": 0.75 + # Retry configuration for Snowflake operation failures "retries": { diff --git a/modules/core/src/main/resources/reference.conf b/modules/core/src/main/resources/reference.conf index c8ed29a..8522fee 100644 --- a/modules/core/src/main/resources/reference.conf +++ b/modules/core/src/main/resources/reference.conf @@ -23,6 +23,7 @@ "maxDelay": "1 second" "uploadConcurrency": 3 } + "cpuParallelismFraction": 0.75 "retries": { "setupErrors": { diff --git a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/Config.scala b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/Config.scala index e09967b..5c12d76 100644 --- a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/Config.scala +++ b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/Config.scala @@ -29,6 +29,7 @@ case class Config[+Source, +Sink]( input: Source, output: Config.Output[Sink], batching: Config.Batching, + cpuParallelismFraction: BigDecimal, retries: Config.Retries, skipSchemas: List[SchemaCriterion], telemetry: Telemetry.Config, diff --git a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/Environment.scala b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/Environment.scala index c49912a..43274e2 100644 --- a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/Environment.scala +++ b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/Environment.scala @@ -19,6 +19,16 @@ import com.snowplowanalytics.snowplow.snowflake.processing.{Channel, TableManage import com.snowplowanalytics.snowplow.sources.SourceAndAck import org.http4s.client.Client +/** + * Resources and runtime-derived configuration needed for processing events + * + * @param cpuParallelism + * The processing Pipe involves several steps, some of which are cpu-intensive. We run + * cpu-intensive steps in parallel, so that on big instances we can take advantage of all cores. + * For each of those cpu-intensive steps, `cpuParallelism` controls the parallelism of that step. + * + * Other params are self-explanatory + */ case class Environment[F[_]]( appInfo: AppInfo, source: SourceAndAck[F], @@ -29,6 +39,7 @@ case class Environment[F[_]]( metrics: Metrics[F], appHealth: AppHealth.Interface[F, Alert, RuntimeService], batching: Config.Batching, + cpuParallelism: Int, schemasToSkip: List[SchemaCriterion], badRowMaxSize: Int ) @@ -54,17 +65,30 @@ object Environment { tableManager <- Resource.eval(TableManager.make(config.output.good, appHealth, config.retries)) channelOpener <- Channel.opener(config.output.good, config.batching, config.retries, appHealth) channelProvider <- Channel.provider(channelOpener, config.retries, appHealth) + cpuParallelism = chooseCpuParallelism(config) } yield Environment( - appInfo = appInfo, - source = sourceAndAck, - badSink = badSink, - httpClient = httpClient, - tableManager = tableManager, - channel = channelProvider, - metrics = metrics, - appHealth = appHealth, - batching = config.batching, - schemasToSkip = config.skipSchemas, - badRowMaxSize = config.output.bad.maxRecordSize + appInfo = appInfo, + source = sourceAndAck, + badSink = badSink, + httpClient = httpClient, + tableManager = tableManager, + channel = channelProvider, + metrics = metrics, + appHealth = appHealth, + batching = config.batching, + cpuParallelism = cpuParallelism, + schemasToSkip = config.skipSchemas, + badRowMaxSize = config.output.bad.maxRecordSize ) + + /** + * See the description of `cpuParallelism` on the [[Environment]] class + * + * For bigger instances (more cores) we want more parallelism, so that cpu-intensive steps can + * take advantage of all the cores. + */ + private def chooseCpuParallelism(config: Config[Any, Any]): Int = + (Runtime.getRuntime.availableProcessors * config.cpuParallelismFraction) + .setScale(0, BigDecimal.RoundingMode.UP) + .toInt } diff --git a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/processing/Processing.scala b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/processing/Processing.scala index 58914f2..19b79c3 100644 --- a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/processing/Processing.scala +++ b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/processing/Processing.scala @@ -47,14 +47,6 @@ object Processing { /** Model used between stages of the processing pipeline */ - private case class ParsedBatch( - events: List[Event], - parseFailures: List[BadRow], - countBytes: Long, - countItems: Int, - token: Unique.Token - ) - private case class TransformedBatch( events: List[EventWithTransform], parseFailures: List[BadRow], @@ -128,8 +120,7 @@ object Processing { val badProcessor = BadRowProcessor(env.appInfo.name, env.appInfo.version) in.through(setLatency(env.metrics)) - .through(parseBytes(badProcessor)) - .through(transform(badProcessor, env.schemasToSkip)) + .through(parseAndTransform(env, badProcessor)) .through(BatchUp.withTimeout(env.batching.maxBytes, env.batching.maxDelay)) .through(writeToSnowflake(env, badProcessor)) .through(sendFailedEvents(env, badProcessor)) @@ -152,8 +143,8 @@ object Processing { } /** Parse raw bytes into Event using analytics sdk */ - private def parseBytes[F[_]: Sync](badProcessor: BadRowProcessor): Pipe[F, TokenedEvents, ParsedBatch] = - _.evalMap { case TokenedEvents(chunk, token, _) => + private def parseAndTransform[F[_]: Async](env: Environment[F], badProcessor: BadRowProcessor): Pipe[F, TokenedEvents, TransformedBatch] = + _.parEvalMap(env.cpuParallelism) { case TokenedEvents(chunk, token, _) => for { numBytes <- Sync[F].delay(Foldable[Chunk].sumBytes(chunk)) (badRows, events) <- Foldable[Chunk].traverseSeparateUnordered(chunk) { bytes => @@ -164,29 +155,20 @@ object Processing { } } } - } yield ParsedBatch(events, badRows, numBytes, chunk.size, token) - } - - /** Transform the Event into values compatible with the snowflake ingest sdk */ - private def transform[F[_]: Sync]( - badProcessor: BadRowProcessor, - schemasToSkip: List[SchemaCriterion] - ): Pipe[F, ParsedBatch, TransformedBatch] = - _.evalMap { batch => - Sync[F].realTimeInstant.flatMap { now => - val loadTstamp = SnowflakeCaster.timestampValue(now) - transformBatch[F](badProcessor, loadTstamp, batch, schemasToSkip) - } + now <- Sync[F].realTimeInstant + loadTstamp = SnowflakeCaster.timestampValue(now) + (transformBad, transformed) <- transformBatch(badProcessor, loadTstamp, events, env.schemasToSkip) + } yield TransformedBatch(transformed, transformBad, badRows, numBytes, chunk.size, token) } private def transformBatch[F[_]: Sync]( badProcessor: BadRowProcessor, loadTstamp: OffsetDateTime, - batch: ParsedBatch, + events: List[Event], schemasToSkip: List[SchemaCriterion] - ): F[TransformedBatch] = + ): F[(List[BadRow], List[EventWithTransform])] = Foldable[List] - .traverseSeparateUnordered(batch.events) { event => + .traverseSeparateUnordered(events) { event => Sync[F].delay { Transform .transformEventUnstructured[AnyRef](badProcessor, SnowflakeCaster, SnowflakeJsonFolder, event, schemasToSkip) @@ -201,9 +183,6 @@ object Processing { } } } - .map { case (transformFailures, eventsWithTransforms) => - TransformedBatch(eventsWithTransforms, batch.parseFailures, transformFailures, batch.countBytes, batch.countItems, batch.token) - } private def writeToSnowflake[F[_]: Async]( env: Environment[F], diff --git a/modules/core/src/test/scala/com.snowplowanalytics.snowplow.snowflake/MockEnvironment.scala b/modules/core/src/test/scala/com.snowplowanalytics.snowplow.snowflake/MockEnvironment.scala index 4efd0e0..3372866 100644 --- a/modules/core/src/test/scala/com.snowplowanalytics.snowplow.snowflake/MockEnvironment.scala +++ b/modules/core/src/test/scala/com.snowplowanalytics.snowplow.snowflake/MockEnvironment.scala @@ -64,8 +64,9 @@ object MockEnvironment { maxDelay = 10.seconds, uploadConcurrency = 1 ), - schemasToSkip = List.empty, - badRowMaxSize = 1000000 + cpuParallelism = 2, + schemasToSkip = List.empty, + badRowMaxSize = 1000000 ) MockEnvironment(state, env) } diff --git a/modules/kafka/src/test/scala/com/snowplowanalytics/snowplow/snowflake/KafkaConfigSpec.scala b/modules/kafka/src/test/scala/com/snowplowanalytics/snowplow/snowflake/KafkaConfigSpec.scala index c091d8c..ec2e0fc 100644 --- a/modules/kafka/src/test/scala/com/snowplowanalytics/snowplow/snowflake/KafkaConfigSpec.scala +++ b/modules/kafka/src/test/scala/com/snowplowanalytics/snowplow/snowflake/KafkaConfigSpec.scala @@ -109,6 +109,7 @@ object KafkaConfigSpec { maxDelay = 1.second, uploadConcurrency = 3 ), + cpuParallelismFraction = BigDecimal(0.75), retries = Config.Retries( setupErrors = Retrying.Config.ForSetup(delay = 30.seconds), transientErrors = Retrying.Config.ForTransient(delay = 1.second, attempts = 5) @@ -189,6 +190,7 @@ object KafkaConfigSpec { maxDelay = 1.second, uploadConcurrency = 1 ), + cpuParallelismFraction = BigDecimal(0.75), retries = Config.Retries( setupErrors = Retrying.Config.ForSetup(delay = 30.seconds), transientErrors = Retrying.Config.ForTransient(delay = 1.second, attempts = 5) diff --git a/modules/kinesis/src/test/scala/com/snowplowanalytics/snowplow/snowflake/KinesisConfigSpec.scala b/modules/kinesis/src/test/scala/com/snowplowanalytics/snowplow/snowflake/KinesisConfigSpec.scala index 5b0285f..eba4086 100644 --- a/modules/kinesis/src/test/scala/com/snowplowanalytics/snowplow/snowflake/KinesisConfigSpec.scala +++ b/modules/kinesis/src/test/scala/com/snowplowanalytics/snowplow/snowflake/KinesisConfigSpec.scala @@ -105,6 +105,7 @@ object KinesisConfigSpec { maxDelay = 1.second, uploadConcurrency = 3 ), + cpuParallelismFraction = BigDecimal(0.75), retries = Config.Retries( setupErrors = Retrying.Config.ForSetup(delay = 30.seconds), transientErrors = Retrying.Config.ForTransient(delay = 1.second, attempts = 5) @@ -180,6 +181,7 @@ object KinesisConfigSpec { maxDelay = 1.second, uploadConcurrency = 1 ), + cpuParallelismFraction = BigDecimal(0.75), retries = Config.Retries( setupErrors = Retrying.Config.ForSetup(delay = 30.seconds), transientErrors = Retrying.Config.ForTransient(delay = 1.second, attempts = 5) diff --git a/modules/pubsub/src/test/scala/com/snowplowanalytics/snowplow/snowflake/PubsubConfigSpec.scala b/modules/pubsub/src/test/scala/com/snowplowanalytics/snowplow/snowflake/PubsubConfigSpec.scala index a0dcb7e..dd0ec84 100644 --- a/modules/pubsub/src/test/scala/com/snowplowanalytics/snowplow/snowflake/PubsubConfigSpec.scala +++ b/modules/pubsub/src/test/scala/com/snowplowanalytics/snowplow/snowflake/PubsubConfigSpec.scala @@ -105,6 +105,7 @@ object PubsubConfigSpec { maxDelay = 1.second, uploadConcurrency = 3 ), + cpuParallelismFraction = BigDecimal(0.75), retries = Config.Retries( setupErrors = Retrying.Config.ForSetup(delay = 30.seconds), transientErrors = Retrying.Config.ForTransient(delay = 1.second, attempts = 5) @@ -179,6 +180,7 @@ object PubsubConfigSpec { maxDelay = 1.second, uploadConcurrency = 1 ), + cpuParallelismFraction = BigDecimal(0.75), retries = Config.Retries( setupErrors = Retrying.Config.ForSetup(delay = 30.seconds), transientErrors = Retrying.Config.ForTransient(delay = 1.second, attempts = 5)