add best & worst case

oguzhanunlu committed Oct 18, 2024
1 parent 2920f55 commit 8aa692a

Showing 2 changed files with 70 additions and 13 deletions.
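The commit adds two collector-to-target latency gauges: an optimistic one measured from the newest event in a batch (the best case) and a pessimistic one measured from the oldest (the worst case). A minimal sketch of the arithmetic, with illustrative timestamps that are not from the commit:

import java.time.Instant

// Hypothetical batch: three events with different collector timestamps
val collectorTstamps = List(
  Instant.parse("2024-10-18T10:00:00Z"),
  Instant.parse("2024-10-18T10:00:05Z"),
  Instant.parse("2024-10-18T10:00:09Z")
)
val now = Instant.parse("2024-10-18T10:00:10Z")

// Best case: measure against the newest event in the batch
val optimisticMillis = now.toEpochMilli - collectorTstamps.max.toEpochMilli // 1000
// Worst case: measure against the oldest event in the batch
val pessimisticMillis = now.toEpochMilli - collectorTstamps.min.toEpochMilli // 10000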
Metrics.scala
@@ -21,6 +21,8 @@ trait Metrics[F[_]] {
def addGood(count: Int): F[Unit]
def addBad(count: Int): F[Unit]
def setLatencyMillis(latencyMillis: Long): F[Unit]
def setLatencyCollectorToTargetMillis(latencyMillis: Long): F[Unit]
def setLatencyCollectorToTargetPessimisticMillis(latencyMillis: Long): F[Unit]

def report: Stream[F, Nothing]
}
@@ -33,18 +35,22 @@ object Metrics {
private case class State(
good: Int,
bad: Int,
latencyMillis: Long
latencyMillis: Long,
latencyCollectorToTargetMillis: Long,
latencyCollectorToTargetPessimisticMillis: Long
) extends CommonMetrics.State {
def toKVMetrics: List[CommonMetrics.KVMetric] =
List(
KVMetric.CountGood(good),
KVMetric.CountBad(bad),
KVMetric.LatencyMillis(latencyMillis)
KVMetric.LatencyMillis(latencyMillis),
KVMetric.LatencyCollectorToTargetMillis(latencyCollectorToTargetMillis),
KVMetric.LatencyCollectorToTargetPessimisticMillis(latencyCollectorToTargetPessimisticMillis)
)
}

private object State {
def empty: State = State(0, 0, 0L)
def empty: State = State(0, 0, 0L, 0L, 0L)
}

private def impl[F[_]: Async](config: Config.Metrics, ref: Ref[F, State]): Metrics[F] =
@@ -55,6 +61,10 @@ object Metrics {
ref.update(s => s.copy(bad = s.bad + count))
def setLatencyMillis(latencyMillis: Long): F[Unit] =
ref.update(s => s.copy(latencyMillis = s.latencyMillis.max(latencyMillis)))
def setLatencyCollectorToTargetMillis(latencyMillis: Long): F[Unit] =
ref.update(s => s.copy(latencyCollectorToTargetMillis = s.latencyCollectorToTargetMillis.min(latencyMillis)))
def setLatencyCollectorToTargetPessimisticMillis(latencyMillis: Long): F[Unit] =
ref.update(s => s.copy(latencyCollectorToTargetPessimisticMillis = s.latencyCollectorToTargetPessimisticMillis.max(latencyMillis)))
}

private object KVMetric {
@@ -77,5 +87,16 @@ object Metrics {
val metricType = CommonMetrics.MetricType.Gauge
}

final case class LatencyCollectorToTargetMillis(v: Long) extends CommonMetrics.KVMetric {
val key = "latency_collector_to_target_millis"
val value = v.toString
val metricType = CommonMetrics.MetricType.Gauge
}

final case class LatencyCollectorToTargetPessimisticMillis(v: Long) extends CommonMetrics.KVMetric {
val key = "latency_collector_to_target_pessimistic_millis"
val value = v.toString
val metricType = CommonMetrics.MetricType.Gauge
}
}
}
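For context, a caller holding the newest and oldest collector timestamps of a batch could set both gauges like this. This is a hedged sketch rather than code from the diff; the recordLatencies helper, the IO wiring, and the timestamps are assumptions:

import cats.effect.IO
import java.time.Instant

// Illustrative only: record best- and worst-case latency for one batch
def recordLatencies(metrics: Metrics[IO], newest: Instant, oldest: Instant): IO[Unit] =
  for {
    now <- IO.realTimeInstant
    _   <- metrics.setLatencyCollectorToTargetMillis(now.toEpochMilli - newest.toEpochMilli)
    _   <- metrics.setLatencyCollectorToTargetPessimisticMillis(now.toEpochMilli - oldest.toEpochMilli)
  } yield ()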
Processing.scala
@@ -21,8 +21,7 @@ import org.typelevel.log4cats.Logger
import org.typelevel.log4cats.slf4j.Slf4jLogger

import java.nio.charset.StandardCharsets
import java.time.OffsetDateTime

import java.time.{Instant, OffsetDateTime}
import com.snowplowanalytics.iglu.schemaddl.parquet.Caster
import com.snowplowanalytics.snowplow.analytics.scalasdk.Event
import com.snowplowanalytics.snowplow.badrows.{BadRow, Payload => BadPayload, Processor => BadRowProcessor}
@@ -86,7 +85,9 @@ object Processing {
origBatchBytes: Long,
origBatchCount: Int,
badAccumulated: ListOfList[BadRow],
tokens: Vector[Unique.Token]
tokens: Vector[Unique.Token],
maxCollectorTstamp: Instant,
minCollectorTstamp: Instant
)

/**
Expand Down Expand Up @@ -246,6 +247,15 @@ object Processing {
}
}

private def setCollectorToTargetLatencyMetrics[F[_]: Sync](metrics: Metrics[F], batch: BatchAfterTransform): F[Unit] =
for {
now <- Sync[F].realTime
optimistic = now.toMillis - batch.maxCollectorTstamp.toEpochMilli // newest event: best-case latency
pessimistic = now.toMillis - batch.minCollectorTstamp.toEpochMilli // oldest event: worst-case latency
_ <- metrics.setLatencyCollectorToTargetMillis(optimistic)
_ <- metrics.setLatencyCollectorToTargetPessimisticMillis(pessimistic)
} yield ()

/**
* First attempt to write events with the Snowflake SDK
*
@@ -258,8 +268,13 @@
batch: BatchAfterTransform
): F[BatchAfterTransform] =
withWriteAttempt(env, batch) { notWritten =>
val setLatency =
// best case, branch 1: every event was inserted on the first attempt
if (notWritten.isEmpty) setCollectorToTargetLatencyMetrics(env.metrics, batch)
else Sync[F].unit
val parsedResult = ParsedWriteResult.buildFrom(batch.toBeInserted, notWritten)
for {
_ <- setLatency
_ <- abortIfFatalException[F](parsedResult.unexpectedFailures)
_ <- handleSchemaEvolution(env, parsedResult.extraColsRequired)
} yield {
Expand All @@ -285,13 +300,22 @@ object Processing {
batch: BatchAfterTransform
): F[BatchAfterTransform] =
withWriteAttempt(env, batch) { notWritten =>
val setLatency = {
// best case, branch 2: every event left over from attempt 1 was inserted
if (notWritten.isEmpty) setCollectorToTargetLatencyMetrics(env.metrics, batch)
// worst case: none of the events were inserted
else if (notWritten.size == batch.origBatchCount) setCollectorToTargetLatencyMetrics(env.metrics, batch)
else Sync[F].unit
}
val mapped = notWritten match {
case Nil => Nil
case more =>
val indexed = batch.toBeInserted.copyToIndexedSeq
more.map(f => (fastGetByIndex(indexed, f.index)._1, f.cause))
}
abortIfFatalException[F](mapped).as {
setLatency >> abortIfFatalException[F](mapped).as {
val moreBad = mapped.map { case (event, sfe) =>
badRowFromEnqueueFailure(badProcessor, event, sfe)
}
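Taken together, the two write attempts set the latency gauges in exactly three situations: everything lands on the first try, everything left over lands on the retry, or nothing lands at all. A small hypothetical helper (not part of the diff) that captures the same decision:

// Illustrative only: when do the collector-to-target gauges fire?
def shouldSetLatency(attempt: Int, notWrittenCount: Int, origBatchCount: Int): Boolean =
  attempt match {
    case 1 => notWrittenCount == 0                                      // best case, branch 1
    case 2 => notWrittenCount == 0 || notWrittenCount == origBatchCount // best case, branch 2, or worst case
    case _ => false                                                     // partial outcomes set no gauge
  }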
@@ -385,15 +409,25 @@

private def fastGetByIndex[A](items: IndexedSeq[A], index: Long): A = items(index.toInt)

private implicit class MaxCollectorTstamp(events: List[EventWithTransform]) {
def maxCollectorTstamp(): Instant = events.map(_._1).map(_.collector_tstamp).max
}

private implicit class MinCollectorTstamp(events: List[EventWithTransform]) {
def minCollectorTstamp(): Instant = events.map(_._1).map(_.collector_tstamp).min
}
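These extension methods rely on the implicit Ordering[Instant] and assume a non-empty batch (max and min throw on an empty list). They feed the Batchable instance below, where combining batches keeps a running min and max so the [oldest, newest] window can only widen. A stripped-down sketch of that merge over bare Instants, with illustrative values:

import java.time.Instant

// Merging two batches' (min, max) collector timestamps, as in `combine` below
val batchA = (Instant.parse("2024-10-18T10:00:00Z"), Instant.parse("2024-10-18T10:00:04Z"))
val batchB = (Instant.parse("2024-10-18T09:59:58Z"), Instant.parse("2024-10-18T10:00:02Z"))

val mergedMin = List(batchA._1, batchB._1).min // 09:59:58Z, the older "oldest"
val mergedMax = List(batchA._2, batchB._2).max // 10:00:04Z, the newer "newest"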

private implicit def batchable: BatchUp.Batchable[TransformedBatch, BatchAfterTransform] =
new BatchUp.Batchable[TransformedBatch, BatchAfterTransform] {
def combine(b: BatchAfterTransform, a: TransformedBatch): BatchAfterTransform =
BatchAfterTransform(
toBeInserted = b.toBeInserted.prepend(a.events),
origBatchBytes = b.origBatchBytes + a.countBytes,
origBatchCount = b.origBatchCount + a.countItems,
badAccumulated = b.badAccumulated.prepend(a.parseFailures).prepend(a.transformFailures),
tokens = b.tokens :+ a.token
toBeInserted = b.toBeInserted.prepend(a.events),
origBatchBytes = b.origBatchBytes + a.countBytes,
origBatchCount = b.origBatchCount + a.countItems,
badAccumulated = b.badAccumulated.prepend(a.parseFailures).prepend(a.transformFailures),
tokens = b.tokens :+ a.token,
maxCollectorTstamp = List(b.maxCollectorTstamp, a.events.maxCollectorTstamp()).max,
minCollectorTstamp = List(b.minCollectorTstamp, a.events.minCollectorTstamp()).min
)

def single(a: TransformedBatch): BatchAfterTransform =
@@ -402,7 +436,9 @@
a.countBytes,
a.countItems,
ListOfList.ofLists(a.parseFailures, a.transformFailures),
Vector(a.token)
Vector(a.token),
a.events.maxCollectorTstamp(),
a.events.minCollectorTstamp()
)

def weightOf(a: TransformedBatch): Long =