diff --git a/config/config.kinesis.reference.hocon b/config/config.kinesis.reference.hocon index c05033a..ba9d9e6 100644 --- a/config/config.kinesis.reference.hocon +++ b/config/config.kinesis.reference.hocon @@ -32,6 +32,17 @@ "maxRecords": 1000 } + # -- Name of this KCL worker used in the dynamodb lease table + "workerIdentifier": ${HOSTNAME} + + # -- Duration of shard leases. KCL workers must periodically refresh leases in the dynamodb table before this duration expires. + "leaseDuration": "10 seconds" + + # -- Controls how to pick the max number of leases to steal at one time. + # -- E.g. If there are 4 available processors, and maxLeasesToStealAtOneTimeFactor = 2.0, then allow the KCL to steal up to 8 leases. + # -- Allows bigger instances to more quickly acquire the shard-leases they need to combat latency + "maxLeasesToStealAtOneTimeFactor": 2.0 + } "output": { diff --git a/modules/kinesis/src/test/scala/com/snowplowanalytics/snowplow/snowflake/KinesisConfigSpec.scala b/modules/kinesis/src/test/scala/com/snowplowanalytics/snowplow/snowflake/KinesisConfigSpec.scala index 5e14a73..672322e 100644 --- a/modules/kinesis/src/test/scala/com/snowplowanalytics/snowplow/snowflake/KinesisConfigSpec.scala +++ b/modules/kinesis/src/test/scala/com/snowplowanalytics/snowplow/snowflake/KinesisConfigSpec.scala @@ -61,15 +61,16 @@ class KinesisConfigSpec extends Specification with CatsEffect { object KinesisConfigSpec { private val minimalConfig = Config[KinesisSourceConfig, KinesisSinkConfig]( input = KinesisSourceConfig( - appName = "snowplow-snowflake-loader", - streamName = "snowplow-enriched-events", - workerIdentifier = "testWorkerId", - initialPosition = KinesisSourceConfig.InitialPosition.Latest, - retrievalMode = KinesisSourceConfig.Retrieval.Polling(1000), - customEndpoint = None, - dynamodbCustomEndpoint = None, - cloudwatchCustomEndpoint = None, - leaseDuration = 10.seconds + appName = "snowplow-snowflake-loader", + streamName = "snowplow-enriched-events", + workerIdentifier = "testWorkerId", + initialPosition = KinesisSourceConfig.InitialPosition.Latest, + retrievalMode = KinesisSourceConfig.Retrieval.Polling(1000), + customEndpoint = None, + dynamodbCustomEndpoint = None, + cloudwatchCustomEndpoint = None, + leaseDuration = 10.seconds, + maxLeasesToStealAtOneTimeFactor = BigDecimal(2.0) ), output = Config.Output( good = Config.Snowflake( @@ -138,15 +139,16 @@ object KinesisConfigSpec { */ private val extendedConfig = Config[KinesisSourceConfig, KinesisSinkConfig]( input = KinesisSourceConfig( - appName = "snowplow-snowflake-loader", - streamName = "snowplow-enriched-events", - workerIdentifier = "testWorkerId", - initialPosition = KinesisSourceConfig.InitialPosition.TrimHorizon, - retrievalMode = KinesisSourceConfig.Retrieval.Polling(1000), - customEndpoint = None, - dynamodbCustomEndpoint = None, - cloudwatchCustomEndpoint = None, - leaseDuration = 10.seconds + appName = "snowplow-snowflake-loader", + streamName = "snowplow-enriched-events", + workerIdentifier = "testWorkerId", + initialPosition = KinesisSourceConfig.InitialPosition.TrimHorizon, + retrievalMode = KinesisSourceConfig.Retrieval.Polling(1000), + customEndpoint = None, + dynamodbCustomEndpoint = None, + cloudwatchCustomEndpoint = None, + leaseDuration = 10.seconds, + maxLeasesToStealAtOneTimeFactor = BigDecimal(2.0) ), output = Config.Output( good = Config.Snowflake( diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 7b0905c..81ac8a5 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -35,7 +35,7 @@ object Dependencies { val protobuf = "3.25.5" // Version override // Snowplow - val streams = "0.8.1" + val streams = "0.9.0-M1" // tests val specs2 = "4.20.0"