@@ -21,8 +21,7 @@ import org.typelevel.log4cats.Logger
 import org.typelevel.log4cats.slf4j.Slf4jLogger
 
 import java.nio.charset.StandardCharsets
-import java.time.OffsetDateTime
-
+import java.time.{Instant, OffsetDateTime}
 import com.snowplowanalytics.iglu.schemaddl.parquet.Caster
 import com.snowplowanalytics.snowplow.analytics.scalasdk.Event
 import com.snowplowanalytics.snowplow.badrows.{BadRow, Payload => BadPayload, Processor => BadRowProcessor}
@@ -86,7 +85,9 @@ object Processing {
     origBatchBytes: Long,
     origBatchCount: Int,
     badAccumulated: ListOfList[BadRow],
-    tokens: Vector[Unique.Token]
+    tokens: Vector[Unique.Token],
+    maxCollectorTstamp: Instant,
+    minCollectorTstamp: Instant
   )
 
   /**
@@ -246,6 +247,18 @@ object Processing {
     }
   }
 
+  private def setLatencyCollectorToTargetMillis[F[_]: Sync](
+    metrics: Metrics[F],
+    collectorTstamp: Instant,
+    optimistic: Boolean
+  ): F[Unit] =
+    for {
+      now <- Sync[F].realTime
+      latencyMillis = now.toMillis - collectorTstamp.toEpochMilli
+      _ <- if (optimistic) metrics.setLatencyCollectorToTargetMillis(latencyMillis)
+           else metrics.setLatencyCollectorToTargetPessimisticMillis(latencyMillis)
+    } yield ()
+
   /**
    * First attempt to write events with the Snowflake SDK
    *
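Note: the optimistic gauge is fed with the batch's newest collector timestamp (maxCollectorTstamp) and the pessimistic gauge with its oldest (minCollectorTstamp), so the two readings bracket the real collector-to-target latency of the batch. A minimal sketch of that arithmetic, using made-up timestamps and names rather than the loader's API:

import java.time.Instant

object LatencySketch extends App {
  // Hypothetical batch: oldest event collected at :00, newest at :05, written to the target at :07
  val minCollectorTstamp = Instant.parse("2024-01-01T10:00:00Z")
  val maxCollectorTstamp = Instant.parse("2024-01-01T10:00:05Z")
  val writtenAt          = Instant.parse("2024-01-01T10:00:07Z")

  // Optimistic latency measures from the newest event, pessimistic from the oldest
  val optimisticMillis  = writtenAt.toEpochMilli - maxCollectorTstamp.toEpochMilli // 2000
  val pessimisticMillis = writtenAt.toEpochMilli - minCollectorTstamp.toEpochMilli // 7000

  println(s"optimistic=$optimisticMillis ms, pessimistic=$pessimisticMillis ms")
}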
@@ -258,8 +271,14 @@ object Processing {
     batch: BatchAfterTransform
   ): F[BatchAfterTransform] =
     withWriteAttempt(env, batch) { notWritten =>
+      val setLatency =
+        if (notWritten.isEmpty) {
+          setLatencyCollectorToTargetMillis(env.metrics, batch.maxCollectorTstamp, optimistic = true) >>
+            setLatencyCollectorToTargetMillis(env.metrics, batch.minCollectorTstamp, optimistic = false)
+        } else Sync[F].unit
       val parsedResult = ParsedWriteResult.buildFrom(batch.toBeInserted, notWritten)
       for {
+        _ <- setLatency
         _ <- abortIfFatalException[F](parsedResult.unexpectedFailures)
         _ <- handleSchemaEvolution(env, parsedResult.extraColsRequired)
       } yield {
@@ -285,13 +304,18 @@ object Processing {
     batch: BatchAfterTransform
   ): F[BatchAfterTransform] =
     withWriteAttempt(env, batch) { notWritten =>
+      val setLatency =
+        if (notWritten.isEmpty) {
+          setLatencyCollectorToTargetMillis(env.metrics, batch.maxCollectorTstamp, optimistic = true) >>
+            setLatencyCollectorToTargetMillis(env.metrics, batch.minCollectorTstamp, optimistic = false)
+        } else Sync[F].unit
       val mapped = notWritten match {
         case Nil => Nil
         case more =>
           val indexed = batch.toBeInserted.copyToIndexedSeq
           more.map(f => (fastGetByIndex(indexed, f.index)._1, f.cause))
       }
-      abortIfFatalException[F](mapped).as {
+      setLatency >> abortIfFatalException[F](mapped).as {
         val moreBad = mapped.map { case (event, sfe) =>
           badRowFromEnqueueFailure(badProcessor, event, sfe)
         }
@@ -385,15 +409,25 @@ object Processing {
 
   private def fastGetByIndex[A](items: IndexedSeq[A], index: Long): A = items(index.toInt)
 
+  private implicit class MaxCollectorTstamp(events: List[EventWithTransform]) {
+    def maxCollectorTstamp(): Instant = events.map(_._1).map(_.collector_tstamp).max
+  }
+
+  private implicit class MinCollectorTstamp(events: List[EventWithTransform]) {
+    def minCollectorTstamp(): Instant = events.map(_._1).map(_.collector_tstamp).min
+  }
+
   private implicit def batchable: BatchUp.Batchable[TransformedBatch, BatchAfterTransform] =
     new BatchUp.Batchable[TransformedBatch, BatchAfterTransform] {
       def combine(b: BatchAfterTransform, a: TransformedBatch): BatchAfterTransform =
         BatchAfterTransform(
-          toBeInserted = b.toBeInserted.prepend(a.events),
-          origBatchBytes = b.origBatchBytes + a.countBytes,
-          origBatchCount = b.origBatchCount + a.countItems,
-          badAccumulated = b.badAccumulated.prepend(a.parseFailures).prepend(a.transformFailures),
-          tokens = b.tokens :+ a.token
+          toBeInserted       = b.toBeInserted.prepend(a.events),
+          origBatchBytes     = b.origBatchBytes + a.countBytes,
+          origBatchCount     = b.origBatchCount + a.countItems,
+          badAccumulated     = b.badAccumulated.prepend(a.parseFailures).prepend(a.transformFailures),
+          tokens             = b.tokens :+ a.token,
+          maxCollectorTstamp = List(b.maxCollectorTstamp, a.events.maxCollectorTstamp()).max,
+          minCollectorTstamp = List(b.minCollectorTstamp, a.events.minCollectorTstamp()).min
         )
 
       def single(a: TransformedBatch): BatchAfterTransform =
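For reference, combining two batches keeps the newest of the maxima and the oldest of the minima, so the extremes stay correct however many chunks are folded together. A small self-contained sketch with illustrative values only (note that List.max and List.min throw on an empty list, so the implicit helpers above appear to assume a non-empty event list):

import java.time.Instant

object CombineSketch extends App {
  // Extremes already tracked by the running batch (b) and the incoming chunk (a); values are made up
  val bMax = Instant.parse("2024-01-01T10:00:05Z")
  val bMin = Instant.parse("2024-01-01T10:00:00Z")
  val aMax = Instant.parse("2024-01-01T10:00:09Z")
  val aMin = Instant.parse("2024-01-01T10:00:03Z")

  // The combined batch keeps the newest maximum and the oldest minimum
  val combinedMax = List(bMax, aMax).max // 2024-01-01T10:00:09Z
  val combinedMin = List(bMin, aMin).min // 2024-01-01T10:00:00Z

  println(s"max=$combinedMax min=$combinedMin")
}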
@@ -402,7 +436,9 @@ object Processing {
           a.countBytes,
           a.countItems,
           ListOfList.ofLists(a.parseFailures, a.transformFailures),
-          Vector(a.token)
+          Vector(a.token),
+          a.events.maxCollectorTstamp(),
+          a.events.minCollectorTstamp()
         )
 
       def weightOf(a: TransformedBatch): Long =