Add collector to target latency metrics
oguzhanunlu committed Oct 18, 2024
1 parent 2920f55 commit 68bcdc0
Showing 2 changed files with 100 additions and 13 deletions.
Metrics.scala
@@ -21,6 +21,11 @@ trait Metrics[F[_]] {
   def addGood(count: Int): F[Unit]
   def addBad(count: Int): F[Unit]
   def setLatencyMillis(latencyMillis: Long): F[Unit]
+  def setLatencyCollectorToTargetMillis(latencyMillis: Long): F[Unit]
+  def setLatencyCollectorToTargetPessimisticMillis(latencyMillis: Long): F[Unit]
+
+  def setFailedMaxCollectorTstamp(tstampMillis: Long): F[Unit]
+  def clearFailedMaxCollectorTstamp(): F[Unit]
 
   def report: Stream[F, Nothing]
 }
@@ -33,18 +38,33 @@ object Metrics {
   private case class State(
     good: Int,
     bad: Int,
-    latencyMillis: Long
+    latencyMillis: Long,
+    latencyCollectorToTargetMillis: Option[Long],
+    latencyCollectorToTargetPessimisticMillis: Option[Long],
+    failedMaxCollectorTstampMillis: Option[Long]
   ) extends CommonMetrics.State {
+    private def getLatencyCollectorToTargetMillis: Long = {
+      val failedLatency = failedMaxCollectorTstampMillis.map(t => System.currentTimeMillis() - t)
+      latencyCollectorToTargetMillis.fold(failedLatency.fold(0L)(identity))(identity)
+    }
+
+    private def getLatencyCollectorToTargetPessimisticMillis: Long = {
+      val failedLatency = failedMaxCollectorTstampMillis.map(t => System.currentTimeMillis() - t)
+      latencyCollectorToTargetPessimisticMillis.fold(failedLatency.fold(0L)(identity))(identity)
+    }
+
     def toKVMetrics: List[CommonMetrics.KVMetric] =
       List(
         KVMetric.CountGood(good),
         KVMetric.CountBad(bad),
-        KVMetric.LatencyMillis(latencyMillis)
+        KVMetric.LatencyMillis(latencyMillis),
+        KVMetric.LatencyCollectorToTargetMillis(getLatencyCollectorToTargetMillis),
+        KVMetric.LatencyCollectorToTargetPessimisticMillis(getLatencyCollectorToTargetPessimisticMillis)
       )
   }
 
   private object State {
-    def empty: State = State(0, 0, 0L)
+    def empty: State = State(0, 0, 0L, None, None, None)
   }
 
   private def impl[F[_]: Async](config: Config.Metrics, ref: Ref[F, State]): Metrics[F] =
Expand All @@ -55,6 +75,23 @@ object Metrics {
ref.update(s => s.copy(bad = s.bad + count))
def setLatencyMillis(latencyMillis: Long): F[Unit] =
ref.update(s => s.copy(latencyMillis = s.latencyMillis.max(latencyMillis)))
def setLatencyCollectorToTargetMillis(latencyMillis: Long): F[Unit] =
ref.update(s =>
s.copy(latencyCollectorToTargetMillis = s.latencyCollectorToTargetMillis.fold(latencyMillis)(_.min(latencyMillis)).some)
)
def setLatencyCollectorToTargetPessimisticMillis(latencyMillis: Long): F[Unit] =
ref.update(s =>
s.copy(latencyCollectorToTargetPessimisticMillis =
s.latencyCollectorToTargetPessimisticMillis.fold(latencyMillis)(_.max(latencyMillis)).some
)
)

def setFailedMaxCollectorTstamp(tstampMillis: Long): F[Unit] =
ref.update(s =>
s.copy(failedMaxCollectorTstampMillis = s.failedMaxCollectorTstampMillis.fold(tstampMillis)(_.max(tstampMillis)).some)
)
def clearFailedMaxCollectorTstamp(): F[Unit] =
ref.update(s => s.copy(failedMaxCollectorTstampMillis = None))
}

private object KVMetric {
@@ -77,5 +114,16 @@ object Metrics {
       val metricType = CommonMetrics.MetricType.Gauge
     }
 
+    final case class LatencyCollectorToTargetMillis(v: Long) extends CommonMetrics.KVMetric {
+      val key = "latency_collector_to_target_millis"
+      val value = v.toString
+      val metricType = CommonMetrics.MetricType.Gauge
+    }
+
+    final case class LatencyCollectorToTargetPessimisticMillis(v: Long) extends CommonMetrics.KVMetric {
+      val key = "latency_collector_to_target_pessimistic_millis"
+      val value = v.toString
+      val metricType = CommonMetrics.MetricType.Gauge
+    }
   }
 }
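
Note on the gauge semantics above: setLatencyCollectorToTargetMillis keeps the smallest latency reported since the last metrics flush (the "optimistic" figure), setLatencyCollectorToTargetPessimisticMillis keeps the largest, and when no batch has been written the age of the last failed batch stands in. A minimal sketch of the same state transitions in plain Scala, without the cats-effect Ref used above (LatencyGauges and its members are illustrative names, not part of the commit):

    // Sketch of the per-reporting-window state kept by Metrics.State,
    // assuming an immutable value updated in place of Ref[F, State].
    final case class LatencyGauges(
      optimistic: Option[Long],     // min latency recorded since last flush
      pessimistic: Option[Long],    // max latency recorded since last flush
      failedMaxTstamp: Option[Long] // collector tstamp of a batch that failed to write
    ) {
      def recordOptimistic(latency: Long): LatencyGauges =
        copy(optimistic = Some(optimistic.fold(latency)(_.min(latency))))
      def recordPessimistic(latency: Long): LatencyGauges =
        copy(pessimistic = Some(pessimistic.fold(latency)(_.max(latency))))
      // Equivalent to the nested folds in getLatencyCollectorToTargetMillis:
      // prefer a real measurement, fall back to the failed batch's age, else 0.
      def currentOptimistic(nowMillis: Long): Long =
        optimistic.orElse(failedMaxTstamp.map(nowMillis - _)).getOrElse(0L)
    }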
Processing.scala
@@ -21,8 +21,7 @@ import org.typelevel.log4cats.Logger
 import org.typelevel.log4cats.slf4j.Slf4jLogger
 
 import java.nio.charset.StandardCharsets
-import java.time.OffsetDateTime
-
+import java.time.{Instant, OffsetDateTime}
 import com.snowplowanalytics.iglu.schemaddl.parquet.Caster
 import com.snowplowanalytics.snowplow.analytics.scalasdk.Event
 import com.snowplowanalytics.snowplow.badrows.{BadRow, Payload => BadPayload, Processor => BadRowProcessor}
@@ -86,7 +85,9 @@ object Processing {
     origBatchBytes: Long,
     origBatchCount: Int,
     badAccumulated: ListOfList[BadRow],
-    tokens: Vector[Unique.Token]
+    tokens: Vector[Unique.Token],
+    maxCollectorTstamp: Instant,
+    minCollectorTstamp: Instant
   )
 
   /**
@@ -246,6 +247,15 @@ object Processing {
     }
   }
 
+  private def setCollectorToTargetLatencyMetrics[F[_]: Sync](metrics: Metrics[F], batch: BatchAfterTransform): F[Unit] =
+    for {
+      now <- Sync[F].realTime
+      optimistic = now.toMillis - batch.maxCollectorTstamp.toEpochMilli
+      pessimistic = now.toMillis - batch.minCollectorTstamp.toEpochMilli
+      _ <- metrics.setLatencyCollectorToTargetMillis(optimistic)
+      _ <- metrics.setLatencyCollectorToTargetPessimisticMillis(pessimistic)
+    } yield ()
+
   /**
    * First attempt to write events with the Snowflake SDK
    *
@@ -258,8 +268,15 @@
     batch: BatchAfterTransform
   ): F[BatchAfterTransform] =
     withWriteAttempt(env, batch) { notWritten =>
+      val setLatency =
+        // best case branch 1 - all events are inserted
+        if (notWritten.isEmpty)
+          setCollectorToTargetLatencyMetrics(env.metrics, batch) >>
+            env.metrics.clearFailedMaxCollectorTstamp()
+        else Sync[F].unit
       val parsedResult = ParsedWriteResult.buildFrom(batch.toBeInserted, notWritten)
       for {
+        _ <- setLatency
         _ <- abortIfFatalException[F](parsedResult.unexpectedFailures)
         _ <- handleSchemaEvolution(env, parsedResult.extraColsRequired)
       } yield {
@@ -285,13 +302,23 @@
     batch: BatchAfterTransform
   ): F[BatchAfterTransform] =
     withWriteAttempt(env, batch) { notWritten =>
+      val setLatency =
+        // best case branch 2 - all remaining events left from attempt 1 are inserted
+        if (notWritten.isEmpty)
+          setCollectorToTargetLatencyMetrics(env.metrics, batch) >>
+            env.metrics.clearFailedMaxCollectorTstamp()
+        // worst case - none of the events are inserted
+        else if (notWritten.size == batch.origBatchCount) {
+          env.metrics.setFailedMaxCollectorTstamp(batch.maxCollectorTstamp.toEpochMilli) >>
+            setCollectorToTargetLatencyMetrics(env.metrics, batch)
+        } else Sync[F].unit
       val mapped = notWritten match {
         case Nil => Nil
         case more =>
           val indexed = batch.toBeInserted.copyToIndexedSeq
           more.map(f => (fastGetByIndex(indexed, f.index)._1, f.cause))
       }
-      abortIfFatalException[F](mapped).as {
+      setLatency >> abortIfFatalException[F](mapped).as {
         val moreBad = mapped.map { case (event, sfe) =>
           badRowFromEnqueueFailure(badProcessor, event, sfe)
         }
Expand Down Expand Up @@ -385,15 +412,25 @@ object Processing {

private def fastGetByIndex[A](items: IndexedSeq[A], index: Long): A = items(index.toInt)

private implicit class MaxCollectorTstamp(events: List[EventWithTransform]) {
def maxCollectorTstamp(): Instant = events.map(_._1).map(_.collector_tstamp).max
}

private implicit class MinCollectorTstamp(events: List[EventWithTransform]) {
def minCollectorTstamp(): Instant = events.map(_._1).map(_.collector_tstamp).min
}

private implicit def batchable: BatchUp.Batchable[TransformedBatch, BatchAfterTransform] =
new BatchUp.Batchable[TransformedBatch, BatchAfterTransform] {
def combine(b: BatchAfterTransform, a: TransformedBatch): BatchAfterTransform =
BatchAfterTransform(
toBeInserted = b.toBeInserted.prepend(a.events),
origBatchBytes = b.origBatchBytes + a.countBytes,
origBatchCount = b.origBatchCount + a.countItems,
badAccumulated = b.badAccumulated.prepend(a.parseFailures).prepend(a.transformFailures),
tokens = b.tokens :+ a.token
toBeInserted = b.toBeInserted.prepend(a.events),
origBatchBytes = b.origBatchBytes + a.countBytes,
origBatchCount = b.origBatchCount + a.countItems,
badAccumulated = b.badAccumulated.prepend(a.parseFailures).prepend(a.transformFailures),
tokens = b.tokens :+ a.token,
maxCollectorTstamp = List(b.maxCollectorTstamp, a.events.maxCollectorTstamp()).max,
minCollectorTstamp = List(b.maxCollectorTstamp, a.events.maxCollectorTstamp()).min
)

def single(a: TransformedBatch): BatchAfterTransform =
@@ -402,7 +439,9 @@
           a.countBytes,
           a.countItems,
           ListOfList.ofLists(a.parseFailures, a.transformFailures),
-          Vector(a.token)
+          Vector(a.token),
+          a.events.maxCollectorTstamp(),
+          a.events.minCollectorTstamp()
         )
 
       def weightOf(a: TransformedBatch): Long =
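
The latency computation in setCollectorToTargetLatencyMetrics reduces to two subtractions against the wall clock: the newest event in the batch (maxCollectorTstamp) yields the optimistic figure, the oldest (minCollectorTstamp) the pessimistic one. A standalone sketch of the same arithmetic, using System.currentTimeMillis() in place of Sync[F].realTime (the helper name is hypothetical):

    import java.time.Instant

    // Returns (optimistic, pessimistic) collector-to-target latency in millis.
    def collectorToTargetLatencies(maxCollectorTstamp: Instant, minCollectorTstamp: Instant): (Long, Long) = {
      val now = System.currentTimeMillis()
      val optimistic  = now - maxCollectorTstamp.toEpochMilli // newest event: smallest lag
      val pessimistic = now - minCollectorTstamp.toEpochMilli // oldest event: largest lag
      (optimistic, pessimistic)
    }

    // Example: a batch whose events span 10:00:00-10:00:30, written at 10:01:00,
    // reports optimistic = 30000 ms and pessimistic = 60000 ms.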
