diff --git a/config/config.kinesis.minimal.hocon b/config/config.kinesis.minimal.hocon index f0969d3..eb8819f 100644 --- a/config/config.kinesis.minimal.hocon +++ b/config/config.kinesis.minimal.hocon @@ -13,7 +13,8 @@ "schema": "atomic" } - "bad": null # TODO need kinesis sink - + "bad": { + "streamName": "bad" + } } } diff --git a/config/config.kinesis.reference.hocon b/config/config.kinesis.reference.hocon index 24e995f..c97dbd1 100644 --- a/config/config.kinesis.reference.hocon +++ b/config/config.kinesis.reference.hocon @@ -6,19 +6,30 @@ # -- name to use for the KCL dynamodb table "appName": "snowplow-snowflake-loader" - # -- TODO explain the options + # -- From where the app should start consuming if this is the first time it is run. + # -- On subsequent runs, it will always resume from where it last checkpointed. "initialPosition": { + # -- Options are `TRIM_HORIZON` for the oldest available events, `LATEST` for latest events, + # -- or `AT_TIMESTAMP` to start consuming from events written at a particular time. "type": "TRIM_HORIZON" + + # -- Only required if `initialPosition.type` is AT_TIMESTAMP + "timestamp": "2020-07-17T10:00:00Z" # Required for AT_TIMESTAMP } - # -- TODO explain the options + # -- How the underlying Kinesis client should fetch events from the stream "retrievalMode": { + # -- Options are "Polling" for the client to poll Kinesis for more events when needed + # -- or "FanOut" to enable Kinesis's Enhanced Fan Out feature using HTTP/2 "type": "Polling" + + # -- Only used if retrieval mode is type Polling. How many events the client may fetch in a single poll. "maxRecords": 1000 } - # -- TODO explain what this is - "bufferSize": 3 + # -- The number of batches of events which are pre-fetched from kinesis. + # -- Increasing this above 1 is not known to improve performance. 
+ "bufferSize": 1 } @@ -58,7 +69,22 @@ "jdbcQueryTimeout": "60 seconds" } - "bad": null # TODO need kinesis sink for failed events + "bad": { + # -- output kinesis stream for emitting failed events that could not be processed + "streamName": "bad" + + # -- how to retry sending failed events if we exceed the kinesis write throughput limits + "throttledBackoffPolicy": { + "minBackoff": "100 milliseconds" + "maxBackoff": "1 second" + } + + # -- the maximum number of records we are allowed to send to Kinesis in 1 PutRecords request + "recordLimit": 500 + + # -- the maximum number of bytes we are allowed to send to Kinesis in 1 PutRecords request + "byteLimit": 5242880 + } } } diff --git a/config/config.pubsub.reference.hocon b/config/config.pubsub.reference.hocon index e334d0a..4330e72 100644 --- a/config/config.pubsub.reference.hocon +++ b/config/config.pubsub.reference.hocon @@ -60,12 +60,10 @@ # -- output pubsub topic for emitting failed events that could not be processed "topic": "projects/myproject/topics/snowplow-bad" - # -- bad events are held in memory until we accumulate this batch size, and then sent to pubsub + # -- Failed events are sent to pubsub in batches not exceeding this size. 
"batchSize": 100 - # -- bad events are held in memory until we accumulate this total byte count, and then sent to pubsub + # -- Failed events are sent to pubsub in batches not exceeding this number of bytes "requestByteThreshold": 1000000 - # -- pending bad events are flushed to pubsub after this delay, regardless of whether we reached the max batch size or byte count - "delayThreshold": "100 millis" } } diff --git a/modules/core/src/main/resources/reference.conf b/modules/core/src/main/resources/reference.conf index c8548e9..bbe2db8 100644 --- a/modules/core/src/main/resources/reference.conf +++ b/modules/core/src/main/resources/reference.conf @@ -18,10 +18,8 @@ "monitoring": { "metrics": { + "statsd": ${snowplow.defaults.statsd} "statsd": { - "port": 8125, - "tags": {} - "period": "1 minute" "prefix": "snowplow.snowflake-loader" } } @@ -35,11 +33,5 @@ } } - "telemetry": { - "disable": false - "interval": "15 minutes" - "collectorUri": "collector-g.snowplowanalytics.com" - "collectorPort": 443 - "secure": true - } + "telemetry": ${snowplow.defaults.telemetry} } diff --git a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/Config.scala b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/Config.scala index 7a26fc3..8cd82ae 100644 --- a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/Config.scala +++ b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/Config.scala @@ -19,6 +19,7 @@ import scala.concurrent.duration.FiniteDuration import scala.util.Try import com.snowplowanalytics.snowplow.runtime.{Metrics => CommonMetrics, Telemetry} +import com.snowplowanalytics.snowplow.runtime.HealthProbe.decoders._ case class Config[+Source, +Sink]( input: Source, @@ -57,25 +58,6 @@ object Config { statsd: Option[CommonMetrics.StatsdConfig] ) - private case class StatsdUnresolved( - hostname: Option[String], - port: Int, - tags: Map[String, String], - period: FiniteDuration, - prefix: String - ) - - private 
object Statsd { - - def resolve(statsd: StatsdUnresolved): Option[CommonMetrics.StatsdConfig] = - statsd match { - case StatsdUnresolved(Some(hostname), port, tags, period, prefix) => - Some(CommonMetrics.StatsdConfig(hostname, port, tags, period, prefix)) - case StatsdUnresolved(None, _, _, _, _) => - None - } - } - case class SentryM[M[_]]( dsn: M[String], tags: Map[String, String] @@ -100,7 +82,6 @@ object Config { implicit val output = deriveConfiguredDecoder[Output[Sink]] implicit val batching = deriveConfiguredDecoder[Batching] implicit val telemetry = deriveConfiguredDecoder[Telemetry.Config] - implicit val statsdDecoder = deriveConfiguredDecoder[StatsdUnresolved].map(Statsd.resolve(_)) implicit val sentryDecoder = deriveConfiguredDecoder[SentryM[Option]] .map[Option[Sentry]] { case SentryM(Some(dsn), tags) => @@ -109,9 +90,6 @@ object Config { None } implicit val metricsDecoder = deriveConfiguredDecoder[Metrics] - implicit val portDecoder = Decoder.decodeInt.emap { port => - Port.fromInt(port).toRight("Invalid port") - } implicit val healthProbeDecoder = deriveConfiguredDecoder[HealthProbe] implicit val monitoringDecoder = deriveConfiguredDecoder[Monitoring] deriveConfiguredDecoder[Config[Source, Sink]] diff --git a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/package.scala b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/package.scala deleted file mode 100644 index 3b8a7ca..0000000 --- a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/package.scala +++ /dev/null @@ -1,12 +0,0 @@ -/* - * Copyright (c) 2023-present Snowplow Analytics Ltd. All rights reserved. - * - * This program is licensed to you under the Snowplow Community License Version 1.0, - * and you may not use this file except in compliance with the Snowplow Community License Version 1.0. 
- * You may obtain a copy of the Snowplow Community License Version 1.0 at https://docs.snowplow.io/community-license-1.0 - */ -package com.snowplowanalytics.snowplow - -package object snowflake { - type AnyConfig = Config[Any, Any] -} diff --git a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/processing/Processing.scala b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/processing/Processing.scala index aa65f67..6e5dfc0 100644 --- a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/processing/Processing.scala +++ b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/processing/Processing.scala @@ -8,24 +8,25 @@ package com.snowplowanalytics.snowplow.snowflake.processing import cats.implicits._ -import cats.{Applicative, Foldable, Monad, Semigroup} +import cats.{Applicative, Foldable, Monad} import cats.effect.{Async, Sync} import cats.effect.kernel.Unique -import fs2.{Pipe, Pull, Stream} +import fs2.{Chunk, Pipe, Stream} import net.snowflake.ingest.utils.{ErrorCode, SFException} import org.typelevel.log4cats.Logger import org.typelevel.log4cats.slf4j.Slf4jLogger import java.nio.charset.StandardCharsets import java.time.OffsetDateTime -import scala.concurrent.duration.Duration import com.snowplowanalytics.iglu.schemaddl.parquet.Caster import com.snowplowanalytics.snowplow.analytics.scalasdk.Event import com.snowplowanalytics.snowplow.badrows.{BadRow, Payload => BadPayload, Processor => BadRowProcessor} import com.snowplowanalytics.snowplow.badrows.Payload.{RawPayload => BadRowRawPayload} import com.snowplowanalytics.snowplow.sources.{EventProcessingConfig, EventProcessor, TokenedEvents} -import com.snowplowanalytics.snowplow.snowflake.{Config, Environment, Metrics} +import com.snowplowanalytics.snowplow.snowflake.{Environment, Metrics} +import com.snowplowanalytics.snowplow.runtime.syntax.foldable._ +import com.snowplowanalytics.snowplow.runtime.processing.BatchUp import 
com.snowplowanalytics.snowplow.loaders.transform.Transform object Processing { @@ -69,7 +70,7 @@ object Processing { origBatchBytes: Long, badAccumulated: List[BadRow], countInserted: Int, - tokens: List[Unique.Token] + tokens: Vector[Unique.Token] ) /** @@ -110,7 +111,7 @@ object Processing { in.through(setLatency(env.metrics)) .through(parseBytes(badProcessor)) .through(transform(badProcessor)) - .through(batchUp(env.batching)) + .through(BatchUp.withTimeout(env.batching.maxBytes, env.batching.maxDelay)) .through(writeToSnowflake(env, badProcessor)) .through(sendFailedEvents(env)) .through(sendMetrics(env)) @@ -132,58 +133,55 @@ object Processing { } /** Parse raw bytes into Event using analytics sdk */ - private def parseBytes[F[_]: Monad](badProcessor: BadRowProcessor): Pipe[F, TokenedEvents, ParsedBatch] = - _.evalMap { case TokenedEvents(list, token, _) => - Foldable[List].foldM(list, ParsedBatch(Nil, Nil, 0L, token)) { case (acc, bytes) => - Applicative[F].pure { - val bytesSize = bytes.capacity - val stringified = StandardCharsets.UTF_8.decode(bytes).toString - Event.parse(stringified).toEither match { - case Right(e) => - acc.copy(events = e :: acc.events, countBytes = acc.countBytes + bytesSize) - case Left(failure) => - val payload = BadRowRawPayload(stringified) - val bad = BadRow.LoaderParsingError(badProcessor, failure, payload) - acc.copy(bad = bad :: acc.bad, countBytes = acc.countBytes + bytesSize) - } - } - } + private def parseBytes[F[_]: Sync](badProcessor: BadRowProcessor): Pipe[F, TokenedEvents, ParsedBatch] = + _.evalMap { case TokenedEvents(chunk, token, _) => + for { + numBytes <- Sync[F].delay(Foldable[Chunk].sumBytes(chunk)) + (badRows, events) <- Foldable[Chunk].traverseSeparateUnordered(chunk) { bytes => + Sync[F].delay { + val stringified = StandardCharsets.UTF_8.decode(bytes).toString + Event.parse(stringified).toEither.leftMap { case failure => + val payload = BadRowRawPayload(stringified) + BadRow.LoaderParsingError(badProcessor, 
failure, payload) + } + } + } + } yield ParsedBatch(events, badRows, numBytes, token) } /** Transform the Event into values compatible with the snowflake ingest sdk */ private def transform[F[_]: Sync](badProcessor: BadRowProcessor): Pipe[F, ParsedBatch, BatchAfterTransform] = - in => - for { - ParsedBatch(events, bad, bytes, token) <- in - loadTstamp <- Stream.eval(Sync[F].realTimeInstant).map(SnowflakeCaster.timestampValue) - result <- Stream.eval(transformBatch[F](badProcessor, events, loadTstamp)) - (moreBad, transformed) = result.separate - } yield BatchAfterTransform( - toBeInserted = transformed.toVector, - origBatchBytes = bytes, - badAccumulated = bad ::: moreBad, - countInserted = 0, - tokens = List(token) - ) + _.evalMap { batch => + Sync[F].realTimeInstant.flatMap { now => + val loadTstamp = SnowflakeCaster.timestampValue(now) + transformBatch[F](badProcessor, loadTstamp, batch) + } + } - private def transformBatch[F[_]: Monad]( + private def transformBatch[F[_]: Sync]( badProcessor: BadRowProcessor, - events: List[Event], - loadTstamp: OffsetDateTime - ): F[List[Either[BadRow, (Event, Map[String, AnyRef])]]] = - events - .traverse { e => - Applicative[F].pure { + loadTstamp: OffsetDateTime, + batch: ParsedBatch + ): F[BatchAfterTransform] = + Foldable[List] + .traverseSeparateUnordered(batch.events) { event => + Sync[F].delay { Transform - .transformEventUnstructured[AnyRef](badProcessor, SnowflakeCaster, SnowflakeJsonFolder, e) + .transformEventUnstructured[AnyRef](badProcessor, SnowflakeCaster, SnowflakeJsonFolder, event) .map { namedValues => - val asMap = namedValues.map { case Caster.NamedValue(k, v) => - k -> v - }.toMap - (e, asMap + ("load_tstamp" -> loadTstamp)) + val map = namedValues + .map { case Caster.NamedValue(k, v) => + k -> v + } + .toMap + .updated("load_tstamp", loadTstamp) + event -> map } } } + .map { case (badRows, eventsWithTransforms) => + BatchAfterTransform(eventsWithTransforms.toVector, batch.countBytes, badRows ::: 
batch.bad, 0, Vector(batch.token)) + } private def writeToSnowflake[F[_]: Async]( env: Environment[F], @@ -342,57 +340,18 @@ object Processing { private def fastGetByIndex[A](items: Vector[A], index: Long): A = items(index.toInt) - private implicit def batchedSemigroup: Semigroup[BatchAfterTransform] = new Semigroup[BatchAfterTransform] { + private implicit def batchable: BatchUp.Batchable[BatchAfterTransform] = new BatchUp.Batchable[BatchAfterTransform] { def combine(x: BatchAfterTransform, y: BatchAfterTransform): BatchAfterTransform = BatchAfterTransform( toBeInserted = x.toBeInserted ++ y.toBeInserted, origBatchBytes = x.origBatchBytes + y.origBatchBytes, badAccumulated = x.badAccumulated ::: y.badAccumulated, countInserted = x.countInserted + y.countInserted, - tokens = x.tokens ::: y.tokens + tokens = x.tokens ++ y.tokens ) - } - private def batchUp[F[_]: Async](config: Config.Batching): Pipe[F, BatchAfterTransform, BatchAfterTransform] = { - def go( - timedPull: Pull.Timed[F, BatchAfterTransform], - unflushed: Option[BatchAfterTransform] - ): Pull[F, BatchAfterTransform, Unit] = - timedPull.uncons.flatMap { - case None => // Upstream stream has finished cleanly - unflushed match { - case None => Pull.done - case Some(b) => Pull.output1(b) *> Pull.done - } - case Some((Left(_), next)) => // The timer we set has timed out. 
- unflushed match { - case None => go(next, None) - case Some(b) => Pull.output1(b) >> go(next, None) - } - case Some((Right(pulled), next)) if pulled.isEmpty => - go(next, unflushed) - case Some((Right(nonEmptyChunk), next)) => // Received another batch before the timer timed out - val combined = unflushed match { - case None => nonEmptyChunk.iterator.reduce(_ |+| _) - case Some(b) => nonEmptyChunk.iterator.foldLeft(b)(_ |+| _) - } - if (combined.origBatchBytes > config.maxBytes) - for { - _ <- Pull.output1(combined) - _ <- next.timeout(Duration.Zero) - _ <- go(next, None) - } yield () - else { - for { - _ <- if (unflushed.isEmpty) next.timeout(config.maxDelay) else Pull.pure(()) - _ <- go(next, Some(combined)) - } yield () - } - } - in => - in.pull.timed { timedPull => - go(timedPull, None) - }.stream + def weightOf(a: BatchAfterTransform): Long = + a.origBatchBytes } } diff --git a/modules/core/src/test/scala/com.snowplowanalytics.snowplow.snowflake/processing/ProcessingSpec.scala b/modules/core/src/test/scala/com.snowplowanalytics.snowplow.snowflake/processing/ProcessingSpec.scala index 4ba7599..7a33847 100644 --- a/modules/core/src/test/scala/com.snowplowanalytics.snowplow.snowflake/processing/ProcessingSpec.scala +++ b/modules/core/src/test/scala/com.snowplowanalytics.snowplow.snowflake/processing/ProcessingSpec.scala @@ -8,7 +8,7 @@ package com.snowplowanalytics.snowplow.snowflake.processing import cats.effect.IO -import fs2.Stream +import fs2.{Chunk, Stream} import org.specs2.Specification import cats.effect.testing.specs2.CatsEffect import cats.effect.testkit.TestControl @@ -74,7 +74,7 @@ class ProcessingSpec extends Specification with CatsEffect { bads <- generateBadlyFormatted.take(3).compile.toList goods <- generateEvents.take(3).compile.toList inputs = bads.zip(goods).map { case (bad, good) => - TokenedEvents(bad.events ::: good.events, good.ack, None) + TokenedEvents(bad.events ++ good.events, good.ack, None) } control <- MockEnvironment.build(inputs) 
_ <- Processing.stream(control.environment).compile.drain @@ -232,7 +232,7 @@ object ProcessingSpec { } yield { val event1 = Event.minimal(eventId1, collectorTstamp, "0.0.0", "0.0.0") val event2 = Event.minimal(eventId2, collectorTstamp, "0.0.0", "0.0.0") - val serialized = List(event1, event2).map { e => + val serialized = Chunk(event1, event2).map { e => ByteBuffer.wrap(e.toTsv.getBytes(StandardCharsets.UTF_8)) } TokenedEvents(serialized, ack, None) @@ -242,7 +242,7 @@ object ProcessingSpec { def generateBadlyFormatted: Stream[IO, TokenedEvents] = Stream.eval { IO.unique.map { token => - val serialized = List("nonsense1", "nonsense2").map(s => ByteBuffer.wrap(s.getBytes(StandardCharsets.UTF_8))) + val serialized = Chunk("nonsense1", "nonsense2").map(s => ByteBuffer.wrap(s.getBytes(StandardCharsets.UTF_8))) TokenedEvents(serialized, token, None) } }.repeat diff --git a/modules/kafka/src/main/resources/application.conf b/modules/kafka/src/main/resources/application.conf index e672c19..394e111 100644 --- a/modules/kafka/src/main/resources/application.conf +++ b/modules/kafka/src/main/resources/application.conf @@ -1,13 +1,12 @@ { + "input": ${snowplow.defaults.sources.kafka} "input": { "consumerConf": { "group.id": "snowplow-snowflake-loader" - "enable.auto.commit": "false" - "allow.auto.create.topics": "false" - "auto.offset.reset": "earliest" } } "output": { + "bad": ${snowplow.defaults.sinks.kafka} "bad": { "producerConf": { "client.id": "snowplow-snowflake-loader" diff --git a/modules/kinesis/src/main/resources/application.conf b/modules/kinesis/src/main/resources/application.conf index 4c42450..614d5de 100644 --- a/modules/kinesis/src/main/resources/application.conf +++ b/modules/kinesis/src/main/resources/application.conf @@ -1,16 +1,9 @@ { + "input": ${snowplow.defaults.sources.kinesis} "input": { "appName": "snowplow-snowflake-loader" - "initialPosition": { - "type": "TRIM_HORIZON" - } - "retrievalMode": { - "type": "Polling" - "maxRecords": 1000 - } - 
"bufferSize": 3 } "output": { - "bad": null + "bad": ${snowplow.defaults.sinks.kinesis} } } diff --git a/modules/kinesis/src/main/scala/com.snowplowanalytics.snowplow.snowflake/AwsApp.scala b/modules/kinesis/src/main/scala/com.snowplowanalytics.snowplow.snowflake/AwsApp.scala index 8400393..1905c6d 100644 --- a/modules/kinesis/src/main/scala/com.snowplowanalytics.snowplow.snowflake/AwsApp.scala +++ b/modules/kinesis/src/main/scala/com.snowplowanalytics.snowplow.snowflake/AwsApp.scala @@ -7,18 +7,12 @@ */ package com.snowplowanalytics.snowplow.snowflake -import cats.effect.{IO, Resource} - import com.snowplowanalytics.snowplow.sources.kinesis.{KinesisSource, KinesisSourceConfig} -import com.snowplowanalytics.snowplow.sinks.Sink +import com.snowplowanalytics.snowplow.sinks.kinesis.{KinesisSink, KinesisSinkConfig} -object GcpApp extends LoaderApp[KinesisSourceConfig, Unit](BuildInfo) { +object GcpApp extends LoaderApp[KinesisSourceConfig, KinesisSinkConfig](BuildInfo) { override def source: SourceProvider = KinesisSource.build(_) - override def badSink: SinkProvider = { _ => - Resource.pure { - Sink(_ => IO.unit) - } - } + override def badSink: SinkProvider = KinesisSink.resource(_) } diff --git a/modules/pubsub/src/main/resources/application.conf b/modules/pubsub/src/main/resources/application.conf index b1debf7..c9c3e39 100644 --- a/modules/pubsub/src/main/resources/application.conf +++ b/modules/pubsub/src/main/resources/application.conf @@ -1,16 +1,6 @@ { - "input": { - "parallelPullCount": 3 - "bufferMaxBytes": 1000000 - "maxAckExtensionPeriod": "1 hour" - "minDurationPerAckExtension": "60 seconds" - "maxDurationPerAckExtension": "600 seconds" - } + "input": ${snowplow.defaults.sources.pubsub} "output": { - "bad": { - "batchSize": 100 - "requestByteThreshold": 1000000 - "delayThreshold": "100 millis" - } + "bad": ${snowplow.defaults.sinks.pubsub} } } diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 27b2c37..b1ca478 100644 --- 
a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -28,7 +28,7 @@ object Dependencies { val awsSdk2 = "2.20.135" // Snowplow - val streams = "0.1.0-M7" + val streams = "0.1.0" // tests val specs2 = "4.20.0"