Skip to content

Commit

Permalink
TBR
Browse files Browse the repository at this point in the history
  • Loading branch information
istreeter committed Nov 13, 2024
1 parent dfd0f49 commit b08e896
Showing 1 changed file with 19 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import org.typelevel.log4cats.slf4j.Slf4jLogger

import java.nio.charset.StandardCharsets
import java.time.OffsetDateTime
import scala.concurrent.duration.DurationLong

import com.snowplowanalytics.iglu.schemaddl.parquet.Caster
import com.snowplowanalytics.snowplow.analytics.scalasdk.Event
Expand Down Expand Up @@ -122,6 +123,7 @@ object Processing {
in.through(setLatency(env.metrics))
.through(parseAndTransform(env, badProcessor))
.through(BatchUp.withTimeout(env.batching.maxBytes, env.batching.maxDelay))
.prefetchN(env.batching.uploadConcurrency)
.through(writeToSnowflake(env, badProcessor))
.through(sendFailedEvents(env, badProcessor))
.through(sendMetrics(env))
Expand Down Expand Up @@ -357,9 +359,23 @@ object Processing {
env.metrics.addGood(batch.origBatchCount - countBad) *> env.metrics.addBad(countBad)
}

private def emitTokens[F[_]]: Pipe[F, BatchAfterTransform, Unique.Token] =
_.flatMap { batch =>
Stream.emits(batch.tokens)
private implicit def batchable2: BatchUp.Batchable[BatchAfterTransform, Vector[Unique.Token]] =
new BatchUp.Batchable[BatchAfterTransform, Vector[Unique.Token]] {
def combine(b: Vector[Unique.Token], a: BatchAfterTransform): Vector[Unique.Token] =
b ++ a.tokens

def single(a: BatchAfterTransform): Vector[Unique.Token] =
a.tokens

def weightOf(a: BatchAfterTransform): Long =
0L
}

private def emitTokens[F[_]: Async]: Pipe[F, BatchAfterTransform, Unique.Token] =
BatchUp.withTimeout[F, BatchAfterTransform, Vector[Unique.Token]](Long.MaxValue, 10.seconds).andThen {
_.flatMap { tokens =>
Stream.emits(tokens)
}
}

private def fastGetByIndex[A](items: IndexedSeq[A], index: Long): A = items(index.toInt)
Expand Down

0 comments on commit b08e896

Please sign in to comment.