Add skipping schemas feature
pondzix committed Nov 28, 2023
1 parent a1dc022 commit 1347ead
Showing 9 changed files with 49 additions and 13 deletions.
8 changes: 8 additions & 0 deletions config/config.azure.reference.hocon
@@ -79,6 +79,14 @@
"uploadConcurrency": 1
}

+# -- Schemas that won't be loaded to Snowflake. Optional, default value []
+"skipSchemas": [
+  "iglu:com.acme/skipped1/jsonschema/1-0-0",
+  "iglu:com.acme/skipped2/jsonschema/1-0-*",
+  "iglu:com.acme/skipped3/jsonschema/1-*-*",
+  "iglu:com.acme/skipped4/jsonschema/*-*-*"
+]

"monitoring": {
"metrics": {

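For context, the skipSchemas patterns above use Iglu's SchemaCriterion syntax: the version part is MODEL-REVISION-ADDITION, and each component may be a wildcard. A minimal sketch of how matching behaves, assuming the iglu-scala-core SchemaCriterion.parse/matches API (not part of this commit):

import com.snowplowanalytics.iglu.core.{SchemaCriterion, SchemaKey}

// "1-0-*" pins the schema MODEL and REVISION but accepts any ADDITION.
val criterion = SchemaCriterion.parse("iglu:com.acme/skipped2/jsonschema/1-0-*").get

SchemaKey.fromUri("iglu:com.acme/skipped2/jsonschema/1-0-3").foreach { key =>
  assert(criterion.matches(key)) // 1-0-3 is covered by 1-0-*
}
SchemaKey.fromUri("iglu:com.acme/skipped2/jsonschema/1-1-0").foreach { key =>
  assert(!criterion.matches(key)) // a different REVISION is not
}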
8 changes: 8 additions & 0 deletions config/config.kinesis.reference.hocon
@@ -100,6 +100,14 @@
"uploadConcurrency": 1
}

+# -- Schemas that won't be loaded to Snowflake. Optional, default value []
+"skipSchemas": [
+  "iglu:com.acme/skipped1/jsonschema/1-0-0",
+  "iglu:com.acme/skipped2/jsonschema/1-0-*",
+  "iglu:com.acme/skipped3/jsonschema/1-*-*",
+  "iglu:com.acme/skipped4/jsonschema/*-*-*"
+]

"monitoring": {
"metrics": {

8 changes: 8 additions & 0 deletions config/config.pubsub.reference.hocon
@@ -93,6 +93,14 @@
"uploadConcurrency": 1
}

+# -- Schemas that won't be loaded to Snowflake. Optional, default value []
+"skipSchemas": [
+  "iglu:com.acme/skipped1/jsonschema/1-0-0",
+  "iglu:com.acme/skipped2/jsonschema/1-0-*",
+  "iglu:com.acme/skipped3/jsonschema/1-*-*",
+  "iglu:com.acme/skipped4/jsonschema/*-*-*"
+]

"monitoring": {
"metrics": {

2 changes: 2 additions & 0 deletions modules/core/src/main/resources/reference.conf
@@ -15,6 +15,8 @@
"maxDelay": "1 second"
"uploadConcurrency": 3
}

"skipSchemas": []

"monitoring": {
"metrics": {
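The "skipSchemas": [] default above is what makes the key optional in the reference configs: a user config that omits it falls back to the empty list. A rough illustration of the merge, assuming standard Typesafe Config fallback semantics:

import com.typesafe.config.ConfigFactory

// A user config without "skipSchemas" picks up the reference default.
val userConf  = ConfigFactory.parseString("""{ "batching": { "uploadConcurrency": 1 } }""")
val reference = ConfigFactory.parseString("""{ "skipSchemas": [] }""")
val merged    = userConf.withFallback(reference).resolve()

assert(merged.getStringList("skipSchemas").isEmpty)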
@@ -14,17 +14,19 @@ import io.circe.generic.extras.Configuration
import io.circe.config.syntax._
import net.snowflake.ingest.utils.SnowflakeURL
import com.comcast.ip4s.Port
+import com.snowplowanalytics.iglu.core.SchemaCriterion
+import com.snowplowanalytics.iglu.core.circe.CirceIgluCodecs.schemaCriterionDecoder

import scala.concurrent.duration.FiniteDuration
import scala.util.Try

import com.snowplowanalytics.snowplow.runtime.{Metrics => CommonMetrics, Telemetry}
import com.snowplowanalytics.snowplow.runtime.HealthProbe.decoders._

case class Config[+Source, +Sink](
input: Source,
output: Config.Output[Sink],
batching: Config.Batching,
+  skipSchemas: List[SchemaCriterion],
telemetry: Telemetry.Config,
monitoring: Config.Monitoring
)
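The newly imported schemaCriterionDecoder is what turns the skipSchemas strings from the HOCON files into the new skipSchemas: List[SchemaCriterion] field. A hypothetical round-trip, assuming circe-config's as syntax and the iglu-scala-core codec:

import com.typesafe.config.ConfigFactory
import io.circe.config.syntax._
import com.snowplowanalytics.iglu.core.SchemaCriterion
import com.snowplowanalytics.iglu.core.circe.CirceIgluCodecs.schemaCriterionDecoder

val conf = ConfigFactory.parseString(
  """skipSchemas: ["iglu:com.acme/skipped1/jsonschema/1-0-0"]"""
)

// Right(List(...)) on success; Left(error) if a URI is not a valid criterion.
val criteria: Either[io.circe.Error, List[SchemaCriterion]] =
  conf.as[List[SchemaCriterion]]("skipSchemas")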
@@ -11,10 +11,10 @@ import cats.implicits._
import cats.Functor
import cats.effect.{Async, Resource, Sync}
import cats.effect.unsafe.implicits.global
+import com.snowplowanalytics.iglu.core.SchemaCriterion
import org.http4s.client.Client
import org.http4s.blaze.client.BlazeClientBuilder
import io.sentry.Sentry

import com.snowplowanalytics.snowplow.sources.SourceAndAck
import com.snowplowanalytics.snowplow.sinks.Sink
import com.snowplowanalytics.snowplow.snowflake.processing.{ChannelProvider, TableManager}
@@ -28,7 +28,8 @@ case class Environment[F[_]](
tblManager: TableManager[F],
channelProvider: ChannelProvider[F],
metrics: Metrics[F],
-  batching: Config.Batching
+  batching: Config.Batching,
+  schemasToSkip: List[SchemaCriterion]
)

object Environment {
@@ -58,7 +59,8 @@ object Environment {
tblManager = tblManager,
channelProvider = channelProvider,
metrics = metrics,
-    batching = config.batching
+    batching = config.batching,
+    schemasToSkip = config.skipSchemas
)

private def enableSentry[F[_]: Sync](appInfo: AppInfo, config: Option[Config.Sentry]): Resource[F, Unit] =
@@ -11,6 +11,7 @@ import cats.implicits._
import cats.{Applicative, Foldable, Monad}
import cats.effect.{Async, Sync}
import cats.effect.kernel.Unique
+import com.snowplowanalytics.iglu.core.SchemaCriterion
import fs2.{Chunk, Pipe, Stream}
import net.snowflake.ingest.utils.{ErrorCode, SFException}
import org.typelevel.log4cats.Logger
@@ -108,7 +109,7 @@
if (writeFailures.isEmpty)
empty
else {
-      val indexed = events.toIndexedSeq
+      val indexed = events.copyToIndexedSeq
writeFailures.foldLeft(ParsedWriteResult.empty) { case (ParsedWriteResult(extraCols, eventsWithExtraCols, unexpected), failure) =>
val event = fastGetByIndex(indexed, failure.index)
if (failure.extraCols.nonEmpty)
@@ -126,7 +127,7 @@

in.through(setLatency(env.metrics))
.through(parseBytes(badProcessor))
-      .through(transform(badProcessor))
+      .through(transform(badProcessor, env.schemasToSkip))
.through(BatchUp.withTimeout(env.batching.maxBytes, env.batching.maxDelay))
.through(writeToSnowflake(env, badProcessor))
.through(sendFailedEvents(env))
@@ -166,24 +167,28 @@
}

/** Transform the Event into values compatible with the snowflake ingest sdk */
-  private def transform[F[_]: Sync](badProcessor: BadRowProcessor): Pipe[F, ParsedBatch, TransformedBatch] =
+  private def transform[F[_]: Sync](
+    badProcessor: BadRowProcessor,
+    schemasToSkip: List[SchemaCriterion]
+  ): Pipe[F, ParsedBatch, TransformedBatch] =
_.evalMap { batch =>
Sync[F].realTimeInstant.flatMap { now =>
val loadTstamp = SnowflakeCaster.timestampValue(now)
-        transformBatch[F](badProcessor, loadTstamp, batch)
+        transformBatch[F](badProcessor, loadTstamp, batch, schemasToSkip)
}
}

private def transformBatch[F[_]: Sync](
badProcessor: BadRowProcessor,
loadTstamp: OffsetDateTime,
-    batch: ParsedBatch
+    batch: ParsedBatch,
+    schemasToSkip: List[SchemaCriterion]
): F[TransformedBatch] =
Foldable[List]
.traverseSeparateUnordered(batch.events) { event =>
Sync[F].delay {
Transform
-          .transformEventUnstructured[AnyRef](badProcessor, SnowflakeCaster, SnowflakeJsonFolder, event)
+          .transformEventUnstructured[AnyRef](badProcessor, SnowflakeCaster, SnowflakeJsonFolder, event, schemasToSkip)
.map { namedValues =>
val map = namedValues
.map { case Caster.NamedValue(k, v) =>
@@ -271,7 +276,7 @@ object Processing {
val mapped = notWritten match {
case Nil => Nil
case more =>
-        val indexed = batch.toBeInserted.toIndexedSeq
+        val indexed = batch.toBeInserted.copyToIndexedSeq
more.map(f => (fastGetByIndex(indexed, f.index)._1, f.cause))
}
abortIfFatalException[F](mapped).as {
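The filtering itself is not implemented in this file: Transform.transformEventUnstructured appears to come from the bumped snowplow streams dependency (see Dependencies.scala below) and now receives the criteria as an extra argument. Conceptually, an entity is skipped when its schema key matches any configured criterion; a sketch of that predicate, not code from this repository:

import com.snowplowanalytics.iglu.core.{SchemaCriterion, SchemaKey}

// True when the entity's schema matches at least one skip pattern.
def shouldSkip(schemasToSkip: List[SchemaCriterion], schema: SchemaKey): Boolean =
  schemasToSkip.exists(_.matches(schema))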
@@ -64,7 +64,8 @@ object MockEnvironment {
maxBytes = 16000000,
maxDelay = 10.seconds,
uploadConcurrency = 1
-      )
+      ),
+      schemasToSkip = List.empty
)
MockEnvironment(state, env)
}
2 changes: 1 addition & 1 deletion project/Dependencies.scala
@@ -28,7 +28,7 @@ object Dependencies {
val awsSdk2 = "2.20.135"

// Snowplow
val streams = "0.2.0-M1a"
val streams = "0.2.0-M2"

// tests
val specs2 = "4.20.0"
