diff --git a/config/config.azure.reference.hocon b/config/config.azure.reference.hocon
index f59e8e9..0283fa2 100644
--- a/config/config.azure.reference.hocon
+++ b/config/config.azure.reference.hocon
@@ -70,7 +70,7 @@
   "batching": {
 
     # - Events are emitted to Snowflake when the batch reaches this size in bytes
-    "maxBytes": 16000000
+    "maxBytes": 64000000
 
     # - Events are emitted to Snowflake for a maximum of this duration, even if the `maxBytes` size has not been reached
     "maxDelay": "1 second"
diff --git a/config/config.kinesis.reference.hocon b/config/config.kinesis.reference.hocon
index dad6b49..9ecfeee 100644
--- a/config/config.kinesis.reference.hocon
+++ b/config/config.kinesis.reference.hocon
@@ -87,7 +87,7 @@
   "batching": {
 
     # - Events are emitted to Snowflake when the batch reaches this size in bytes
-    "maxBytes": 16000000
+    "maxBytes": 64000000
 
     # - Events are emitted to Snowflake for a maximum of this duration, even if the `maxBytes` size has not been reached
     "maxDelay": "1 second"
diff --git a/config/config.pubsub.reference.hocon b/config/config.pubsub.reference.hocon
index 21cab7f..077c58a 100644
--- a/config/config.pubsub.reference.hocon
+++ b/config/config.pubsub.reference.hocon
@@ -76,7 +76,7 @@
   "batching": {
 
     # - Events are emitted to Snowflake when the batch reaches this size in bytes
-    "maxBytes": 16000000
+    "maxBytes": 64000000
 
     # - Events are emitted to Snowflake for a maximum of this duration, even if the `maxBytes` size has not been reached
     "maxDelay": "1 second"
diff --git a/modules/core/src/main/resources/reference.conf b/modules/core/src/main/resources/reference.conf
index 8522fee..1d2ac8a 100644
--- a/modules/core/src/main/resources/reference.conf
+++ b/modules/core/src/main/resources/reference.conf
@@ -19,7 +19,7 @@
   }
 
   "batching": {
-    "maxBytes": 16000000
+    "maxBytes": 64000000
     "maxDelay": "1 second"
     "uploadConcurrency": 3
   }
diff --git a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/processing/Channel.scala b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/processing/Channel.scala
index 9922c16..c578866 100644
--- a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/processing/Channel.scala
+++ b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/processing/Channel.scala
@@ -11,6 +11,7 @@
 package com.snowplowanalytics.snowplow.snowflake.processing
 
 import cats.effect.{Async, Poll, Resource, Sync}
+import cats.effect.std.Semaphore
 import cats.implicits._
 import com.snowplowanalytics.snowplow.runtime.AppHealth
 import com.snowplowanalytics.snowplow.runtime.processing.Coldswap
@@ -24,6 +25,7 @@
 import org.typelevel.log4cats.slf4j.Slf4jLogger
 import java.time.ZoneOffset
 import java.util.Properties
 import scala.jdk.CollectionConverters._
+import scala.concurrent.duration.DurationLong
 
 trait Channel[F[_]] {
@@ -35,7 +37,7 @@
    * @return
    *   List of the details of any insert failures. Empty list implies complete success.
    */
-  def write(rows: Iterable[Map[String, AnyRef]]): F[Channel.WriteResult]
+  def write(rows: List[Iterable[Map[String, AnyRef]]]): F[Channel.WriteResult]
 }
 
 object Channel {
@@ -62,7 +64,9 @@
 
   /**
    * The result of trying to enqueue an event for sending to Snowflake
-   * @param index
+   * @param outerIndex
+   *   Refers to the batch number in the list of attempted batches
+   * @param innerIndex
    *   Refers to the row number in the batch of attempted events
    * @param extraCols
    *   The column names which were present in the batch but missing in the table
@@ -71,7 +75,8 @@
    *   enqueue
    */
   case class WriteFailure(
-    index: Long,
+    outerIndex: Long,
+    innerIndex: Long,
     extraCols: List[String],
     cause: SFException
   )
@@ -108,8 +113,9 @@
   ): Resource[F, Opener[F]] =
     for {
       client <- createClient(config, batchingConfig, retriesConfig, appHealth)
+      semaphore <- Resource.eval(Semaphore[F](1L))
     } yield new Opener[F] {
-      def open: F[CloseableChannel[F]] = createChannel[F](config, client).map(impl[F])
+      def open: F[CloseableChannel[F]] = createChannel[F](config, client).map(impl[F](_, semaphore))
     }
 
   def provider[F[_]: Async](
@@ -134,15 +140,32 @@
     Resource.makeFull(make)(_.close)
   }
 
-  private def impl[F[_]: Async](channel: SnowflakeStreamingIngestChannel): CloseableChannel[F] =
+  private def impl[F[_]: Async](channel: SnowflakeStreamingIngestChannel, semaphore: Semaphore[F]): CloseableChannel[F] =
     new CloseableChannel[F] {
-      def write(rows: Iterable[Map[String, AnyRef]]): F[WriteResult] = {
-        val attempt: F[WriteResult] = for {
-          response <- Sync[F].blocking(channel.insertRows(rows.map(_.asJava).asJava, null))
-          _ <- flushChannel[F](channel)
-          isValid <- Sync[F].delay(channel.isValid)
-        } yield if (isValid) WriteResult.WriteFailures(parseResponse(response)) else WriteResult.ChannelIsInvalid
+      def write(rows: List[Iterable[Map[String, AnyRef]]]): F[WriteResult] = {
+
+        val attempt: F[WriteResult] = semaphore.permit
+          .surround {
+            for {
+              responses <- rows.traverse { inner =>
+                             Sync[F].blocking(channel.insertRows(inner.map(_.asJava).asJava, null))
+                           }
+              future <- Sync[F].delay(SnowsFlakePlowInterop.flushChannel(channel))
+              _ <- Sync[F].untilDefinedM {
+                     for {
+                       _ <- Async[F].sleep(100.millis)
+                       isEmpty <- Sync[F].delay(SnowsFlakePlowInterop.isEmpty(channel))
+                     } yield if (isEmpty) Some(()) else None
+                   }
+            } yield (future, responses)
+          }
+          .flatMap { case (future, responses) =>
+            for {
+              _ <- Async[F].fromCompletableFuture(Sync[F].pure(future))
+              isValid <- Sync[F].delay(channel.isValid)
+            } yield if (isValid) WriteResult.WriteFailures(parseResponse(responses)) else WriteResult.ChannelIsInvalid
+          }
 
         attempt.recover {
           case sfe: SFException if sfe.getVendorCode === SFErrorCode.INVALID_CHANNEL.getMessageCode =>
@@ -166,14 +189,17 @@
       }
     }
 
-  private def parseResponse(response: InsertValidationResponse): List[WriteFailure] =
-    response.getInsertErrors.asScala.map { insertError =>
-      WriteFailure(
-        insertError.getRowIndex,
-        Option(insertError.getExtraColNames).fold(List.empty[String])(_.asScala.toList),
-        insertError.getException
-      )
-    }.toList
+  private def parseResponse(responses: List[InsertValidationResponse]): List[WriteFailure] =
+    responses.zipWithIndex.flatMap { case (response, outerIndex) =>
+      response.getInsertErrors.asScala.map { insertError =>
+        WriteFailure(
+          outerIndex.toLong,
+          insertError.getRowIndex,
+          Option(insertError.getExtraColNames).fold(List.empty[String])(_.asScala.toList),
+          insertError.getException
+        )
+      }
+    }
 
   private def createChannel[F[_]: Async](
     config: Config.Snowflake,
@@ -238,15 +264,4 @@
     Resource.makeFull(make)(client => Sync[F].blocking(client.close()))
   }
 
-  /**
-   * Flushes the channel
-   *
-   * The public interface of the Snowflake SDK does not tell us when the events are safely written
-   * to Snowflake. So we must cast it to an Internal class so we get access to the `flush()` method.
-   */
-  private def flushChannel[F[_]: Async](channel: SnowflakeStreamingIngestChannel): F[Unit] =
-    Async[F].fromCompletableFuture {
-      Async[F].delay(SnowsFlakePlowInterop.flushChannel(channel))
-    }.void
-
 }
diff --git a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/processing/Processing.scala b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/processing/Processing.scala
index 19b79c3..27af63c 100644
--- a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/processing/Processing.scala
+++ b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.snowflake/processing/Processing.scala
@@ -22,6 +22,7 @@
 import org.typelevel.log4cats.slf4j.Slf4jLogger
 import java.nio.charset.StandardCharsets
 import java.time.OffsetDateTime
+import scala.concurrent.duration.DurationLong
 
 import com.snowplowanalytics.iglu.schemaddl.parquet.Caster
 import com.snowplowanalytics.snowplow.analytics.scalasdk.Event
@@ -74,7 +75,7 @@
    *   The tokens to be emitted after we have finished processing all events
    */
   private case class BatchAfterTransform(
-    toBeInserted: ListOfList[EventWithTransform],
+    toBeInserted: List[List[EventWithTransform]],
     origBatchBytes: Long,
     origBatchCount: Int,
     badAccumulated: ListOfList[BadRow],
@@ -99,15 +100,15 @@
   )
 
   private object ParsedWriteResult {
-    def empty: ParsedWriteResult = ParsedWriteResult(Set.empty, Nil, Nil)
+    private def empty: ParsedWriteResult = ParsedWriteResult(Set.empty, Nil, Nil)
 
-    def buildFrom(events: ListOfList[EventWithTransform], writeFailures: List[Channel.WriteFailure]): ParsedWriteResult =
+    def buildFrom(events: List[List[EventWithTransform]], writeFailures: List[Channel.WriteFailure]): ParsedWriteResult =
       if (writeFailures.isEmpty)
         empty
       else {
-        val indexed = events.copyToIndexedSeq
+        val indexed = events.map(_.toIndexedSeq).toIndexedSeq
         writeFailures.foldLeft(ParsedWriteResult.empty) { case (ParsedWriteResult(extraCols, eventsWithExtraCols, unexpected), failure) =>
-          val event = fastGetByIndex(indexed, failure.index)
+          val event = fastGetByIndex(indexed, failure.outerIndex, failure.innerIndex)
           if (failure.extraCols.nonEmpty)
             ParsedWriteResult(extraCols ++ failure.extraCols, event :: eventsWithExtraCols, unexpected)
           else
@@ -208,7 +209,7 @@
     Sync[F].untilDefinedM {
       env.channel.opened
         .use { channel =>
-          channel.write(batch.toBeInserted.asIterable.map(_._2))
+          channel.write(batch.toBeInserted.map(_.view.map(_._2)))
         }
         .flatMap {
           case Channel.WriteResult.ChannelIsInvalid =>
@@ -246,7 +247,7 @@
         badRowFromEnqueueFailure(badProcessor, event, sfe)
       }
       batch.copy(
-        toBeInserted = ListOfList.ofLists(parsedResult.eventsWithExtraCols),
+        toBeInserted = parsedResult.eventsWithExtraCols.grouped(100).toList,
         badAccumulated = batch.badAccumulated.prepend(moreBad)
       )
     }
@@ -267,15 +268,15 @@
     val mapped = notWritten match {
       case Nil => Nil
       case more =>
-        val indexed = batch.toBeInserted.copyToIndexedSeq
-        more.map(f => (fastGetByIndex(indexed, f.index)._1, f.cause))
+        val indexed = batch.toBeInserted.map(_.toIndexedSeq).toIndexedSeq
+        more.map(f => (fastGetByIndex(indexed, f.outerIndex, f.innerIndex)._1, f.cause))
     }
     abortIfFatalException[F](mapped).as {
       val moreBad = mapped.map { case (event, sfe) =>
         badRowFromEnqueueFailure(badProcessor, event, sfe)
       }
       batch.copy(
-        toBeInserted = ListOfList.empty,
+        toBeInserted = Nil,
         badAccumulated = batch.badAccumulated.prepend(moreBad)
       )
     }
@@ -357,18 +358,36 @@
       env.metrics.addGood(batch.origBatchCount - countBad) *> env.metrics.addBad(countBad)
     }
 
-  private def emitTokens[F[_]]: Pipe[F, BatchAfterTransform, Unique.Token] =
-    _.flatMap { batch =>
-      Stream.emits(batch.tokens)
+  private implicit def batchableTokens: BatchUp.Batchable[BatchAfterTransform, Vector[Unique.Token]] =
+    new BatchUp.Batchable[BatchAfterTransform, Vector[Unique.Token]] {
+      def combine(b: Vector[Unique.Token], a: BatchAfterTransform): Vector[Unique.Token] =
+        b ++ a.tokens
+
+      def single(a: BatchAfterTransform): Vector[Unique.Token] =
+        a.tokens
+
+      def weightOf(a: BatchAfterTransform): Long =
+        0L
+    }
+
+  private def emitTokens[F[_]: Async]: Pipe[F, BatchAfterTransform, Unique.Token] =
+    BatchUp.withTimeout[F, BatchAfterTransform, Vector[Unique.Token]](Long.MaxValue, 10.seconds).andThen {
+      _.flatMap { tokens =>
+        Stream.emits(tokens)
+      }
     }
 
-  private def fastGetByIndex[A](items: IndexedSeq[A], index: Long): A = items(index.toInt)
+  private def fastGetByIndex[A](
+    items: IndexedSeq[IndexedSeq[A]],
+    outerIndex: Long,
+    innerIndex: Long
+  ): A = items(outerIndex.toInt)(innerIndex.toInt)
 
   private implicit def batchable: BatchUp.Batchable[TransformedBatch, BatchAfterTransform] =
     new BatchUp.Batchable[TransformedBatch, BatchAfterTransform] {
       def combine(b: BatchAfterTransform, a: TransformedBatch): BatchAfterTransform =
         BatchAfterTransform(
-          toBeInserted = b.toBeInserted.prepend(a.events),
+          toBeInserted = if (a.events.isEmpty) b.toBeInserted else a.events :: b.toBeInserted,
           origBatchBytes = b.origBatchBytes + a.countBytes,
           origBatchCount = b.origBatchCount + a.countItems,
           badAccumulated = b.badAccumulated.prepend(a.parseFailures).prepend(a.transformFailures),
@@ -377,7 +396,7 @@
 
       def single(a: TransformedBatch): BatchAfterTransform =
         BatchAfterTransform(
-          ListOfList.of(List(a.events)),
+          if (a.events.isEmpty) Nil else List(a.events),
          a.countBytes,
           a.countItems,
           ListOfList.ofLists(a.parseFailures, a.transformFailures),
diff --git a/modules/core/src/main/scala/net.snowflake.ingest.streaming.internal/SnowsFlakePlowInterop.scala b/modules/core/src/main/scala/net.snowflake.ingest.streaming.internal/SnowsFlakePlowInterop.scala
index 396b13c..9edba4b 100644
--- a/modules/core/src/main/scala/net.snowflake.ingest.streaming.internal/SnowsFlakePlowInterop.scala
+++ b/modules/core/src/main/scala/net.snowflake.ingest.streaming.internal/SnowsFlakePlowInterop.scala
@@ -10,6 +10,7 @@
 
 package net.snowflake.ingest.streaming.internal
 
+import cats.implicits._
 import net.snowflake.ingest.streaming.SnowflakeStreamingIngestChannel
 import net.snowflake.ingest.streaming.internal.SnowflakeStreamingIngestChannelInternal
@@ -26,4 +27,7 @@
   def flushChannel(channel: SnowflakeStreamingIngestChannel): CompletableFuture[Void] =
     channel.asInstanceOf[SnowflakeStreamingIngestChannelInternal[_]].flush(false)
 
+  def isEmpty(channel: SnowflakeStreamingIngestChannel): Boolean =
+    channel.asInstanceOf[SnowflakeStreamingIngestChannelInternal[_]].getRowBuffer.getSize === 0.0f
+
 }
diff --git a/modules/core/src/test/scala/com.snowplowanalytics.snowplow.snowflake/MockEnvironment.scala b/modules/core/src/test/scala/com.snowplowanalytics.snowplow.snowflake/MockEnvironment.scala
index 3372866..32a80f9 100644
--- a/modules/core/src/test/scala/com.snowplowanalytics.snowplow.snowflake/MockEnvironment.scala
+++ b/modules/core/src/test/scala/com.snowplowanalytics.snowplow.snowflake/MockEnvironment.scala
@@ -60,7 +60,7 @@
       metrics = testMetrics(state),
       appHealth = testAppHealth(state),
       batching = Config.Batching(
-        maxBytes = 16000000,
+        maxBytes = 64000000,
         maxDelay = 10.seconds,
         uploadConcurrency = 1
       ),
@@ -139,7 +139,7 @@
     Ref[IO].of(mockedResponses).map { responses =>
       val make = actionRef.update(_ :+ OpenedChannel).as {
         new Channel[IO] {
-          def write(rows: Iterable[Map[String, AnyRef]]): IO[Channel.WriteResult] =
+          def write(rows: List[Iterable[Map[String, AnyRef]]]): IO[Channel.WriteResult] =
             for {
               response <- responses.modify {
                             case head :: tail => (tail, head)
@@ -155,12 +155,12 @@
 
   def updateActions(
     state: Ref[IO, Vector[Action]],
-    rows: Iterable[Map[String, AnyRef]],
+    rows: List[Iterable[Map[String, AnyRef]]],
     success: Response.Success[Channel.WriteResult]
   ): IO[Unit] =
     success.value match {
       case Channel.WriteResult.WriteFailures(failures) =>
-        state.update(_ :+ WroteRowsToSnowflake(rows.size - failures.size))
+        state.update(_ :+ WroteRowsToSnowflake(rows.flatten.size - failures.size))
       case Channel.WriteResult.ChannelIsInvalid => IO.unit
     }
diff --git a/modules/core/src/test/scala/com.snowplowanalytics.snowplow.snowflake/processing/ChannelProviderSpec.scala b/modules/core/src/test/scala/com.snowplowanalytics.snowplow.snowflake/processing/ChannelProviderSpec.scala
index f6bd3ca..7ecf940 100644
--- a/modules/core/src/test/scala/com.snowplowanalytics.snowplow.snowflake/processing/ChannelProviderSpec.scala
+++ b/modules/core/src/test/scala/com.snowplowanalytics.snowplow.snowflake/processing/ChannelProviderSpec.scala
@@ -311,7 +311,7 @@
   }
 
   private def testCloseableChannel(state: Ref[IO, Vector[Action]]): Channel.CloseableChannel[IO] = new Channel.CloseableChannel[IO] {
-    def write(rows: Iterable[Map[String, AnyRef]]): IO[Channel.WriteResult] = IO.pure(Channel.WriteResult.WriteFailures(Nil))
+    def write(rows: List[Iterable[Map[String, AnyRef]]]): IO[Channel.WriteResult] = IO.pure(Channel.WriteResult.WriteFailures(Nil))
 
     def close: IO[Unit] = state.update(_ :+ Action.ClosedChannel)
   }
diff --git a/modules/core/src/test/scala/com.snowplowanalytics.snowplow.snowflake/processing/ProcessingSpec.scala b/modules/core/src/test/scala/com.snowplowanalytics.snowplow.snowflake/processing/ProcessingSpec.scala
index 8917365..f1932ba 100644
--- a/modules/core/src/test/scala/com.snowplowanalytics.snowplow.snowflake/processing/ProcessingSpec.scala
+++ b/modules/core/src/test/scala/com.snowplowanalytics.snowplow.snowflake/processing/ProcessingSpec.scala
@@ -109,7 +109,7 @@
         Response.Success(
           Channel.WriteResult.WriteFailures(
             List(
-              Channel.WriteFailure(0L, List("unstruct_event_xyz_1", "contexts_abc_2"), newSFException(ErrorCode.INVALID_FORMAT_ROW))
+              Channel.WriteFailure(0L, 0L, List("unstruct_event_xyz_1", "contexts_abc_2"), newSFException(ErrorCode.INVALID_FORMAT_ROW))
             )
           )
         ),
@@ -144,7 +144,7 @@
         Response.Success(
           Channel.WriteResult.WriteFailures(
             List(
-              Channel.WriteFailure(0L, Nil, newSFException(ErrorCode.INVALID_FORMAT_ROW))
+              Channel.WriteFailure(0L, 0L, Nil, newSFException(ErrorCode.INVALID_FORMAT_ROW))
             )
           )
         ),
@@ -176,7 +176,7 @@
         Response.Success(
           Channel.WriteResult.WriteFailures(
             List(
-              Channel.WriteFailure(0L, Nil, newSFException(ErrorCode.INTERNAL_ERROR))
+              Channel.WriteFailure(0L, 0L, Nil, newSFException(ErrorCode.INTERNAL_ERROR))
             )
           )
         ),
@@ -303,7 +303,7 @@
         Response.Success(
           Channel.WriteResult.WriteFailures(
             List(
-              Channel.WriteFailure(0L, List.empty, newSFException(ErrorCode.INTERNAL_ERROR))
+              Channel.WriteFailure(0L, 0L, List.empty, newSFException(ErrorCode.INTERNAL_ERROR))
             )
           )
         )
diff --git a/modules/kafka/src/test/scala/com/snowplowanalytics/snowplow/snowflake/KafkaConfigSpec.scala b/modules/kafka/src/test/scala/com/snowplowanalytics/snowplow/snowflake/KafkaConfigSpec.scala
index ec2e0fc..a1f8567 100644
--- a/modules/kafka/src/test/scala/com/snowplowanalytics/snowplow/snowflake/KafkaConfigSpec.scala
+++ b/modules/kafka/src/test/scala/com/snowplowanalytics/snowplow/snowflake/KafkaConfigSpec.scala
@@ -105,7 +105,7 @@
         )
       ),
       batching = Config.Batching(
-        maxBytes = 16000000,
+        maxBytes = 64000000,
         maxDelay = 1.second,
         uploadConcurrency = 3
       ),
@@ -186,7 +186,7 @@
         )
       ),
       batching = Config.Batching(
-        maxBytes = 16000000,
+        maxBytes = 64000000,
         maxDelay = 1.second,
         uploadConcurrency = 1
       ),
diff --git a/modules/kinesis/src/test/scala/com/snowplowanalytics/snowplow/snowflake/KinesisConfigSpec.scala b/modules/kinesis/src/test/scala/com/snowplowanalytics/snowplow/snowflake/KinesisConfigSpec.scala
index eba4086..e02a9ab 100644
--- a/modules/kinesis/src/test/scala/com/snowplowanalytics/snowplow/snowflake/KinesisConfigSpec.scala
+++ b/modules/kinesis/src/test/scala/com/snowplowanalytics/snowplow/snowflake/KinesisConfigSpec.scala
@@ -101,7 +101,7 @@
         )
       ),
       batching = Config.Batching(
-        maxBytes = 16000000,
+        maxBytes = 64000000,
         maxDelay = 1.second,
         uploadConcurrency = 3
       ),
@@ -177,7 +177,7 @@
         )
       ),
       batching = Config.Batching(
-        maxBytes = 16000000,
+        maxBytes = 64000000,
         maxDelay = 1.second,
         uploadConcurrency = 1
       ),
diff --git a/modules/pubsub/src/test/scala/com/snowplowanalytics/snowplow/snowflake/PubsubConfigSpec.scala b/modules/pubsub/src/test/scala/com/snowplowanalytics/snowplow/snowflake/PubsubConfigSpec.scala
index dd0ec84..31b05b4 100644
--- a/modules/pubsub/src/test/scala/com/snowplowanalytics/snowplow/snowflake/PubsubConfigSpec.scala
+++ b/modules/pubsub/src/test/scala/com/snowplowanalytics/snowplow/snowflake/PubsubConfigSpec.scala
@@ -101,7 +101,7 @@
         )
       ),
       batching = Config.Batching(
-        maxBytes = 16000000,
+        maxBytes = 64000000,
         maxDelay = 1.second,
         uploadConcurrency = 3
       ),
@@ -176,7 +176,7 @@
         )
       ),
       batching = Config.Batching(
-        maxBytes = 16000000,
+        maxBytes = 64000000,
         maxDelay = 1.second,
         uploadConcurrency = 1
       ),
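Note on the flush handling in the patch above: the new `Channel.write` inserts each inner batch, triggers a flush, then polls `SnowsFlakePlowInterop.isEmpty` every 100 ms until the channel's row buffer drains before awaiting the flush future, all under a `Semaphore[F](1L)` so that concurrent writers cannot interleave their inserts with another call's flush. The snippet below is a minimal, self-contained sketch of that poll-until-empty loop in plain cats-effect. It is illustrative only and not part of the patch; `PollUntilEmptySketch`, `awaitEmpty` and `bufferIsEmpty` are hypothetical names standing in for the SDK internals.

```scala
import cats.effect.{IO, IOApp, Ref}
import cats.syntax.all._

import scala.concurrent.duration._

object PollUntilEmptySketch extends IOApp.Simple {

  // Re-run a 100 ms sleep + emptiness check until the buffer reports empty,
  // mirroring the `untilDefinedM` loop in the new Channel.write.
  def awaitEmpty(bufferIsEmpty: IO[Boolean]): IO[Unit] = {
    val poll: IO[Option[Unit]] =
      for {
        _ <- IO.sleep(100.millis)
        isEmpty <- bufferIsEmpty
      } yield if (isEmpty) Some(()) else None
    poll.untilDefinedM // cats syntax: repeats `poll` until it yields a Some
  }

  def run: IO[Unit] =
    for {
      pending <- Ref[IO].of(3) // stand-in for the channel's internal row buffer
      // a background task drains one "row" every 50 ms, as the SDK's flusher would
      _ <- (IO.sleep(50.millis) *> pending.update(n => (n - 1).max(0))).replicateA(3).start
      _ <- awaitEmpty(pending.get.map(_ == 0))
      _ <- IO.println("row buffer drained; safe to await the flush future")
    } yield ()
}
```

Because the poll only observes the buffer while the semaphore is held, the emptiness check presumably reflects just the rows inserted by the current call, which is what makes awaiting the flush future afterwards a reliable completion signal.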