@@ -8,24 +8,25 @@
 package com.snowplowanalytics.snowplow.snowflake.processing

 import cats.implicits._
-import cats.{Applicative, Foldable, Monad, Semigroup}
+import cats.{Applicative, Foldable, Monad}
 import cats.effect.{Async, Sync}
 import cats.effect.kernel.Unique
-import fs2.{Pipe, Pull, Stream}
+import fs2.{Chunk, Pipe, Stream}
 import net.snowflake.ingest.utils.{ErrorCode, SFException}
 import org.typelevel.log4cats.Logger
 import org.typelevel.log4cats.slf4j.Slf4jLogger

 import java.nio.charset.StandardCharsets
 import java.time.OffsetDateTime
-import scala.concurrent.duration.Duration

 import com.snowplowanalytics.iglu.schemaddl.parquet.Caster
 import com.snowplowanalytics.snowplow.analytics.scalasdk.Event
 import com.snowplowanalytics.snowplow.badrows.{BadRow, Payload => BadPayload, Processor => BadRowProcessor}
 import com.snowplowanalytics.snowplow.badrows.Payload.{RawPayload => BadRowRawPayload}
 import com.snowplowanalytics.snowplow.sources.{EventProcessingConfig, EventProcessor, TokenedEvents}
-import com.snowplowanalytics.snowplow.snowflake.{Config, Environment, Metrics}
+import com.snowplowanalytics.snowplow.snowflake.{Environment, Metrics}
+import com.snowplowanalytics.snowplow.runtime.syntax.foldable._
+import com.snowplowanalytics.snowplow.runtime.processing.BatchUp
 import com.snowplowanalytics.snowplow.loaders.transform.Transform

 object Processing {
@@ -69,7 +70,7 @@ object Processing {
     origBatchBytes: Long,
     badAccumulated: List[BadRow],
     countInserted: Int,
-    tokens: List[Unique.Token]
+    tokens: Vector[Unique.Token]
   )

   /**
@@ -110,7 +111,7 @@ object Processing {
     in.through(setLatency(env.metrics))
       .through(parseBytes(badProcessor))
       .through(transform(badProcessor))
-      .through(batchUp(env.batching))
+      .through(BatchUp.withTimeout(env.batching.maxBytes, env.batching.maxDelay))
       .through(writeToSnowflake(env, badProcessor))
       .through(sendFailedEvents(env))
       .through(sendMetrics(env))
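Note: the hand-rolled `batchUp` pipe (deleted in the final hunk below) is replaced here by `BatchUp.withTimeout` from the shared snowplow runtime module. Its definition is not part of this diff; the following is a minimal sketch of the shape implied by this call site and by the `Batchable` instance added further down (names and signatures are assumptions, not taken from the PR):

```scala
// Sketch only: assumed shape of the shared BatchUp helper, inferred from the call
// site above. The real code lives in com.snowplowanalytics.snowplow.runtime.
import scala.concurrent.duration.FiniteDuration

import cats.effect.Async
import fs2.Pipe

object BatchUpSketch {

  // How to merge two batches and how "heavy" a batch is (here: byte size).
  trait Batchable[A] {
    def combine(x: A, y: A): A
    def weightOf(a: A): Long
  }

  // Assumed behaviour: keep combining incoming elements until the combined weight
  // exceeds maxWeight, or maxDelay elapses, then emit the combined batch downstream.
  def withTimeout[F[_]: Async, A: Batchable](maxWeight: Long, maxDelay: FiniteDuration): Pipe[F, A, A] =
    ??? // implementation omitted; only the signature matters for this sketch
}
```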
@@ -132,58 +133,55 @@ object Processing {
   }

   /** Parse raw bytes into Event using analytics sdk */
-  private def parseBytes[F[_]: Monad](badProcessor: BadRowProcessor): Pipe[F, TokenedEvents, ParsedBatch] =
-    _.evalMap { case TokenedEvents(list, token, _) =>
-      Foldable[List].foldM(list, ParsedBatch(Nil, Nil, 0L, token)) { case (acc, bytes) =>
-        Applicative[F].pure {
-          val bytesSize = bytes.capacity
-          val stringified = StandardCharsets.UTF_8.decode(bytes).toString
-          Event.parse(stringified).toEither match {
-            case Right(e) =>
-              acc.copy(events = e :: acc.events, countBytes = acc.countBytes + bytesSize)
-            case Left(failure) =>
-              val payload = BadRowRawPayload(stringified)
-              val bad = BadRow.LoaderParsingError(badProcessor, failure, payload)
-              acc.copy(bad = bad :: acc.bad, countBytes = acc.countBytes + bytesSize)
-          }
-        }
-      }
+  private def parseBytes[F[_]: Sync](badProcessor: BadRowProcessor): Pipe[F, TokenedEvents, ParsedBatch] =
+    _.evalMap { case TokenedEvents(chunk, token, _) =>
+      for {
+        numBytes <- Sync[F].delay(Foldable[Chunk].sumBytes(chunk))
+        (badRows, events) <- Foldable[Chunk].traverseSeparateUnordered(chunk) { bytes =>
+          Sync[F].delay {
+            val stringified = StandardCharsets.UTF_8.decode(bytes).toString
+            Event.parse(stringified).toEither.leftMap { case failure =>
+              val payload = BadRowRawPayload(stringified)
+              BadRow.LoaderParsingError(badProcessor, failure, payload)
+            }
+          }
+        }
+      } yield ParsedBatch(events, badRows, numBytes, token)
     }

   /** Transform the Event into values compatible with the snowflake ingest sdk */
   private def transform[F[_]: Sync](badProcessor: BadRowProcessor): Pipe[F, ParsedBatch, BatchAfterTransform] =
-    in =>
-      for {
-        ParsedBatch(events, bad, bytes, token) <- in
-        loadTstamp <- Stream.eval(Sync[F].realTimeInstant).map(SnowflakeCaster.timestampValue)
-        result <- Stream.eval(transformBatch[F](badProcessor, events, loadTstamp))
-        (moreBad, transformed) = result.separate
-      } yield BatchAfterTransform(
-        toBeInserted = transformed.toVector,
-        origBatchBytes = bytes,
-        badAccumulated = bad ::: moreBad,
-        countInserted = 0,
-        tokens = List(token)
-      )
+    _.evalMap { batch =>
+      Sync[F].realTimeInstant.flatMap { now =>
+        val loadTstamp = SnowflakeCaster.timestampValue(now)
+        transformBatch[F](badProcessor, loadTstamp, batch)
+      }
+    }

-  private def transformBatch[F[_]: Monad](
+  private def transformBatch[F[_]: Sync](
     badProcessor: BadRowProcessor,
-    events: List[Event],
-    loadTstamp: OffsetDateTime
-  ): F[List[Either[BadRow, (Event, Map[String, AnyRef])]]] =
-    events
-      .traverse { e =>
-        Applicative[F].pure {
+    loadTstamp: OffsetDateTime,
+    batch: ParsedBatch
+  ): F[BatchAfterTransform] =
+    Foldable[List]
+      .traverseSeparateUnordered(batch.events) { event =>
+        Sync[F].delay {
           Transform
-            .transformEventUnstructured[AnyRef](badProcessor, SnowflakeCaster, SnowflakeJsonFolder, e)
+            .transformEventUnstructured[AnyRef](badProcessor, SnowflakeCaster, SnowflakeJsonFolder, event)
             .map { namedValues =>
-              val asMap = namedValues.map { case Caster.NamedValue(k, v) =>
-                k -> v
-              }.toMap
-              (e, asMap + ("load_tstamp" -> loadTstamp))
+              val map = namedValues
+                .map { case Caster.NamedValue(k, v) =>
+                  k -> v
+                }
+                .toMap
+                .updated("load_tstamp", loadTstamp)
+              event -> map
             }
         }
       }
+      .map { case (badRows, eventsWithTransforms) =>
+        BatchAfterTransform(eventsWithTransforms.toVector, batch.countBytes, badRows ::: batch.bad, 0, Vector(batch.token))
+      }

   private def writeToSnowflake[F[_]: Async](
     env: Environment[F],
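The new `parseBytes` and `transformBatch` lean on two helpers pulled in via `runtime.syntax.foldable._`: `sumBytes` and `traverseSeparateUnordered`. Their implementations are not shown in this diff; below is a rough reference sketch of the semantics implied by the call sites (assumed, for illustration only):

```scala
// Sketch only: plausible semantics for the imported foldable syntax, written as
// plain functions. Names mirror the diff; the bodies are assumptions.
import java.nio.ByteBuffer

import cats.{Foldable, Monad}
import cats.implicits._

object FoldableSyntaxSketch {

  // Total size of a batch of byte buffers, mirroring the per-buffer
  // bytes.capacity counting that the old parseBytes did inline.
  def sumBytes[G[_]: Foldable](buffers: G[ByteBuffer]): Long =
    buffers.foldLeft(0L)((acc, b) => acc + b.capacity.toLong)

  // Run an effectful function over every element and split the results into
  // failures (Lefts) and successes (Rights). The real helper is "unordered",
  // i.e. free to give up ordering guarantees; this sketch keeps things simple.
  def traverseSeparate[F[_]: Monad, G[_]: Foldable, A, B, C](ga: G[A])(f: A => F[Either[B, C]]): F[(List[B], List[C])] =
    ga.foldM((List.empty[B], List.empty[C])) { case ((lefts, rights), a) =>
      f(a).map {
        case Left(b)  => (b :: lefts, rights)
        case Right(c) => (lefts, c :: rights)
      }
    }
}
```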
@@ -342,57 +340,18 @@ object Processing {

   private def fastGetByIndex[A](items: Vector[A], index: Long): A = items(index.toInt)

-  private implicit def batchedSemigroup: Semigroup[BatchAfterTransform] = new Semigroup[BatchAfterTransform] {
+  private implicit def batchable: BatchUp.Batchable[BatchAfterTransform] = new BatchUp.Batchable[BatchAfterTransform] {
     def combine(x: BatchAfterTransform, y: BatchAfterTransform): BatchAfterTransform =
       BatchAfterTransform(
         toBeInserted = x.toBeInserted ++ y.toBeInserted,
         origBatchBytes = x.origBatchBytes + y.origBatchBytes,
         badAccumulated = x.badAccumulated ::: y.badAccumulated,
         countInserted = x.countInserted + y.countInserted,
-        tokens = x.tokens ::: y.tokens
+        tokens = x.tokens ++ y.tokens
       )
-  }

-  private def batchUp[F[_]: Async](config: Config.Batching): Pipe[F, BatchAfterTransform, BatchAfterTransform] = {
-    def go(
-      timedPull: Pull.Timed[F, BatchAfterTransform],
-      unflushed: Option[BatchAfterTransform]
-    ): Pull[F, BatchAfterTransform, Unit] =
-      timedPull.uncons.flatMap {
-        case None => // Upstream stream has finished cleanly
-          unflushed match {
-            case None    => Pull.done
-            case Some(b) => Pull.output1(b) *> Pull.done
-          }
-        case Some((Left(_), next)) => // The timer we set has timed out.
-          unflushed match {
-            case None    => go(next, None)
-            case Some(b) => Pull.output1(b) >> go(next, None)
-          }
-        case Some((Right(pulled), next)) if pulled.isEmpty =>
-          go(next, unflushed)
-        case Some((Right(nonEmptyChunk), next)) => // Received another batch before the timer timed out
-          val combined = unflushed match {
-            case None    => nonEmptyChunk.iterator.reduce(_ |+| _)
-            case Some(b) => nonEmptyChunk.iterator.foldLeft(b)(_ |+| _)
-          }
-          if (combined.origBatchBytes > config.maxBytes)
-            for {
-              _ <- Pull.output1(combined)
-              _ <- next.timeout(Duration.Zero)
-              _ <- go(next, None)
-            } yield ()
-          else {
-            for {
-              _ <- if (unflushed.isEmpty) next.timeout(config.maxDelay) else Pull.pure(())
-              _ <- go(next, Some(combined))
-            } yield ()
-          }
-      }
-    in =>
-      in.pull.timed { timedPull =>
-        go(timedPull, None)
-      }.stream
+    def weightOf(a: BatchAfterTransform): Long =
+      a.origBatchBytes
   }

 }
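For `BatchUp.withTimeout` to batch correctly, the `Batchable` instance above presumably has to behave like the `Semigroup` it replaces: `combine` associative, and `weightOf` additive under `combine`, so the accumulated weight can be tracked incrementally. A tiny self-contained check of those expectations against a simplified stand-in type (illustrative only, not part of the PR):

```scala
// Sketch only: MiniBatch stands in for BatchAfterTransform; the asserts spell out
// the properties the real Batchable instance is assumed to satisfy.
final case class MiniBatch(bytes: Long, tokens: Vector[Int])

object BatchableLawsSketch extends App {
  def combine(x: MiniBatch, y: MiniBatch): MiniBatch =
    MiniBatch(x.bytes + y.bytes, x.tokens ++ y.tokens)

  def weightOf(a: MiniBatch): Long = a.bytes

  val a = MiniBatch(1L, Vector(1))
  val b = MiniBatch(2L, Vector(2))
  val c = MiniBatch(3L, Vector(3))

  // combine is associative (it was literally a Semigroup before this change) ...
  assert(combine(combine(a, b), c) == combine(a, combine(b, c)))
  // ... and weights add up under combine, so weight can be tracked incrementally.
  assert(weightOf(combine(a, b)) == weightOf(a) + weightOf(b))
}
```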