From d26696d6f219e9c1d4c4523b2e7d7b849d381695 Mon Sep 17 00:00:00 2001
From: Oguzhan Unlu
Date: Thu, 17 Oct 2024 16:35:34 +0300
Subject: [PATCH] Add collector-to-target latency metrics

---
 .../Metrics.scala               | 54 ++++++++++++++++-
 .../processing/Processing.scala | 59 +++++++++++++++----
 2 files changed, 100 insertions(+), 13 deletions(-)

diff --git a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/Metrics.scala b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/Metrics.scala
index 76b4011..cf88046 100644
--- a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/Metrics.scala
+++ b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/Metrics.scala
@@ -21,6 +21,11 @@ trait Metrics[F[_]] {
   def addGood(count: Int): F[Unit]
   def addBad(count: Int): F[Unit]
   def setLatencyMillis(latencyMillis: Long): F[Unit]
+  def setLatencyCollectorToTargetMillis(latencyMillis: Long): F[Unit]
+  def setLatencyCollectorToTargetPessimisticMillis(latencyMillis: Long): F[Unit]
+
+  def setFailedMaxCollectorTstamp(tstampMillis: Long): F[Unit]
+  def clearFailedMaxCollectorTstamp(): F[Unit]
 
   def report: Stream[F, Nothing]
 }
@@ -33,18 +38,33 @@ object Metrics {
   private case class State(
     good: Int,
     bad: Int,
-    latencyMillis: Long
+    latencyMillis: Long,
+    latencyCollectorToTargetMillis: Option[Long],
+    latencyCollectorToTargetPessimisticMillis: Option[Long],
+    failedMaxCollectorTstampMillis: Option[Long]
   ) extends CommonMetrics.State {
+    private def getLatencyCollectorToTargetMillis: Long = {
+      lazy val failedLatency = failedMaxCollectorTstampMillis.map(t => System.currentTimeMillis() - t)
+      latencyCollectorToTargetMillis.orElse(failedLatency).getOrElse(0L)
+    }
+
+    private def getLatencyCollectorToTargetPessimisticMillis: Long = {
+      lazy val failedLatency = failedMaxCollectorTstampMillis.map(t => System.currentTimeMillis() - t)
+      latencyCollectorToTargetPessimisticMillis.orElse(failedLatency).getOrElse(0L)
+    }
+
     def toKVMetrics: List[CommonMetrics.KVMetric] =
       List(
         KVMetric.CountGood(good),
         KVMetric.CountBad(bad),
-        KVMetric.LatencyMillis(latencyMillis)
+        KVMetric.LatencyMillis(latencyMillis),
+        KVMetric.LatencyCollectorToTargetMillis(getLatencyCollectorToTargetMillis),
+        KVMetric.LatencyCollectorToTargetPessimisticMillis(getLatencyCollectorToTargetPessimisticMillis)
       )
   }
 
   private object State {
-    def empty: State = State(0, 0, 0L)
+    def empty: State = State(0, 0, 0L, None, None, None)
   }
 
   private def impl[F[_]: Async](config: Config.Metrics, ref: Ref[F, State]): Metrics[F] =
@@ -55,6 +75,23 @@ object Metrics {
         ref.update(s => s.copy(bad = s.bad + count))
       def setLatencyMillis(latencyMillis: Long): F[Unit] =
         ref.update(s => s.copy(latencyMillis = s.latencyMillis.max(latencyMillis)))
+      def setLatencyCollectorToTargetMillis(latencyMillis: Long): F[Unit] =
+        ref.update(s =>
+          s.copy(latencyCollectorToTargetMillis = s.latencyCollectorToTargetMillis.fold(latencyMillis)(_.min(latencyMillis)).some)
+        )
+      def setLatencyCollectorToTargetPessimisticMillis(latencyMillis: Long): F[Unit] =
+        ref.update(s =>
+          s.copy(latencyCollectorToTargetPessimisticMillis =
+            s.latencyCollectorToTargetPessimisticMillis.fold(latencyMillis)(_.max(latencyMillis)).some
+          )
+        )
+
+      def setFailedMaxCollectorTstamp(tstampMillis: Long): F[Unit] =
+        ref.update(s =>
+          s.copy(failedMaxCollectorTstampMillis = s.failedMaxCollectorTstampMillis.fold(tstampMillis)(_.max(tstampMillis)).some)
+        )
+      def clearFailedMaxCollectorTstamp(): F[Unit] =
+        ref.update(s => s.copy(failedMaxCollectorTstampMillis = None))
     }
 
   private object KVMetric {
@@ -77,5 +114,16 @@ object Metrics {
       val metricType = CommonMetrics.MetricType.Gauge
     }
 
+    final case class LatencyCollectorToTargetMillis(v: Long) extends CommonMetrics.KVMetric {
+      val key = "latency_collector_to_target_millis"
+      val value = v.toString
+      val metricType = CommonMetrics.MetricType.Gauge
+    }
+
+    final case class LatencyCollectorToTargetPessimisticMillis(v: Long) extends CommonMetrics.KVMetric {
+      val key = "latency_collector_to_target_pessimistic_millis"
+      val value = v.toString
+      val metricType = CommonMetrics.MetricType.Gauge
+    }
   }
 }
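Note on the Metrics.scala changes above: toKVMetrics resolves each collector-to-target gauge from the window state in priority order. The latency recorded on a successful write wins; failing that, the age of the most recently failed batch is reported; failing both, the gauge falls back to zero. Below is a minimal, self-contained sketch of that resolution rule; WindowState, resolve and the sample values are illustrative stand-ins, not part of this module.

    object LatencyFallbackSketch extends App {

      // Mirrors the relevant slice of Metrics.State (illustrative, not the real type)
      final case class WindowState(
        latencyCollectorToTargetMillis: Option[Long],
        failedMaxCollectorTstampMillis: Option[Long]
      )

      def resolve(state: WindowState, nowMillis: Long): Long =
        state.latencyCollectorToTargetMillis                               // 1. latency from a successful write wins
          .orElse(state.failedMaxCollectorTstampMillis.map(nowMillis - _)) // 2. otherwise, age of the last failed batch
          .getOrElse(0L)                                                   // 3. otherwise, nothing to report this window

      val now = 1000000L
      println(resolve(WindowState(Some(250L), None), now))       // 250: a successful write was observed
      println(resolve(WindowState(None, Some(now - 400L)), now)) // 400: only failures were observed
      println(resolve(WindowState(None, None), now))             // 0: idle window
    }
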
diff --git a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/processing/Processing.scala b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/processing/Processing.scala
index 58914f2..cd659d3 100644
--- a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/processing/Processing.scala
+++ b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/processing/Processing.scala
@@ -21,8 +21,7 @@ import org.typelevel.log4cats.Logger
 import org.typelevel.log4cats.slf4j.Slf4jLogger
 
 import java.nio.charset.StandardCharsets
-import java.time.OffsetDateTime
-
+import java.time.{Instant, OffsetDateTime}
 import com.snowplowanalytics.iglu.schemaddl.parquet.Caster
 import com.snowplowanalytics.snowplow.analytics.scalasdk.Event
 import com.snowplowanalytics.snowplow.badrows.{BadRow, Payload => BadPayload, Processor => BadRowProcessor}
@@ -86,7 +85,9 @@ object Processing {
     origBatchBytes: Long,
     origBatchCount: Int,
     badAccumulated: ListOfList[BadRow],
-    tokens: Vector[Unique.Token]
+    tokens: Vector[Unique.Token],
+    maxCollectorTstamp: Instant,
+    minCollectorTstamp: Instant
   )
 
   /**
@@ -246,6 +247,15 @@ object Processing {
       }
     }
 
+  private def setCollectorToTargetLatencyMetrics[F[_]: Sync](metrics: Metrics[F], batch: BatchAfterTransform): F[Unit] =
+    for {
+      now <- Sync[F].realTime
+      optimistic = now.toMillis - batch.maxCollectorTstamp.toEpochMilli
+      pessimistic = now.toMillis - batch.minCollectorTstamp.toEpochMilli
+      _ <- metrics.setLatencyCollectorToTargetMillis(optimistic)
+      _ <- metrics.setLatencyCollectorToTargetPessimisticMillis(pessimistic)
+    } yield ()
+
   /**
    * First attempt to write events with the Snowflake SDK
    *
@@ -258,8 +268,15 @@ object Processing {
       batch: BatchAfterTransform
     ): F[BatchAfterTransform] =
     withWriteAttempt(env, batch) { notWritten =>
+      val setLatency =
+        // best case branch 1 - all events are inserted
+        if (notWritten.isEmpty)
+          setCollectorToTargetLatencyMetrics(env.metrics, batch) >>
+            env.metrics.clearFailedMaxCollectorTstamp()
+        else Sync[F].unit
       val parsedResult = ParsedWriteResult.buildFrom(batch.toBeInserted, notWritten)
       for {
+        _ <- setLatency
         _ <- abortIfFatalException[F](parsedResult.unexpectedFailures)
         _ <- handleSchemaEvolution(env, parsedResult.extraColsRequired)
       } yield {
@@ -285,13 +302,23 @@ object Processing {
       batch: BatchAfterTransform
     ): F[BatchAfterTransform] =
     withWriteAttempt(env, batch) { notWritten =>
+      val setLatency =
+        // best case branch 2 - all events remaining from attempt 1 are inserted
+        if (notWritten.isEmpty)
+          setCollectorToTargetLatencyMetrics(env.metrics, batch) >>
+            env.metrics.clearFailedMaxCollectorTstamp()
+        // worst case - none of the events are inserted
+        else if (notWritten.size == batch.origBatchCount) {
+          env.metrics.setFailedMaxCollectorTstamp(batch.maxCollectorTstamp.toEpochMilli) >>
+            setCollectorToTargetLatencyMetrics(env.metrics, batch)
+        } else Sync[F].unit
       val mapped = notWritten match {
         case Nil => Nil
         case more =>
           val indexed = batch.toBeInserted.copyToIndexedSeq
           more.map(f => (fastGetByIndex(indexed, f.index)._1, f.cause))
       }
-      abortIfFatalException[F](mapped).as {
+      setLatency >> abortIfFatalException[F](mapped).as {
         val moreBad = mapped.map { case (event, sfe) =>
           badRowFromEnqueueFailure(badProcessor, event, sfe)
         }
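Note on the two write attempts above: setCollectorToTargetLatencyMetrics reads the clock once and derives both gauges from it. The optimistic value is measured against the newest event in the batch (its max collector_tstamp), the pessimistic value against the oldest (its min collector_tstamp), so the true end-to-end latency of any event in the batch lies between the two. Below is a sketch of that arithmetic with the effect types stripped away; BatchTstamps and the sample timestamps are illustrative, not the real BatchAfterTransform.

    import java.time.Instant

    object CollectorLatencySketch extends App {

      // Illustrative stand-in for the two timestamp fields added to BatchAfterTransform
      final case class BatchTstamps(maxCollectorTstamp: Instant, minCollectorTstamp: Instant)

      // Same arithmetic as setCollectorToTargetLatencyMetrics, without the F[_] wrapper
      def latencies(batch: BatchTstamps, now: Instant): (Long, Long) = {
        val optimistic = now.toEpochMilli - batch.maxCollectorTstamp.toEpochMilli  // newest event: lower bound
        val pessimistic = now.toEpochMilli - batch.minCollectorTstamp.toEpochMilli // oldest event: upper bound
        (optimistic, pessimistic)
      }

      val now = Instant.parse("2024-10-17T13:00:10Z")
      val batch = BatchTstamps(
        maxCollectorTstamp = Instant.parse("2024-10-17T13:00:09Z"), // newest event, collected 1s ago
        minCollectorTstamp = Instant.parse("2024-10-17T13:00:00Z")  // oldest event, collected 10s ago
      )
      println(latencies(batch, now)) // (1000,10000)
    }
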
@@ -385,15 +412,25 @@ object Processing {
   private def fastGetByIndex[A](items: IndexedSeq[A], index: Long): A = items(index.toInt)
 
+  private implicit class MaxCollectorTstamp(events: List[EventWithTransform]) {
+    def maxCollectorTstamp(): Instant = events.map(_._1).map(_.collector_tstamp).max
+  }
+
+  private implicit class MinCollectorTstamp(events: List[EventWithTransform]) {
+    def minCollectorTstamp(): Instant = events.map(_._1).map(_.collector_tstamp).min
+  }
+
   private implicit def batchable: BatchUp.Batchable[TransformedBatch, BatchAfterTransform] =
     new BatchUp.Batchable[TransformedBatch, BatchAfterTransform] {
       def combine(b: BatchAfterTransform, a: TransformedBatch): BatchAfterTransform =
         BatchAfterTransform(
-          toBeInserted   = b.toBeInserted.prepend(a.events),
-          origBatchBytes = b.origBatchBytes + a.countBytes,
-          origBatchCount = b.origBatchCount + a.countItems,
-          badAccumulated = b.badAccumulated.prepend(a.parseFailures).prepend(a.transformFailures),
-          tokens         = b.tokens :+ a.token
+          toBeInserted       = b.toBeInserted.prepend(a.events),
+          origBatchBytes     = b.origBatchBytes + a.countBytes,
+          origBatchCount     = b.origBatchCount + a.countItems,
+          badAccumulated     = b.badAccumulated.prepend(a.parseFailures).prepend(a.transformFailures),
+          tokens             = b.tokens :+ a.token,
+          maxCollectorTstamp = List(b.maxCollectorTstamp, a.events.maxCollectorTstamp()).max,
+          minCollectorTstamp = List(b.minCollectorTstamp, a.events.minCollectorTstamp()).min
         )
 
       def single(a: TransformedBatch): BatchAfterTransform =
@@ -402,7 +439,9 @@ object Processing {
           a.countBytes,
           a.countItems,
           ListOfList.ofLists(a.parseFailures, a.transformFailures),
-          Vector(a.token)
+          Vector(a.token),
+          a.events.maxCollectorTstamp(),
+          a.events.minCollectorTstamp()
         )
 
       def weightOf(a: TransformedBatch): Long =
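
Note on the Batchable instance above: when BatchUp merges two batches, the merged window must cover both inputs, so the combined maxCollectorTstamp is the maximum of the two maxima and the combined minCollectorTstamp is the minimum of the two minima. That keeps the optimistic and pessimistic bounds correct however many micro-batches are folded together. Below is a small sketch of that invariant; Bounds and the sample timestamps are illustrative.

    import java.time.Instant

    object CombineTstampsSketch extends App {

      final case class Bounds(min: Instant, max: Instant)

      // Mirrors the combine logic above: widen the window to cover both batches
      def combine(b: Bounds, a: Bounds): Bounds =
        Bounds(
          min = if (a.min.isBefore(b.min)) a.min else b.min,
          max = if (a.max.isAfter(b.max)) a.max else b.max
        )

      val first  = Bounds(Instant.parse("2024-10-17T13:00:00Z"), Instant.parse("2024-10-17T13:00:05Z"))
      val second = Bounds(Instant.parse("2024-10-17T13:00:03Z"), Instant.parse("2024-10-17T13:00:09Z"))

      // Order of combination does not change the resulting bounds
      println(combine(first, second)) // Bounds(2024-10-17T13:00:00Z,2024-10-17T13:00:09Z)
      println(combine(second, first)) // same result
    }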