Skip to content

Commit

Permalink
Update common libraries to 0.2.0
Browse files Browse the repository at this point in the history
  • Loading branch information
istreeter committed Nov 22, 2023
1 parent 91ebf72 commit d255824
Show file tree
Hide file tree
Showing 4 changed files with 80 additions and 50 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ trait ChannelProvider[F[_]] {
* @return
* List of the details of any insert failures. Empty list implies complete success.
*/
def write(rows: Seq[Map[String, AnyRef]]): F[ChannelProvider.WriteResult]
def write(rows: Iterable[Map[String, AnyRef]]): F[ChannelProvider.WriteResult]
}

object ChannelProvider {
Expand Down Expand Up @@ -109,7 +109,7 @@ object ChannelProvider {

def make[F[_]: Async](config: Config.Snowflake, batchingConfig: Config.Batching): Resource[F, ChannelProvider[F]] =
for {
client <- createClient(config, batchingConfig: Config.Batching)
client <- createClient(config, batchingConfig)
hs <- Hotswap.create[F, SnowflakeStreamingIngestChannel]
channel <- Resource.eval(hs.swap(createChannel(config, client)))
ref <- Resource.eval(Ref[F].of(channel))
Expand Down Expand Up @@ -153,7 +153,7 @@ object ChannelProvider {
}
}

def write(rows: Seq[Map[String, AnyRef]]): F[WriteResult] =
def write(rows: Iterable[Map[String, AnyRef]]): F[WriteResult] =
sem.permit
.use[WriteResult] { _ =>
for {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import com.snowplowanalytics.snowplow.analytics.scalasdk.Event
import com.snowplowanalytics.snowplow.badrows.{BadRow, Payload => BadPayload, Processor => BadRowProcessor}
import com.snowplowanalytics.snowplow.badrows.Payload.{RawPayload => BadRowRawPayload}
import com.snowplowanalytics.snowplow.sources.{EventProcessingConfig, EventProcessor, TokenedEvents}
import com.snowplowanalytics.snowplow.sinks.ListOfList
import com.snowplowanalytics.snowplow.snowflake.{Environment, Metrics}
import com.snowplowanalytics.snowplow.runtime.syntax.foldable._
import com.snowplowanalytics.snowplow.runtime.processing.BatchUp
Expand All @@ -42,8 +43,18 @@ object Processing {

private case class ParsedBatch(
events: List[Event],
bad: List[BadRow],
parseFailures: List[BadRow],
countBytes: Long,
countItems: Int,
token: Unique.Token
)

private case class TransformedBatch(
events: List[EventWithTransform],
parseFailures: List[BadRow],
transformFailures: List[BadRow],
countBytes: Long,
countItems: Int,
token: Unique.Token
)

Expand All @@ -56,20 +67,20 @@ object Processing {
* Events from this batch which have not yet been inserted. Events are dropped from this list
* once they have either failed or got inserted. Implemented as a Vector because we need to do
* lookup by index.
* @param origBatchSize
* The count of events in the original batch. Includes all good and bad events.
* @param origBatchBytes
* The total size in bytes of events in the original batch. Includes all good and bad events.
* @param origBatchCount
* The count of events in the original batch. Includes all good and bad events.
* @param badAccumulated
* Events that failed for any reason so far.
* @param tokens
* The tokens to be emitted after we have finished processing all events
*/
private case class BatchAfterTransform(
toBeInserted: Vector[EventWithTransform],
toBeInserted: ListOfList[EventWithTransform],
origBatchBytes: Long,
badAccumulated: List[BadRow],
countInserted: Int,
origBatchCount: Int,
badAccumulated: ListOfList[BadRow],
tokens: Vector[Unique.Token]
)

Expand All @@ -86,20 +97,25 @@ object Processing {
*/
private case class ParsedWriteResult(
extraColsRequired: Set[String],
eventsWithExtraCols: Vector[EventWithTransform],
eventsWithExtraCols: List[EventWithTransform],
unexpectedFailures: List[(Event, SFException)]
)

private object ParsedWriteResult {
def empty: ParsedWriteResult = ParsedWriteResult(Set.empty, Vector.empty, Nil)

def buildFrom(events: Vector[EventWithTransform], results: List[ChannelProvider.WriteFailure]): ParsedWriteResult =
results.foldLeft(ParsedWriteResult.empty) { case (ParsedWriteResult(extraCols, eventsWithExtraCols, unexpected), failure) =>
val event = fastGetByIndex(events, failure.index)
if (failure.extraCols.nonEmpty)
ParsedWriteResult(extraCols ++ failure.extraCols, eventsWithExtraCols :+ event, unexpected)
else
ParsedWriteResult(extraCols, eventsWithExtraCols, (event._1, failure.cause) :: unexpected)
def empty: ParsedWriteResult = ParsedWriteResult(Set.empty, Nil, Nil)

def buildFrom(events: ListOfList[EventWithTransform], writeFailures: List[ChannelProvider.WriteFailure]): ParsedWriteResult =
if (writeFailures.isEmpty)
empty
else {
val indexed = events.asIterable.toIndexedSeq
writeFailures.foldLeft(ParsedWriteResult.empty) { case (ParsedWriteResult(extraCols, eventsWithExtraCols, unexpected), failure) =>
val event = fastGetByIndex(indexed, failure.index)
if (failure.extraCols.nonEmpty)
ParsedWriteResult(extraCols ++ failure.extraCols, event :: eventsWithExtraCols, unexpected)
else
ParsedWriteResult(extraCols, eventsWithExtraCols, (event._1, failure.cause) :: unexpected)
}
}
}

Expand Down Expand Up @@ -146,11 +162,11 @@ object Processing {
}
}
}
} yield ParsedBatch(events, badRows, numBytes, token)
} yield ParsedBatch(events, badRows, numBytes, chunk.size, token)
}

/** Transform the Event into values compatible with the snowflake ingest sdk */
private def transform[F[_]: Sync](badProcessor: BadRowProcessor): Pipe[F, ParsedBatch, BatchAfterTransform] =
private def transform[F[_]: Sync](badProcessor: BadRowProcessor): Pipe[F, ParsedBatch, TransformedBatch] =
_.evalMap { batch =>
Sync[F].realTimeInstant.flatMap { now =>
val loadTstamp = SnowflakeCaster.timestampValue(now)
Expand All @@ -162,7 +178,7 @@ object Processing {
badProcessor: BadRowProcessor,
loadTstamp: OffsetDateTime,
batch: ParsedBatch
): F[BatchAfterTransform] =
): F[TransformedBatch] =
Foldable[List]
.traverseSeparateUnordered(batch.events) { event =>
Sync[F].delay {
Expand All @@ -179,8 +195,8 @@ object Processing {
}
}
}
.map { case (badRows, eventsWithTransforms) =>
BatchAfterTransform(eventsWithTransforms.toVector, batch.countBytes, badRows ::: batch.bad, 0, Vector(batch.token))
.map { case (transformFailures, eventsWithTransforms) =>
TransformedBatch(eventsWithTransforms, batch.parseFailures, transformFailures, batch.countBytes, batch.countItems, batch.token)
}

private def writeToSnowflake[F[_]: Async](
Expand All @@ -204,7 +220,7 @@ object Processing {
batch.pure[F]
else
Sync[F].untilDefinedM {
env.channelProvider.write(batch.toBeInserted.map(_._2)).flatMap {
env.channelProvider.write(batch.toBeInserted.asIterable.map(_._2)).flatMap {
case ChannelProvider.WriteResult.ChannelIsInvalid =>
// Reset the channel and immediately try again
env.channelProvider.reset.as(none)
Expand Down Expand Up @@ -234,9 +250,8 @@ object Processing {
badRowFromEnqueueFailure(badProcessor, event, sfe)
}
batch.copy(
toBeInserted = parsedResult.eventsWithExtraCols,
badAccumulated = moreBad ::: batch.badAccumulated,
countInserted = batch.toBeInserted.size - notWritten.size
toBeInserted = ListOfList.ofLists(parsedResult.eventsWithExtraCols),
badAccumulated = batch.badAccumulated.prepend(moreBad)
)
}
}
Expand All @@ -253,15 +268,19 @@ object Processing {
batch: BatchAfterTransform
): F[BatchAfterTransform] =
withWriteAttempt(env, batch) { notWritten =>
val mapped = notWritten.map(f => (fastGetByIndex(batch.toBeInserted, f.index)._1, f.cause))
val mapped = notWritten match {
case Nil => Nil
case more =>
val indexed = batch.toBeInserted.asIterable.toIndexedSeq
more.map(f => (fastGetByIndex(indexed, f.index)._1, f.cause))
}
abortIfFatalException[F](mapped).as {
val moreBad = mapped.map { case (event, sfe) =>
badRowFromEnqueueFailure(badProcessor, event, sfe)
}
batch.copy(
toBeInserted = Vector.empty,
badAccumulated = moreBad ::: batch.badAccumulated,
countInserted = batch.countInserted + batch.toBeInserted.size - notWritten.size
toBeInserted = ListOfList.empty,
badAccumulated = batch.badAccumulated.prepend(moreBad)
)
}
}
Expand Down Expand Up @@ -323,35 +342,46 @@ object Processing {
private def sendFailedEvents[F[_]: Applicative, A](env: Environment[F]): Pipe[F, BatchAfterTransform, BatchAfterTransform] =
_.evalTap { batch =>
if (batch.badAccumulated.nonEmpty) {
val serialized = batch.badAccumulated.map(_.compact.getBytes(StandardCharsets.UTF_8))
val serialized = batch.badAccumulated.mapUnordered(_.compactByteArray)
env.badSink.sinkSimple(serialized)
} else Applicative[F].unit
}

private def sendMetrics[F[_]: Applicative, A](env: Environment[F]): Pipe[F, BatchAfterTransform, BatchAfterTransform] =
_.evalTap { batch =>
env.metrics.addGood(batch.countInserted) *> env.metrics.addBad(batch.badAccumulated.size)
val countBad = batch.badAccumulated.asIterable.size
env.metrics.addGood(batch.origBatchCount - countBad) *> env.metrics.addBad(countBad)
}

private def emitTokens[F[_]]: Pipe[F, BatchAfterTransform, Unique.Token] =
_.flatMap { batch =>
Stream.emits(batch.tokens)
}

private def fastGetByIndex[A](items: Vector[A], index: Long): A = items(index.toInt)
private def fastGetByIndex[A](items: IndexedSeq[A], index: Long): A = items(index.toInt)

private implicit def batchable: BatchUp.Batchable[TransformedBatch, BatchAfterTransform] =
new BatchUp.Batchable[TransformedBatch, BatchAfterTransform] {
def combine(b: BatchAfterTransform, a: TransformedBatch): BatchAfterTransform =
BatchAfterTransform(
toBeInserted = b.toBeInserted.prepend(a.events),
origBatchBytes = b.origBatchBytes + a.countBytes,
origBatchCount = b.origBatchCount + a.countItems,
badAccumulated = b.badAccumulated.prepend(a.parseFailures).prepend(a.transformFailures),
tokens = b.tokens :+ a.token
)

private implicit def batchable: BatchUp.Batchable[BatchAfterTransform] = new BatchUp.Batchable[BatchAfterTransform] {
def combine(x: BatchAfterTransform, y: BatchAfterTransform): BatchAfterTransform =
BatchAfterTransform(
toBeInserted = x.toBeInserted ++ y.toBeInserted,
origBatchBytes = x.origBatchBytes + y.origBatchBytes,
badAccumulated = x.badAccumulated ::: y.badAccumulated,
countInserted = x.countInserted + y.countInserted,
tokens = x.tokens ++ y.tokens
)
def single(a: TransformedBatch): BatchAfterTransform =
BatchAfterTransform(
ListOfList.of(List(a.events)),
a.countBytes,
a.countItems,
ListOfList.ofLists(a.parseFailures, a.transformFailures),
Vector(a.token)
)

def weightOf(a: BatchAfterTransform): Long =
a.origBatchBytes
}
def weightOf(a: TransformedBatch): Long =
a.countBytes
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ object MockEnvironment {
}

private def testSink(ref: Ref[IO, Vector[Action]]): Sink[IO] = Sink[IO] { batch =>
ref.update(_ :+ SentToBad(batch.size))
ref.update(_ :+ SentToBad(batch.asIterable.size))
}

private def testHttpClient: Client[IO] = Client[IO] { _ =>
Expand Down Expand Up @@ -130,7 +130,7 @@ object MockEnvironment {
_ <- actionRef.update(_ :+ OpenedChannel)
} yield a

def write(rows: Seq[Map[String, AnyRef]]): IO[ChannelProvider.WriteResult] =
def write(rows: Iterable[Map[String, AnyRef]]): IO[ChannelProvider.WriteResult] =
for {
response <- responseRef.modify {
case head :: tail => (tail, head)
Expand Down
2 changes: 1 addition & 1 deletion project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ object Dependencies {
val awsSdk2 = "2.20.135"

// Snowplow
val streams = "0.1.0"
val streams = "0.2.0-M1a"

// tests
val specs2 = "4.20.0"
Expand Down

0 comments on commit d255824

Please sign in to comment.