update for common libraries to 0.1.0
istreeter committed Nov 16, 2023
1 parent 798b621 commit 91ebf72
Showing 13 changed files with 107 additions and 189 deletions.
5 changes: 3 additions & 2 deletions config/config.kinesis.minimal.hocon
@@ -13,7 +13,8 @@
"schema": "atomic"
}

"bad": null # TODO need kinesis sink

"bad": {
"streamName": "bad"
}
}
}
36 changes: 31 additions & 5 deletions config/config.kinesis.reference.hocon
@@ -6,19 +6,30 @@
# -- name to use for the KCL dynamodb table
"appName": "snowplow-snowflake-loader"

# -- TODO explain the options
# -- From where the app should start consuming if this is the first time it is run.
# -- On subsequent runs, it will always resume from where it last checkpointed.
"initialPosition": {
# -- Options are `TRIM_HORIZON` for the oldest available events, `LATEST` for latest events,
# -- or `AT_TIMESTAMP` to start consuming from events written at a particular time.
"type": "TRIM_HORIZON"

# -- Only required if `initialPosition.type` is AT_TIMESTAMP
"timestamp": "2020-07-17T10:00:00Z" # Required for AT_TIMESTAMP
}

# -- TODO explain the options
# -- How the underlying Kinesis client should fetch events from the stream
"retrievalMode": {
# -- Options are "Polling" for the client to poll Kinesis for more events when needed
# -- or "FanOut" to enabled Kinesis's Enhanced Fan Out feature using HTTP/2
"type": "Polling"

# -- Only used if retrieval mode is type Polling. How many events the client may fetch in a single poll.
"maxRecords": 1000
}

# -- TODO explain what this is
"bufferSize": 3
# -- The number of batches of events which are pre-fetched from kinesis.
# -- Increasing this above 1 is not known to improve performance.
"bufferSize": 1

}

@@ -58,7 +69,22 @@
"jdbcQueryTimeout": "60 seconds"
}

"bad": null # TODO need kinesis sink for failed events
"bad": {
# -- output kinesis stream for emitting failed events that could not be processed
"streamName": "bad"

# -- how to retry sending failed events if we exceed the kinesis write throughput limits
"throttledBackoffPolicy": {
"minBackoff": "100 milliseconds"
"maxBackoff": "1 second"
}

# -- the maximum number of records we are allowed to send to Kinesis in 1 PutRecords request
"recordLimit": 500

# -- the maximum number of bytes we are allowed to send to Kinesis in 1 PutRecords request
"byteLimit": 5242880
}

}
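Not part of the diff: a minimal cats-effect sketch of the kind of retry that `throttledBackoffPolicy` describes, assuming an exponentially growing delay starting at `minBackoff` and capped at `maxBackoff` (the loader's actual growth factor and any jitter are not shown in this diff). The `isThrottled` predicate is a placeholder for recognising a Kinesis throughput-exceeded error.

import cats.effect.IO
import scala.concurrent.duration._

object ThrottledBackoffSketch {
  // Retry an action while the failure is a throttling error, sleeping an
  // exponentially growing delay between attempts, capped at maxBackoff.
  def putWithThrottledBackoff[A](
    send: IO[A],
    minBackoff: FiniteDuration,       // e.g. 100.millis, matching the config above
    maxBackoff: FiniteDuration,       // e.g. 1.second
    isThrottled: Throwable => Boolean // placeholder predicate for a throughput-exceeded error
  ): IO[A] = {
    def go(delay: FiniteDuration): IO[A] =
      send.handleErrorWith { e =>
        if (isThrottled(e)) IO.sleep(delay) >> go((delay * 2).min(maxBackoff))
        else IO.raiseError(e)
      }
    go(minBackoff)
  }
}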

6 changes: 2 additions & 4 deletions config/config.pubsub.reference.hocon
@@ -60,12 +60,10 @@
# -- output pubsub topic for emitting failed events that could not be processed
"topic": "projects/myproject/topics/snowplow-bad"

# -- bad events are held in memory until we accumulate this batch size, and then sent to pubsub
# -- Failed events are sent to pubsub in batches not exceeding this size.
"batchSize": 100
# -- bad events are held in memory until we accumulate this total byte count, and then sent to pubsub
# -- Failed events are sent to pubsub in batches not exceeding this number of bytes.
"requestByteThreshold": 1000000
# -- pending bad events are flushed to pubsub after this delay, regardless of whether we reached the max batch size or byte count
"delayThreshold": "100 millis"
}

}
12 changes: 2 additions & 10 deletions modules/core/src/main/resources/reference.conf
@@ -18,10 +18,8 @@

"monitoring": {
"metrics": {
"statsd": ${snowplow.defaults.statsd}
"statsd": {
"port": 8125,
"tags": {}
"period": "1 minute"
"prefix": "snowplow.snowflake-loader"
}
}
@@ -35,11 +33,5 @@
}
}

"telemetry": {
"disable": false
"interval": "15 minutes"
"collectorUri": "collector-g.snowplowanalytics.com"
"collectorPort": 443
"secure": true
}
"telemetry": ${snowplow.defaults.telemetry}
}
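Not part of the diff, but for context on the `${snowplow.defaults.*}` references introduced above: they rely on standard HOCON substitution plus object merging, so this reference.conf inherits the common library's defaults and only overrides the fields it needs (here, the statsd `prefix`). A self-contained Typesafe Config sketch with made-up default values:

import com.typesafe.config.ConfigFactory

object DefaultsMergeSketch extends App {
  // Made-up stand-in for the defaults shipped by the common library, followed by
  // the loader-style pattern of substituting them and overriding a single field.
  val hocon =
    """
      |snowplow.defaults.statsd: { "port": 8125, "tags": {}, "period": "1 minute", "prefix": "snowplow" }
      |statsd: ${snowplow.defaults.statsd}
      |statsd: { "prefix": "snowplow.snowflake-loader" }
      |""".stripMargin

  val resolved = ConfigFactory.parseString(hocon).resolve()
  println(resolved.getInt("statsd.port"))      // 8125 -- inherited from the defaults
  println(resolved.getString("statsd.prefix")) // "snowplow.snowflake-loader" -- overridden
}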
@@ -19,6 +19,7 @@ import scala.concurrent.duration.FiniteDuration
import scala.util.Try

import com.snowplowanalytics.snowplow.runtime.{Metrics => CommonMetrics, Telemetry}
import com.snowplowanalytics.snowplow.runtime.HealthProbe.decoders._

case class Config[+Source, +Sink](
input: Source,
@@ -57,25 +58,6 @@ object Config {
statsd: Option[CommonMetrics.StatsdConfig]
)

private case class StatsdUnresolved(
hostname: Option[String],
port: Int,
tags: Map[String, String],
period: FiniteDuration,
prefix: String
)

private object Statsd {

def resolve(statsd: StatsdUnresolved): Option[CommonMetrics.StatsdConfig] =
statsd match {
case StatsdUnresolved(Some(hostname), port, tags, period, prefix) =>
Some(CommonMetrics.StatsdConfig(hostname, port, tags, period, prefix))
case StatsdUnresolved(None, _, _, _, _) =>
None
}
}

case class SentryM[M[_]](
dsn: M[String],
tags: Map[String, String]
@@ -96,22 +78,18 @@
implicit val urlDecoder = Decoder.decodeString.emapTry { str =>
Try(new SnowflakeURL(str))
}
implicit val snowflake = deriveConfiguredDecoder[Snowflake]
implicit val output = deriveConfiguredDecoder[Output[Sink]]
implicit val batching = deriveConfiguredDecoder[Batching]
implicit val telemetry = deriveConfiguredDecoder[Telemetry.Config]
implicit val statsdDecoder = deriveConfiguredDecoder[StatsdUnresolved].map(Statsd.resolve(_))
implicit val snowflake = deriveConfiguredDecoder[Snowflake]
implicit val output = deriveConfiguredDecoder[Output[Sink]]
implicit val batching = deriveConfiguredDecoder[Batching]
implicit val telemetry = deriveConfiguredDecoder[Telemetry.Config]
implicit val sentryDecoder = deriveConfiguredDecoder[SentryM[Option]]
.map[Option[Sentry]] {
case SentryM(Some(dsn), tags) =>
Some(SentryM[Id](dsn, tags))
case SentryM(None, _) =>
None
}
implicit val metricsDecoder = deriveConfiguredDecoder[Metrics]
implicit val portDecoder = Decoder.decodeInt.emap { port =>
Port.fromInt(port).toRight("Invalid port")
}
implicit val metricsDecoder = deriveConfiguredDecoder[Metrics]
implicit val healthProbeDecoder = deriveConfiguredDecoder[HealthProbe]
implicit val monitoringDecoder = deriveConfiguredDecoder[Monitoring]
deriveConfiguredDecoder[Config[Source, Sink]]
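Not part of the diff: a minimal, self-contained sketch of the `deriveConfiguredDecoder` pattern used throughout this block (circe-generic-extras semiauto derivation, driven by an implicit `Configuration`). The case class and its fields here are illustrative only, not the loader's real `Batching`.

import io.circe.Decoder
import io.circe.generic.extras.Configuration
import io.circe.generic.extras.semiauto.deriveConfiguredDecoder
import io.circe.parser.decode

object DecoderSketch extends App {
  // deriveConfiguredDecoder requires an implicit Configuration in scope;
  // withDefaults lets absent fields fall back to the case class defaults.
  implicit val circeConfig: Configuration = Configuration.default.withDefaults

  case class Batching(maxBytes: Long = 16000000L, uploadConcurrency: Int = 3)
  implicit val batchingDecoder: Decoder[Batching] = deriveConfiguredDecoder[Batching]

  // maxBytes comes from the JSON; uploadConcurrency falls back to its default.
  println(decode[Batching]("""{ "maxBytes": 1000000 }""")) // Right(Batching(1000000,3))
}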

This file was deleted.

@@ -8,24 +8,25 @@
package com.snowplowanalytics.snowplow.snowflake.processing

import cats.implicits._
import cats.{Applicative, Foldable, Monad, Semigroup}
import cats.{Applicative, Foldable, Monad}
import cats.effect.{Async, Sync}
import cats.effect.kernel.Unique
import fs2.{Pipe, Pull, Stream}
import fs2.{Chunk, Pipe, Stream}
import net.snowflake.ingest.utils.{ErrorCode, SFException}
import org.typelevel.log4cats.Logger
import org.typelevel.log4cats.slf4j.Slf4jLogger

import java.nio.charset.StandardCharsets
import java.time.OffsetDateTime
import scala.concurrent.duration.Duration

import com.snowplowanalytics.iglu.schemaddl.parquet.Caster
import com.snowplowanalytics.snowplow.analytics.scalasdk.Event
import com.snowplowanalytics.snowplow.badrows.{BadRow, Payload => BadPayload, Processor => BadRowProcessor}
import com.snowplowanalytics.snowplow.badrows.Payload.{RawPayload => BadRowRawPayload}
import com.snowplowanalytics.snowplow.sources.{EventProcessingConfig, EventProcessor, TokenedEvents}
import com.snowplowanalytics.snowplow.snowflake.{Config, Environment, Metrics}
import com.snowplowanalytics.snowplow.snowflake.{Environment, Metrics}
import com.snowplowanalytics.snowplow.runtime.syntax.foldable._
import com.snowplowanalytics.snowplow.runtime.processing.BatchUp
import com.snowplowanalytics.snowplow.loaders.transform.Transform

object Processing {
@@ -69,7 +70,7 @@ object Processing {
origBatchBytes: Long,
badAccumulated: List[BadRow],
countInserted: Int,
tokens: List[Unique.Token]
tokens: Vector[Unique.Token]
)

/**
@@ -110,7 +111,7 @@
in.through(setLatency(env.metrics))
.through(parseBytes(badProcessor))
.through(transform(badProcessor))
.through(batchUp(env.batching))
.through(BatchUp.withTimeout(env.batching.maxBytes, env.batching.maxDelay))
.through(writeToSnowflake(env, badProcessor))
.through(sendFailedEvents(env))
.through(sendMetrics(env))
@@ -132,58 +133,55 @@
}

/** Parse raw bytes into Event using analytics sdk */
private def parseBytes[F[_]: Monad](badProcessor: BadRowProcessor): Pipe[F, TokenedEvents, ParsedBatch] =
_.evalMap { case TokenedEvents(list, token, _) =>
Foldable[List].foldM(list, ParsedBatch(Nil, Nil, 0L, token)) { case (acc, bytes) =>
Applicative[F].pure {
val bytesSize = bytes.capacity
val stringified = StandardCharsets.UTF_8.decode(bytes).toString
Event.parse(stringified).toEither match {
case Right(e) =>
acc.copy(events = e :: acc.events, countBytes = acc.countBytes + bytesSize)
case Left(failure) =>
val payload = BadRowRawPayload(stringified)
val bad = BadRow.LoaderParsingError(badProcessor, failure, payload)
acc.copy(bad = bad :: acc.bad, countBytes = acc.countBytes + bytesSize)
}
}
}
private def parseBytes[F[_]: Sync](badProcessor: BadRowProcessor): Pipe[F, TokenedEvents, ParsedBatch] =
_.evalMap { case TokenedEvents(chunk, token, _) =>
for {
numBytes <- Sync[F].delay(Foldable[Chunk].sumBytes(chunk))
(badRows, events) <- Foldable[Chunk].traverseSeparateUnordered(chunk) { bytes =>
Sync[F].delay {
val stringified = StandardCharsets.UTF_8.decode(bytes).toString
Event.parse(stringified).toEither.leftMap { case failure =>
val payload = BadRowRawPayload(stringified)
BadRow.LoaderParsingError(badProcessor, failure, payload)
}
}
}
} yield ParsedBatch(events, badRows, numBytes, token)
}

/** Transform the Event into values compatible with the snowflake ingest sdk */
private def transform[F[_]: Sync](badProcessor: BadRowProcessor): Pipe[F, ParsedBatch, BatchAfterTransform] =
in =>
for {
ParsedBatch(events, bad, bytes, token) <- in
loadTstamp <- Stream.eval(Sync[F].realTimeInstant).map(SnowflakeCaster.timestampValue)
result <- Stream.eval(transformBatch[F](badProcessor, events, loadTstamp))
(moreBad, transformed) = result.separate
} yield BatchAfterTransform(
toBeInserted = transformed.toVector,
origBatchBytes = bytes,
badAccumulated = bad ::: moreBad,
countInserted = 0,
tokens = List(token)
)
_.evalMap { batch =>
Sync[F].realTimeInstant.flatMap { now =>
val loadTstamp = SnowflakeCaster.timestampValue(now)
transformBatch[F](badProcessor, loadTstamp, batch)
}
}

private def transformBatch[F[_]: Monad](
private def transformBatch[F[_]: Sync](
badProcessor: BadRowProcessor,
events: List[Event],
loadTstamp: OffsetDateTime
): F[List[Either[BadRow, (Event, Map[String, AnyRef])]]] =
events
.traverse { e =>
Applicative[F].pure {
loadTstamp: OffsetDateTime,
batch: ParsedBatch
): F[BatchAfterTransform] =
Foldable[List]
.traverseSeparateUnordered(batch.events) { event =>
Sync[F].delay {
Transform
.transformEventUnstructured[AnyRef](badProcessor, SnowflakeCaster, SnowflakeJsonFolder, e)
.transformEventUnstructured[AnyRef](badProcessor, SnowflakeCaster, SnowflakeJsonFolder, event)
.map { namedValues =>
val asMap = namedValues.map { case Caster.NamedValue(k, v) =>
k -> v
}.toMap
(e, asMap + ("load_tstamp" -> loadTstamp))
val map = namedValues
.map { case Caster.NamedValue(k, v) =>
k -> v
}
.toMap
.updated("load_tstamp", loadTstamp)
event -> map
}
}
}
.map { case (badRows, eventsWithTransforms) =>
BatchAfterTransform(eventsWithTransforms.toVector, batch.countBytes, badRows ::: batch.bad, 0, Vector(batch.token))
}

private def writeToSnowflake[F[_]: Async](
env: Environment[F],
@@ -342,57 +340,18 @@

private def fastGetByIndex[A](items: Vector[A], index: Long): A = items(index.toInt)

private implicit def batchedSemigroup: Semigroup[BatchAfterTransform] = new Semigroup[BatchAfterTransform] {
private implicit def batchable: BatchUp.Batchable[BatchAfterTransform] = new BatchUp.Batchable[BatchAfterTransform] {
def combine(x: BatchAfterTransform, y: BatchAfterTransform): BatchAfterTransform =
BatchAfterTransform(
toBeInserted = x.toBeInserted ++ y.toBeInserted,
origBatchBytes = x.origBatchBytes + y.origBatchBytes,
badAccumulated = x.badAccumulated ::: y.badAccumulated,
countInserted = x.countInserted + y.countInserted,
tokens = x.tokens ::: y.tokens
tokens = x.tokens ++ y.tokens
)
}

private def batchUp[F[_]: Async](config: Config.Batching): Pipe[F, BatchAfterTransform, BatchAfterTransform] = {
def go(
timedPull: Pull.Timed[F, BatchAfterTransform],
unflushed: Option[BatchAfterTransform]
): Pull[F, BatchAfterTransform, Unit] =
timedPull.uncons.flatMap {
case None => // Upstream stream has finished cleanly
unflushed match {
case None => Pull.done
case Some(b) => Pull.output1(b) *> Pull.done
}
case Some((Left(_), next)) => // The timer we set has timed out.
unflushed match {
case None => go(next, None)
case Some(b) => Pull.output1(b) >> go(next, None)
}
case Some((Right(pulled), next)) if pulled.isEmpty =>
go(next, unflushed)
case Some((Right(nonEmptyChunk), next)) => // Received another batch before the timer timed out
val combined = unflushed match {
case None => nonEmptyChunk.iterator.reduce(_ |+| _)
case Some(b) => nonEmptyChunk.iterator.foldLeft(b)(_ |+| _)
}
if (combined.origBatchBytes > config.maxBytes)
for {
_ <- Pull.output1(combined)
_ <- next.timeout(Duration.Zero)
_ <- go(next, None)
} yield ()
else {
for {
_ <- if (unflushed.isEmpty) next.timeout(config.maxDelay) else Pull.pure(())
_ <- go(next, Some(combined))
} yield ()
}
}
in =>
in.pull.timed { timedPull =>
go(timedPull, None)
}.stream
def weightOf(a: BatchAfterTransform): Long =
a.origBatchBytes
}

}
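Not part of the diff: the hand-rolled `Pull`-based `batchUp` removed above is replaced by `BatchUp.withTimeout` from the common runtime library, driven by the `BatchUp.Batchable` instance (a `combine` plus a `weightOf`). A toy, size-only sketch of that idea follows; the trait shape is inferred from the instance above, and the real implementation also flushes on the `maxDelay` timeout.

object BatchUpSketch {
  // Inferred shape only: the real BatchUp.Batchable lives in the snowplow common library.
  trait Batchable[A] {
    def combine(x: A, y: A): A
    def weightOf(a: A): Long
  }

  // Greedily merge consecutive items while the combined weight stays within maxWeight.
  // Time-based flushing (the `withTimeout` part) is deliberately omitted.
  def batchUp[A](items: List[A], maxWeight: Long)(implicit B: Batchable[A]): List[A] =
    items
      .foldLeft(List.empty[A]) {
        case (Nil, next) => List(next)
        case (current :: done, next) if B.weightOf(current) + B.weightOf(next) <= maxWeight =>
          B.combine(current, next) :: done
        case (done, next) => next :: done
      }
      .reverse
}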
