Skip to content

Commit 8f31f53

Browse files
committed
Amended for common-streams 0.10.0-M3
1 parent 5ca2239 commit 8f31f53

File tree

6 files changed

+15
-73
lines changed

6 files changed

+15
-73
lines changed

modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/processing/Processing.scala

+5-20
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ import com.snowplowanalytics.snowplow.badrows.{BadRow, Processor => BadRowProces
3333
import com.snowplowanalytics.snowplow.badrows.Payload.{RawPayload => BadRowRawPayload}
3434
import com.snowplowanalytics.snowplow.sources.{EventProcessingConfig, EventProcessor, TokenedEvents}
3535
import com.snowplowanalytics.snowplow.sinks.ListOfList
36-
import com.snowplowanalytics.snowplow.lakes.{Environment, Metrics, RuntimeService}
36+
import com.snowplowanalytics.snowplow.lakes.{Environment, RuntimeService}
3737
import com.snowplowanalytics.snowplow.runtime.processing.BatchUp
3838
import com.snowplowanalytics.snowplow.runtime.syntax.foldable._
3939
import com.snowplowanalytics.snowplow.loaders.transform.{
@@ -56,8 +56,8 @@ object Processing {
5656
// Needed for Kinesis, where we want to subscribe to the stream as early as possible, so that other workers don't steal our shard leases
5757
Stream.eval(env.lakeWriter.createTable *> deferredTableExists.complete(()))
5858

59-
implicit val lookup: RegistryLookup[F] = Http4sRegistryLookup(env.httpClient)
60-
val eventProcessingConfig: EventProcessingConfig = EventProcessingConfig(env.windowing)
59+
implicit val lookup: RegistryLookup[F] = Http4sRegistryLookup(env.httpClient)
60+
val eventProcessingConfig = EventProcessingConfig(env.windowing, env.metrics.setLatency)
6161

6262
env.source
6363
.stream(eventProcessingConfig, eventProcessor(env, deferredTableExists.get))
@@ -120,8 +120,7 @@ object Processing {
120120
badProcessor: BadRowProcessor,
121121
ref: Ref[F, WindowState]
122122
): Pipe[F, TokenedEvents, Nothing] =
123-
_.through(setLatency(env.metrics))
124-
.through(rememberTokens(ref))
123+
_.through(rememberTokens(ref))
125124
.through(incrementReceivedCount(env))
126125
.through(parseBytes(env, badProcessor))
127126
.through(handleParseFailures(env, badProcessor))
@@ -167,22 +166,8 @@ object Processing {
167166

168167
}.drain
169168

170-
private def setLatency[F[_]: Sync](metrics: Metrics[F]): Pipe[F, TokenedEvents, TokenedEvents] =
171-
_.evalTap {
172-
_.earliestSourceTstamp match {
173-
case Some(t) =>
174-
for {
175-
now <- Sync[F].realTime
176-
latency = now - t.toEpochMilli.millis
177-
_ <- metrics.setLatency(latency)
178-
} yield ()
179-
case None =>
180-
Applicative[F].unit
181-
}
182-
}
183-
184169
private def rememberTokens[F[_]: Functor](ref: Ref[F, WindowState]): Pipe[F, TokenedEvents, Chunk[ByteBuffer]] =
185-
_.evalMap { case TokenedEvents(events, token, _) =>
170+
_.evalMap { case TokenedEvents(events, token) =>
186171
ref.update(state => state.copy(tokens = token :: state.tokens)).as(events)
187172
}
188173

modules/core/src/test/scala/com.snowplowanalytics.snowplow.lakes/MockEnvironment.scala

+1-1
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,7 @@ object MockEnvironment {
119119

120120
private def testSourceAndAck(windows: List[List[TokenedEvents]], state: Ref[IO, Vector[Action]]): SourceAndAck[IO] =
121121
new SourceAndAck[IO] {
122-
def stream(config: EventProcessingConfig, processor: EventProcessor[IO]): Stream[IO, Nothing] =
122+
def stream(config: EventProcessingConfig[IO], processor: EventProcessor[IO]): Stream[IO, Nothing] =
123123
Stream.eval(state.update(_ :+ SubscribedToStream)).drain ++
124124
Stream.emits(windows).flatMap { batches =>
125125
Stream

modules/core/src/test/scala/com.snowplowanalytics.snowplow.lakes/TestSparkEnvironment.scala

+1-1
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ object TestSparkEnvironment {
6060

6161
private def testSourceAndAck(windows: List[List[TokenedEvents]]): SourceAndAck[IO] =
6262
new SourceAndAck[IO] {
63-
def stream(config: EventProcessingConfig, processor: EventProcessor[IO]): Stream[IO, Nothing] =
63+
def stream(config: EventProcessingConfig[IO], processor: EventProcessor[IO]): Stream[IO, Nothing] =
6464
Stream.emits(windows).flatMap { batches =>
6565
Stream
6666
.emits(batches)

modules/core/src/test/scala/com.snowplowanalytics.snowplow.lakes/processing/EventUtils.scala

+2-2
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ object EventUtils {
2828
StandardCharsets.UTF_8.encode(e.toTsv)
2929
}
3030
IO.unique.map { ack =>
31-
TokenedEvents(serialized, ack, None)
31+
TokenedEvents(serialized, ack)
3232
}
3333
}
3434
}
@@ -60,7 +60,7 @@ object EventUtils {
6060
def badlyFormatted: IO[TokenedEvents] =
6161
IO.unique.map { token =>
6262
val serialized = Chunk("nonsense1", "nonsense2").map(s => ByteBuffer.wrap(s.getBytes(StandardCharsets.UTF_8)))
63-
TokenedEvents(serialized, token, None)
63+
TokenedEvents(serialized, token)
6464
}
6565

6666
}

modules/core/src/test/scala/com.snowplowanalytics.snowplow.lakes/processing/ProcessingSpec.scala

+5-48
Original file line numberDiff line numberDiff line change
@@ -11,15 +11,11 @@
1111
package com.snowplowanalytics.snowplow.lakes.processing
1212

1313
import cats.implicits._
14-
import cats.effect.IO
1514
import org.specs2.Specification
1615
import cats.effect.testing.specs2.CatsEffect
1716
import io.circe.Json
1817
import cats.effect.testkit.TestControl
1918

20-
import java.time.Instant
21-
import scala.concurrent.duration.DurationLong
22-
2319
import com.snowplowanalytics.iglu.core.{SchemaKey, SchemaVer, SelfDescribingData}
2420
import com.snowplowanalytics.snowplow.analytics.scalasdk.SnowplowEvent
2521
import com.snowplowanalytics.snowplow.lakes.{MockEnvironment, RuntimeService}
@@ -34,10 +30,9 @@ class ProcessingSpec extends Specification with CatsEffect {
3430
Write multiple windows of events in order $e3
3531
Write multiple batches in a single window when batch exceeds cutoff $e4
3632
Write good batches and bad events when a window contains both $e5
37-
Set the latency metric based off the message timestamp $e6
38-
Load events with a known schema $e7
39-
Send failed events for an unrecognized schema $e8
40-
Crash and exit for an unrecognized schema, if exitOnMissingIgluSchema is true $e9
33+
Load events with a known schema $e6
34+
Send failed events for an unrecognized schema $e7
35+
Crash and exit for an unrecognized schema, if exitOnMissingIgluSchema is true $e8
4136
"""
4237

4338
def e1 = {
@@ -227,44 +222,6 @@ class ProcessingSpec extends Specification with CatsEffect {
227222
}
228223

229224
def e6 = {
230-
val messageTime = Instant.parse("2023-10-24T10:00:00.000Z")
231-
val processTime = Instant.parse("2023-10-24T10:00:42.123Z").minusMillis(MockEnvironment.TimeTakenToCreateTable.toMillis)
232-
233-
val io = for {
234-
inputs <- EventUtils.inputEvents(2, EventUtils.good())
235-
tokened <- inputs.traverse(_.tokened).map {
236-
_.map {
237-
_.copy(earliestSourceTstamp = Some(messageTime))
238-
}
239-
}
240-
control <- MockEnvironment.build(List(tokened))
241-
_ <- IO.sleep(processTime.toEpochMilli.millis)
242-
_ <- Processing.stream(control.environment).compile.drain
243-
state <- control.state.get
244-
} yield state should beEqualTo(
245-
Vector(
246-
Action.SubscribedToStream,
247-
Action.CreatedTable,
248-
Action.InitializedLocalDataFrame("v20231024100032"),
249-
Action.SetLatencyMetric(42123.millis),
250-
Action.AddedReceivedCountMetric(2),
251-
Action.SetLatencyMetric(42123.millis),
252-
Action.AddedReceivedCountMetric(2),
253-
Action.AppendedRowsToDataFrame("v20231024100032", 4),
254-
Action.CommittedToTheLake("v20231024100032"),
255-
Action.AddedCommittedCountMetric(4),
256-
Action.SetProcessingLatencyMetric(MockEnvironment.WindowDuration + MockEnvironment.TimeTakenToCreateTable),
257-
Action
258-
.SetE2ELatencyMetric(MockEnvironment.WindowDuration + MockEnvironment.TimeTakenToCreateTable + processTime.toEpochMilli.millis),
259-
Action.Checkpointed(tokened.map(_.ack)),
260-
Action.RemovedDataFrameFromDisk("v20231024100032")
261-
)
262-
)
263-
264-
TestControl.executeEmbed(io)
265-
}
266-
267-
def e7 = {
268225

269226
val ueGood700 = SnowplowEvent.UnstructEvent(
270227
Some(
@@ -302,7 +259,7 @@ class ProcessingSpec extends Specification with CatsEffect {
302259
TestControl.executeEmbed(io)
303260
}
304261

305-
def e8 = {
262+
def e7 = {
306263

307264
val ueDoesNotExist = SnowplowEvent.UnstructEvent(
308265
Some(
@@ -342,7 +299,7 @@ class ProcessingSpec extends Specification with CatsEffect {
342299
TestControl.executeEmbed(io)
343300
}
344301

345-
def e9 = {
302+
def e8 = {
346303

347304
val ueDoesNotExist = SnowplowEvent.UnstructEvent(
348305
Some(

project/Dependencies.scala

+1-1
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ object Dependencies {
4949
val awsRegistry = "1.1.20"
5050

5151
// Snowplow
52-
val streams = "0.10.0-M2"
52+
val streams = "0.10.0-M3"
5353
val igluClient = "4.0.0"
5454

5555
// Transitive overrides

0 commit comments

Comments
 (0)