
Commit 79faf88
Bring latency metrics happy path back (#53)
oguzhanunlu authored Oct 28, 2024
1 parent 0e67fc7 commit 79faf88
Showing 1 changed file with 38 additions and 8 deletions.
@@ -21,7 +21,7 @@ import org.typelevel.log4cats.Logger
 import org.typelevel.log4cats.slf4j.Slf4jLogger
 
 import java.nio.charset.StandardCharsets
-import java.time.OffsetDateTime
+import java.time.{Instant, OffsetDateTime}
 import com.snowplowanalytics.iglu.schemaddl.parquet.Caster
 import com.snowplowanalytics.snowplow.analytics.scalasdk.Event
 import com.snowplowanalytics.snowplow.badrows.{BadRow, Payload => BadPayload, Processor => BadRowProcessor}
@@ -85,7 +85,9 @@ object Processing {
     origBatchBytes: Long,
     origBatchCount: Int,
     badAccumulated: ListOfList[BadRow],
-    tokens: Vector[Unique.Token]
+    tokens: Vector[Unique.Token],
+    maxCollectorTstamp: Option[Instant],
+    minCollectorTstamp: Option[Instant]
   )
 
   /**
@@ -132,6 +134,7 @@ object Processing {
       .through(transform(badProcessor, env.schemasToSkip))
       .through(BatchUp.withTimeout(env.batching.maxBytes, env.batching.maxDelay))
       .through(writeToSnowflake(env, badProcessor))
+      .through(setCollectorToTargetLatencyMetrics(env.metrics))
       .through(sendFailedEvents(env, badProcessor))
       .through(sendMetrics(env))
       .through(emitTokens)
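
The new stage is a pass-through placed after writeToSnowflake, so the latency gauges reflect batches that have already reached the target; it observes each batch without changing the stream. A minimal sketch of that pass-through pattern (the observe helper is hypothetical, not from this repo):

    import cats.effect.IO
    import fs2.{Pipe, Stream}

    // evalTap runs an effect for each element and re-emits it unchanged,
    // the same shape as the new metrics stage.
    def observe[A](f: A => IO[Unit]): Pipe[IO, A, A] = _.evalTap(f)

    val demo: IO[Unit] =
      Stream(1, 2, 3)
        .through(observe(i => IO.println(s"saw $i")))
        .compile
        .drain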
@@ -260,6 +263,21 @@ object Processing {
     }
   }
 
+  private def setCollectorToTargetLatencyMetrics[F[_]: Sync](metrics: Metrics[F]): Pipe[F, BatchAfterTransform, BatchAfterTransform] =
+    _.evalTap { batch =>
+      (batch.maxCollectorTstamp, batch.minCollectorTstamp)
+        .mapN { (maxCT, minCT) =>
+          for {
+            now <- Sync[F].realTime
+            optimistic = now.toMillis - maxCT.toEpochMilli
+            pessimistic = now.toMillis - minCT.toEpochMilli
+            _ <- metrics.setLatencyCollectorToTargetMillis(optimistic)
+            _ <- metrics.setLatencyCollectorToTargetPessimisticMillis(pessimistic)
+          } yield ()
+        }
+        .fold(Sync[F].unit)(identity)
+    }
+
   /**
    * First attempt to write events with the Snowflake SDK
    *
@@ -399,15 +417,25 @@ object Processing {
 
   private def fastGetByIndex[A](items: IndexedSeq[A], index: Long): A = items(index.toInt)
 
+  private implicit class MaxCollectorTstamp(events: List[EventWithTransform]) {
+    def maxCollectorTstamp(): Option[Instant] = events.map(_._1).map(_.collector_tstamp).maxOption
+  }
+
+  private implicit class MinCollectorTstamp(events: List[EventWithTransform]) {
+    def minCollectorTstamp(): Option[Instant] = events.map(_._1).map(_.collector_tstamp).minOption
+  }
+
   private implicit def batchable: BatchUp.Batchable[TransformedBatch, BatchAfterTransform] =
     new BatchUp.Batchable[TransformedBatch, BatchAfterTransform] {
       def combine(b: BatchAfterTransform, a: TransformedBatch): BatchAfterTransform =
         BatchAfterTransform(
-          toBeInserted = b.toBeInserted.prepend(a.events),
-          origBatchBytes = b.origBatchBytes + a.countBytes,
-          origBatchCount = b.origBatchCount + a.countItems,
-          badAccumulated = b.badAccumulated.prepend(a.parseFailures).prepend(a.transformFailures),
-          tokens = b.tokens :+ a.token
+          toBeInserted = b.toBeInserted.prepend(a.events),
+          origBatchBytes = b.origBatchBytes + a.countBytes,
+          origBatchCount = b.origBatchCount + a.countItems,
+          badAccumulated = b.badAccumulated.prepend(a.parseFailures).prepend(a.transformFailures),
+          tokens = b.tokens :+ a.token,
+          maxCollectorTstamp = List(b.maxCollectorTstamp, a.events.maxCollectorTstamp()).max,
+          minCollectorTstamp = List(b.minCollectorTstamp, a.events.minCollectorTstamp()).min
         )
 
       def single(a: TransformedBatch): BatchAfterTransform =
@@ -416,7 +444,9 @@ object Processing {
           a.countBytes,
           a.countItems,
           ListOfList.ofLists(a.parseFailures, a.transformFailures),
-          Vector(a.token)
+          Vector(a.token),
+          a.events.maxCollectorTstamp(),
+          a.events.minCollectorTstamp()
         )
 
       def weightOf(a: TransformedBatch): Long =
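
combine merges the running extremes with the standard Ordering[Option[Instant]], under which None sorts below every Some; maxOption and minOption (Scala 2.13) return None for an empty event list, which is why the fields are Option[Instant]. A quick REPL sketch of those semantics (values invented):

    import java.time.Instant

    val some = Option(Instant.parse("2024-10-28T12:00:02Z"))
    val none = Option.empty[Instant]

    List(none, some).max          // Some(2024-10-28T12:00:02Z): None never wins a max
    List(none, some).min          // None: an undefined side pins the min to None

    List.empty[Instant].maxOption // None: empty batches produce no timestamp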
