diff --git a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/processing/ChannelProvider.scala b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/processing/ChannelProvider.scala
index bcf7af1..46fff90 100644
--- a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/processing/ChannelProvider.scala
+++ b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/processing/ChannelProvider.scala
@@ -57,7 +57,7 @@ trait ChannelProvider[F[_]] {
    * @return
    *   List of the details of any insert failures. Empty list implies complete success.
    */
-  def write(rows: Seq[Map[String, AnyRef]]): F[ChannelProvider.WriteResult]
+  def write(rows: Iterable[Map[String, AnyRef]]): F[ChannelProvider.WriteResult]
 }

 object ChannelProvider {
@@ -109,7 +109,7 @@ object ChannelProvider {
   def make[F[_]: Async](config: Config.Snowflake, batchingConfig: Config.Batching): Resource[F, ChannelProvider[F]] =
     for {
-      client <- createClient(config, batchingConfig: Config.Batching)
+      client <- createClient(config, batchingConfig)
       hs <- Hotswap.create[F, SnowflakeStreamingIngestChannel]
       channel <- Resource.eval(hs.swap(createChannel(config, client)))
       ref <- Resource.eval(Ref[F].of(channel))
@@ -153,7 +153,7 @@ object ChannelProvider {
         }
       }

-    def write(rows: Seq[Map[String, AnyRef]]): F[WriteResult] =
+    def write(rows: Iterable[Map[String, AnyRef]]): F[WriteResult] =
       sem.permit
         .use[WriteResult] { _ =>
           for {
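
Note on the signature change above: widening `write` from `Seq` to `Iterable` lets the caller pass a lazily flattened view over nested batches, so nothing forces a copy into one flat `Seq` before the call. A minimal, self-contained sketch of the idea; `FakeChannel` and `nestedRows` are invented for illustration and are not part of this codebase:

// Illustrative sketch only: an Iterable parameter accepts a lazy view over nested chunks.
object IterableWriteSketch extends App {

  final class FakeChannel {
    // Mirrors the new shape of the signature: any Iterable is accepted, nothing is copied here.
    def write(rows: Iterable[Map[String, AnyRef]]): Int = rows.size
  }

  // Batches accumulate as a list of chunks; flattening lazily avoids building one big Seq.
  val nestedRows: List[List[Map[String, AnyRef]]] =
    List(
      List(Map("event_id" -> "a"), Map("event_id" -> "b")),
      List(Map("event_id" -> "c"))
    )

  val lazyView: Iterable[Map[String, AnyRef]] = nestedRows.view.flatten

  println(new FakeChannel().write(lazyView)) // prints 3
}
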
diff --git a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/processing/Processing.scala b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/processing/Processing.scala
index 6e5dfc0..c1caebe 100644
--- a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/processing/Processing.scala
+++ b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/processing/Processing.scala
@@ -24,6 +24,7 @@ import com.snowplowanalytics.snowplow.analytics.scalasdk.Event
 import com.snowplowanalytics.snowplow.badrows.{BadRow, Payload => BadPayload, Processor => BadRowProcessor}
 import com.snowplowanalytics.snowplow.badrows.Payload.{RawPayload => BadRowRawPayload}
 import com.snowplowanalytics.snowplow.sources.{EventProcessingConfig, EventProcessor, TokenedEvents}
+import com.snowplowanalytics.snowplow.sinks.ListOfList
 import com.snowplowanalytics.snowplow.snowflake.{Environment, Metrics}
 import com.snowplowanalytics.snowplow.runtime.syntax.foldable._
 import com.snowplowanalytics.snowplow.runtime.processing.BatchUp
@@ -42,8 +43,18 @@ object Processing {

   private case class ParsedBatch(
     events: List[Event],
-    bad: List[BadRow],
+    parseFailures: List[BadRow],
     countBytes: Long,
+    countItems: Int,
     token: Unique.Token
   )

+  private case class TransformedBatch(
+    events: List[EventWithTransform],
+    parseFailures: List[BadRow],
+    transformFailures: List[BadRow],
+    countBytes: Long,
+    countItems: Int,
+    token: Unique.Token
+  )
+
@@ -56,20 +67,20 @@ object Processing {
    * Events from this batch which have not yet been inserted. Events are dropped from this list
    * once they have either failed or got inserted. Implemented as a Vector because we need to do
    * lookup by index.
-   * @param origBatchSize
-   *   The count of events in the original batch. Includes all good and bad events.
    * @param origBatchBytes
    *   The total size in bytes of events in the original batch. Includes all good and bad events.
+   * @param origBatchCount
+   *   The count of events in the original batch. Includes all good and bad events.
    * @param badAccumulated
    *   Events that failed for any reason so far.
    * @param tokens
    *   The tokens to be emitted after we have finished processing all events
    */
   private case class BatchAfterTransform(
-    toBeInserted: Vector[EventWithTransform],
+    toBeInserted: ListOfList[EventWithTransform],
     origBatchBytes: Long,
-    badAccumulated: List[BadRow],
-    countInserted: Int,
+    origBatchCount: Int,
+    badAccumulated: ListOfList[BadRow],
     tokens: Vector[Unique.Token]
   )
@@ -86,20 +97,25 @@ object Processing {
    */
   private case class ParsedWriteResult(
     extraColsRequired: Set[String],
-    eventsWithExtraCols: Vector[EventWithTransform],
+    eventsWithExtraCols: List[EventWithTransform],
     unexpectedFailures: List[(Event, SFException)]
   )

   private object ParsedWriteResult {
-    def empty: ParsedWriteResult = ParsedWriteResult(Set.empty, Vector.empty, Nil)
-
-    def buildFrom(events: Vector[EventWithTransform], results: List[ChannelProvider.WriteFailure]): ParsedWriteResult =
-      results.foldLeft(ParsedWriteResult.empty) { case (ParsedWriteResult(extraCols, eventsWithExtraCols, unexpected), failure) =>
-        val event = fastGetByIndex(events, failure.index)
-        if (failure.extraCols.nonEmpty)
-          ParsedWriteResult(extraCols ++ failure.extraCols, eventsWithExtraCols :+ event, unexpected)
-        else
-          ParsedWriteResult(extraCols, eventsWithExtraCols, (event._1, failure.cause) :: unexpected)
+    def empty: ParsedWriteResult = ParsedWriteResult(Set.empty, Nil, Nil)
+
+    def buildFrom(events: ListOfList[EventWithTransform], writeFailures: List[ChannelProvider.WriteFailure]): ParsedWriteResult =
+      if (writeFailures.isEmpty)
+        empty
+      else {
+        val indexed = events.toIndexedSeq
+        writeFailures.foldLeft(ParsedWriteResult.empty) { case (ParsedWriteResult(extraCols, eventsWithExtraCols, unexpected), failure) =>
+          val event = fastGetByIndex(indexed, failure.index)
+          if (failure.extraCols.nonEmpty)
+            ParsedWriteResult(extraCols ++ failure.extraCols, event :: eventsWithExtraCols, unexpected)
+          else
+            ParsedWriteResult(extraCols, eventsWithExtraCols, (event._1, failure.cause) :: unexpected)
+        }
       }
   }
@@ -146,11 +162,11 @@
             }
           }
         }
-      } yield ParsedBatch(events, badRows, numBytes, token)
+      } yield ParsedBatch(events, badRows, numBytes, chunk.size, token)
     }

   /** Transform the Event into values compatible with the snowflake ingest sdk */
-  private def transform[F[_]: Sync](badProcessor: BadRowProcessor): Pipe[F, ParsedBatch, BatchAfterTransform] =
+  private def transform[F[_]: Sync](badProcessor: BadRowProcessor): Pipe[F, ParsedBatch, TransformedBatch] =
     _.evalMap { batch =>
       Sync[F].realTimeInstant.flatMap { now =>
         val loadTstamp = SnowflakeCaster.timestampValue(now)
@@ -162,7 +178,7 @@
     badProcessor: BadRowProcessor,
     loadTstamp: OffsetDateTime,
     batch: ParsedBatch
-  ): F[BatchAfterTransform] =
+  ): F[TransformedBatch] =
     Foldable[List]
       .traverseSeparateUnordered(batch.events) { event =>
         Sync[F].delay {
@@ -179,8 +195,8 @@
           }
         }
       }
-      .map { case (badRows, eventsWithTransforms) =>
-        BatchAfterTransform(eventsWithTransforms.toVector, batch.countBytes, badRows ::: batch.bad, 0, Vector(batch.token))
+      .map { case (transformFailures, eventsWithTransforms) =>
+        TransformedBatch(eventsWithTransforms, batch.parseFailures, transformFailures, batch.countBytes, batch.countItems, batch.token)
       }

   private def writeToSnowflake[F[_]: Async](
@@ -204,7 +220,7 @@
         batch.pure[F]
       else
         Sync[F].untilDefinedM {
-          env.channelProvider.write(batch.toBeInserted.map(_._2)).flatMap {
+          env.channelProvider.write(batch.toBeInserted.asIterable.map(_._2)).flatMap {
            case ChannelProvider.WriteResult.ChannelIsInvalid =>
              // Reset the channel and immediately try again
              env.channelProvider.reset.as(none)
@@ -234,9 +250,8 @@
             badRowFromEnqueueFailure(badProcessor, event, sfe)
           }
           batch.copy(
-            toBeInserted = parsedResult.eventsWithExtraCols,
-            badAccumulated = moreBad ::: batch.badAccumulated,
-            countInserted = batch.toBeInserted.size - notWritten.size
+            toBeInserted = ListOfList.ofLists(parsedResult.eventsWithExtraCols),
+            badAccumulated = batch.badAccumulated.prepend(moreBad)
           )
         }
       }
@@ -253,15 +268,19 @@
     batch: BatchAfterTransform
   ): F[BatchAfterTransform] =
     withWriteAttempt(env, batch) { notWritten =>
-      val mapped = notWritten.map(f => (fastGetByIndex(batch.toBeInserted, f.index)._1, f.cause))
+      val mapped = notWritten match {
+        case Nil => Nil
+        case more =>
+          val indexed = batch.toBeInserted.toIndexedSeq
+          more.map(f => (fastGetByIndex(indexed, f.index)._1, f.cause))
+      }
       abortIfFatalException[F](mapped).as {
         val moreBad = mapped.map { case (event, sfe) =>
           badRowFromEnqueueFailure(badProcessor, event, sfe)
         }
         batch.copy(
-          toBeInserted = Vector.empty,
-          badAccumulated = moreBad ::: batch.badAccumulated,
-          countInserted = batch.countInserted + batch.toBeInserted.size - notWritten.size
+          toBeInserted = ListOfList.empty,
+          badAccumulated = batch.badAccumulated.prepend(moreBad)
         )
       }
     }
@@ -323,14 +342,15 @@
   private def sendFailedEvents[F[_]: Applicative, A](env: Environment[F]): Pipe[F, BatchAfterTransform, BatchAfterTransform] =
     _.evalTap { batch =>
       if (batch.badAccumulated.nonEmpty) {
-        val serialized = batch.badAccumulated.map(_.compact.getBytes(StandardCharsets.UTF_8))
+        val serialized = batch.badAccumulated.mapUnordered(_.compactByteArray)
         env.badSink.sinkSimple(serialized)
       } else Applicative[F].unit
     }

   private def sendMetrics[F[_]: Applicative, A](env: Environment[F]): Pipe[F, BatchAfterTransform, BatchAfterTransform] =
     _.evalTap { batch =>
-      env.metrics.addGood(batch.countInserted) *> env.metrics.addBad(batch.badAccumulated.size)
+      val countBad = batch.badAccumulated.asIterable.size
+      env.metrics.addGood(batch.origBatchCount - countBad) *> env.metrics.addBad(countBad)
     }
@@ -338,20 +358,30 @@
   private def emitTokens[F[_]]: Pipe[F, BatchAfterTransform, Unique.Token] =
     _.flatMap { batch =>
       Stream.emits(batch.tokens)
     }

-  private def fastGetByIndex[A](items: Vector[A], index: Long): A = items(index.toInt)
+  private def fastGetByIndex[A](items: IndexedSeq[A], index: Long): A = items(index.toInt)
+
+  private implicit def batchable: BatchUp.Batchable[TransformedBatch, BatchAfterTransform] =
+    new BatchUp.Batchable[TransformedBatch, BatchAfterTransform] {
+      def combine(b: BatchAfterTransform, a: TransformedBatch): BatchAfterTransform =
+        BatchAfterTransform(
+          toBeInserted = b.toBeInserted.prepend(a.events),
+          origBatchBytes = b.origBatchBytes + a.countBytes,
+          origBatchCount = b.origBatchCount + a.countItems,
+          badAccumulated = b.badAccumulated.prepend(a.parseFailures).prepend(a.transformFailures),
+          tokens = b.tokens :+ a.token
+        )

-  private implicit def batchable: BatchUp.Batchable[BatchAfterTransform] = new BatchUp.Batchable[BatchAfterTransform] {
-    def combine(x: BatchAfterTransform, y: BatchAfterTransform): BatchAfterTransform =
-      BatchAfterTransform(
-        toBeInserted = x.toBeInserted ++ y.toBeInserted,
-        origBatchBytes = x.origBatchBytes + y.origBatchBytes,
-        badAccumulated = x.badAccumulated ::: y.badAccumulated,
-        countInserted = x.countInserted + y.countInserted,
-        tokens = x.tokens ++ y.tokens
-      )
+      def single(a: TransformedBatch): BatchAfterTransform =
+        BatchAfterTransform(
+          ListOfList.of(List(a.events)),
+          a.countBytes,
+          a.countItems,
+          ListOfList.ofLists(a.parseFailures, a.transformFailures),
+          Vector(a.token)
+        )

-    def weightOf(a: BatchAfterTransform): Long =
-      a.origBatchBytes
-  }
+      def weightOf(a: TransformedBatch): Long =
+        a.countBytes
+    }
 }
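
Note on `ParsedWriteResult.buildFrom` above: it only pays for random access when there actually are write failures, converting the nested structure to an `IndexedSeq` once inside the non-empty branch instead of keeping the whole batch as a `Vector` up front. A rough standalone sketch of that pattern, using a plain `List[List[_]]` in place of the library's `ListOfList`; `failedRows` and `Failure` are invented names:

// Illustrative only: mirrors the "build the index lazily, and only on the failure path" idea.
object IndexOnFailureSketch extends App {

  final case class Failure(index: Long, reason: String)

  def failedRows(batches: List[List[String]], failures: List[Failure]): List[(String, String)] =
    if (failures.isEmpty)
      Nil // common case: nothing failed, so the index is never built
    else {
      val indexed: IndexedSeq[String] = batches.iterator.flatten.toIndexedSeq
      failures.map(f => (indexed(f.index.toInt), f.reason))
    }

  val batches = List(List("a", "b"), List("c"))
  println(failedRows(batches, Nil))                       // List()
  println(failedRows(batches, List(Failure(2L, "boom")))) // List((c,boom))
}
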
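Note on the new `BatchUp.Batchable[TransformedBatch, BatchAfterTransform]` instance above: it separates the input type from the accumulator type, with `single` lifting one micro-batch, `combine` folding another one in, and `weightOf` giving a size signal that can drive when a batch is emitted. The sketch below shows only the shape of that idea under those assumptions; it is not the real BatchUp implementation, and the `Batchable`/`batchUp` names here are local to the example:

// Illustrative sketch of a Batchable-style typeclass and a weight-bounded fold over inputs.
object BatchableSketch extends App {

  trait Batchable[A, B] {
    def single(a: A): B
    def combine(b: B, a: A): B
    def weightOf(a: A): Long
  }

  // Group inputs into batches whose accumulated weight stays at or under maxWeight.
  def batchUp[A, B](inputs: List[A], maxWeight: Long)(implicit B: Batchable[A, B]): List[B] =
    inputs.foldLeft((List.empty[B], 0L)) {
      case ((Nil, _), a) => (List(B.single(a)), B.weightOf(a))
      case ((acc :: rest, w), a) if w + B.weightOf(a) <= maxWeight =>
        (B.combine(acc, a) :: rest, w + B.weightOf(a))
      case ((full, _), a) => (B.single(a) :: full, B.weightOf(a))
    }._1.reverse

  // Toy instance: batch strings into (concatenated text, total length).
  implicit val strings: Batchable[String, (String, Long)] = new Batchable[String, (String, Long)] {
    def single(a: String): (String, Long)                     = (a, a.length.toLong)
    def combine(b: (String, Long), a: String): (String, Long) = (b._1 + a, b._2 + a.length)
    def weightOf(a: String): Long                             = a.length.toLong
  }

  println(batchUp[String, (String, Long)](List("aa", "bb", "cc", "dd"), maxWeight = 5L))
  // List((aabb,4), (ccdd,4))
}
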
diff --git a/modules/core/src/test/scala/com.snowplowanalytics.snowplow.snowflake/MockEnvironment.scala b/modules/core/src/test/scala/com.snowplowanalytics.snowplow.snowflake/MockEnvironment.scala
index c5d773c..4cbe7c2 100644
--- a/modules/core/src/test/scala/com.snowplowanalytics.snowplow.snowflake/MockEnvironment.scala
+++ b/modules/core/src/test/scala/com.snowplowanalytics.snowplow.snowflake/MockEnvironment.scala
@@ -97,7 +97,7 @@ object MockEnvironment {
     }

   private def testSink(ref: Ref[IO, Vector[Action]]): Sink[IO] = Sink[IO] { batch =>
-    ref.update(_ :+ SentToBad(batch.size))
+    ref.update(_ :+ SentToBad(batch.asIterable.size))
   }

   private def testHttpClient: Client[IO] = Client[IO] { _ =>
@@ -130,7 +130,7 @@ object MockEnvironment {
         _ <- actionRef.update(_ :+ OpenedChannel)
       } yield a

-      def write(rows: Seq[Map[String, AnyRef]]): IO[ChannelProvider.WriteResult] =
+      def write(rows: Iterable[Map[String, AnyRef]]): IO[ChannelProvider.WriteResult] =
        for {
          response <- responseRef.modify {
                        case head :: tail => (tail, head)
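
Note on the test changes above: the mock only needs to record how many rows it was asked to handle, so the looser `Iterable` parameter costs it nothing. A condensed sketch of that Ref-based recording pattern; the names are invented and this is not the project's actual MockEnvironment:

// Illustrative cats-effect sketch of a recording stub that accepts the wider Iterable signature.
import cats.effect.{IO, IOApp, Ref}

object RecordingChannelSketch extends IOApp.Simple {

  sealed trait Action
  case class WroteRows(count: Int) extends Action

  // A stand-in for the channel: it just counts the rows it receives.
  def write(actions: Ref[IO, Vector[Action]], rows: Iterable[Map[String, AnyRef]]): IO[Unit] =
    actions.update(_ :+ WroteRows(rows.size))

  def run: IO[Unit] =
    for {
      actions <- Ref[IO].of(Vector.empty[Action])
      _       <- write(actions, List(Map("event_id" -> "a"), Map("event_id" -> "b")))
      log     <- actions.get
      _       <- IO.println(log) // Vector(WroteRows(2))
    } yield ()
}
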
diff --git a/project/Dependencies.scala b/project/Dependencies.scala
index b1ca478..e0bf7e4 100644
--- a/project/Dependencies.scala
+++ b/project/Dependencies.scala
@@ -28,7 +28,7 @@ object Dependencies {
     val awsSdk2 = "2.20.135"

     // Snowplow
-    val streams = "0.1.0"
+    val streams = "0.2.0-M1a"

     // tests
     val specs2 = "4.20.0"
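
Note on the version bump above: the newer streams dependency is presumably what makes `ListOfList` available to the loader. Its value, as used in Processing.scala, is cheap accumulation: `prepend` attaches a whole chunk to the front of a list of chunks rather than concatenating element lists the way `:::` does, and flattening is deferred until the rows are read. A toy illustration of that idea under those assumptions; this is not the streams library's actual implementation:

// Toy list-of-lists: O(1) prepend of a whole chunk, flattening deferred until read time.
object ListOfListSketch extends App {

  final case class Chunked[A](chunks: List[List[A]]) {
    def prepend(chunk: List[A]): Chunked[A] = Chunked(chunk :: chunks) // existing chunks are not copied
    def asIterable: Iterable[A]             = chunks.view.flatten
    def isEmpty: Boolean                    = chunks.forall(_.isEmpty)
  }

  object Chunked {
    def empty[A]: Chunked[A] = Chunked(Nil)
  }

  val bad = Chunked.empty[String].prepend(List("row-3")).prepend(List("row-1", "row-2"))
  println(bad.asIterable.toList) // List(row-1, row-2, row-3)
  println(bad.isEmpty)           // false
}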