Skip to content

Commit 9f624ed

Browse files
committed
Bump enrich to 5.4.0
This also upgrades micro to JDK 21, following the same upgrade of Enrich
1 parent 8f36067 commit 9f624ed

File tree

10 files changed

+65
-74
lines changed

10 files changed

+65
-74
lines changed

.github/workflows/deploy.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,10 @@ jobs:
1515
with:
1616
submodules: 'true'
1717

18-
- name: Set up JDK 11
18+
- name: Set up JDK 21
1919
uses: actions/setup-java@v2
2020
with:
21-
java-version: 11
21+
java-version: 21
2222
distribution: adopt
2323

2424
- name: Set up Node 18

.github/workflows/test.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,10 @@ jobs:
1010
steps:
1111
- uses: actions/checkout@v2
1212

13-
- name: Set up JDK 11
13+
- name: Set up JDK 21
1414
uses: actions/setup-java@v2
1515
with:
16-
java-version: 11
16+
java-version: 21
1717
distribution: adopt
1818

1919
- name: Install sbt

project/Dependencies.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ object Dependencies {
1717
object V {
1818
// Snowplow
1919
val snowplowStreamCollector = "3.4.0"
20-
val snowplowCommonEnrich = "5.3.0"
20+
val snowplowCommonEnrich = "5.4.0-rc8"
2121
val http4sCirce = "0.23.23"
2222

2323
val decline = "2.4.1"

project/plugins.sbt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
addSbtPlugin("io.github.davidgregory084" % "sbt-tpolecat" % "0.4.2")
2-
addSbtPlugin("com.snowplowanalytics" % "sbt-snowplow-release" % "0.3.1")
2+
addSbtPlugin("com.snowplowanalytics" % "sbt-snowplow-release" % "0.4.0")
33
addSbtPlugin("com.eed3si9n" % "sbt-buildinfo" % "0.11.0")
44
addSbtPlugin("com.dwijnand" % "sbt-dynver" % "4.1.1")
55
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "1.0.0")

src/main/scala/com.snowplowanalytics.snowplow.micro/MemorySink.scala

Lines changed: 35 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@ import org.joda.time.DateTime
2929
import org.slf4j.LoggerFactory
3030
import com.snowplowanalytics.snowplow.micro.Configuration.EnrichConfig
3131

32+
import java.time.Instant
33+
3234
/** Sink of the collector that Snowplow Micro is.
3335
* Contains the functions that are called for each tracking event sent
3436
* to the collector endpoint.
@@ -74,47 +76,40 @@ final class MemorySink(igluClient: IgluCirceClient[IO],
7476
* A `CollectorPayload` can contain several events.
7577
*/
7678
private[micro] def processThriftBytes(thriftBytes: Array[Byte]): IO[Unit] =
77-
ThriftLoader.toCollectorPayload(thriftBytes, processor) match {
78-
case Validated.Valid(maybePayload) =>
79-
maybePayload match {
80-
case Some(collectorPayload) =>
81-
adapterRegistry.toRawEvents(collectorPayload, igluClient, processor, registryLookup, enrichConfig.maxJsonDepth).flatMap {
82-
case Validated.Valid(rawEvents) =>
83-
val partitionEvents = rawEvents.toList.foldLeftM((Nil, Nil): (List[GoodEvent], List[BadEvent])) {
84-
case ((good, bad), rawEvent) =>
85-
validateEvent(rawEvent).value.map {
86-
case OptionIor.Right(goodEvent) =>
87-
logger.info(s"GOOD ${formatEvent(goodEvent)}")
88-
(goodEvent :: good, bad)
89-
case OptionIor.Both((errors, badRow), _) =>
90-
val badEvent = BadEvent(Some(collectorPayload), Some(rawEvent), errors)
91-
logger.warn(s"BAD ${formatBadRow(badRow)}")
92-
(good, badEvent :: bad)
93-
case OptionIor.Left((errors, badRow)) =>
94-
val badEvent = BadEvent(Some(collectorPayload), Some(rawEvent), errors)
95-
logger.warn(s"BAD ${formatBadRow(badRow)}")
96-
(good, badEvent :: bad)
97-
case OptionIor.None =>
98-
(good, bad)
99-
}
100-
}
101-
partitionEvents.map {
102-
case (goodEvents, badEvents) =>
103-
ValidationCache.addToGood(goodEvents)
104-
ValidationCache.addToBad(badEvents)
105-
if (outputEnrichedTsv) {
106-
goodEvents.foreach { event =>
107-
println(event.event.toTsv)
108-
}
109-
} else ()
79+
ThriftLoader.toCollectorPayload(thriftBytes, processor, Instant.now()) match {
80+
case Validated.Valid(collectorPayload) =>
81+
adapterRegistry.toRawEvents(collectorPayload, igluClient, processor, registryLookup, enrichConfig.maxJsonDepth, Instant.now()).flatMap {
82+
case Validated.Valid(rawEvents) =>
83+
val partitionEvents = rawEvents.toList.foldLeftM((Nil, Nil): (List[GoodEvent], List[BadEvent])) {
84+
case ((good, bad), rawEvent) =>
85+
validateEvent(rawEvent).value.map {
86+
case OptionIor.Right(goodEvent) =>
87+
logger.info(s"GOOD ${formatEvent(goodEvent)}")
88+
(goodEvent :: good, bad)
89+
case OptionIor.Both((errors, badRow), _) =>
90+
val badEvent = BadEvent(Some(collectorPayload), Some(rawEvent), errors)
91+
logger.warn(s"BAD ${formatBadRow(badRow)}")
92+
(good, badEvent :: bad)
93+
case OptionIor.Left((errors, badRow)) =>
94+
val badEvent = BadEvent(Some(collectorPayload), Some(rawEvent), errors)
95+
logger.warn(s"BAD ${formatBadRow(badRow)}")
96+
(good, badEvent :: bad)
97+
case OptionIor.None =>
98+
(good, bad)
11099
}
111-
case Validated.Invalid(badRow) =>
112-
val bad = BadEvent(Some(collectorPayload), None, List("Error while extracting event(s) from collector payload and validating it/them.", badRow.compact))
113-
logger.warn(s"BAD ${bad.errors.head}")
114-
IO(ValidationCache.addToBad(List(bad)))
115100
}
116-
case None =>
117-
val bad = BadEvent(None, None, List("No payload."))
101+
partitionEvents.map {
102+
case (goodEvents, badEvents) =>
103+
ValidationCache.addToGood(goodEvents)
104+
ValidationCache.addToBad(badEvents)
105+
if (outputEnrichedTsv) {
106+
goodEvents.foreach { event =>
107+
println(event.event.toTsv)
108+
}
109+
} else ()
110+
}
111+
case Validated.Invalid(badRow) =>
112+
val bad = BadEvent(Some(collectorPayload), None, List("Error while extracting event(s) from collector payload and validating it/them.", badRow.compact))
118113
logger.warn(s"BAD ${bad.errors.head}")
119114
IO(ValidationCache.addToBad(List(bad)))
120115
}
@@ -136,7 +131,7 @@ final class MemorySink(igluClient: IgluCirceClient[IO],
136131
processor,
137132
DateTime.now(),
138133
rawEvent,
139-
EtlPipeline.FeatureFlags(acceptInvalid = false, legacyEnrichmentOrder = false),
134+
EtlPipeline.FeatureFlags(acceptInvalid = false),
140135
IO.unit,
141136
registryLookup,
142137
enrichConfig.validation.atomicFieldsLimits,

src/main/scala/com.snowplowanalytics.snowplow.micro/Run.scala

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -19,18 +19,17 @@ import com.snowplowanalytics.snowplow.badrows.Processor
1919
import com.snowplowanalytics.snowplow.collector.core._
2020
import com.snowplowanalytics.snowplow.collector.core.model.Sinks
2121
import com.snowplowanalytics.snowplow.enrich.common.enrichments.EnrichmentRegistry
22-
import com.snowplowanalytics.snowplow.enrich.common.enrichments.registry.{Enrichment, EnrichmentConf}
23-
import com.snowplowanalytics.snowplow.enrich.common.utils.{HttpClient, ShiftExecution}
22+
import com.snowplowanalytics.snowplow.enrich.common.enrichments.registry.{Enrichment, EnrichmentConf, IpLookupExecutionContext}
23+
import com.snowplowanalytics.snowplow.enrich.common.enrichments.registry.sqlquery.SqlExecutionContext
24+
import com.snowplowanalytics.snowplow.enrich.common.utils.HttpClient
2425
import com.snowplowanalytics.snowplow.micro.Configuration.MicroConfig
2526
import org.http4s.ember.client.EmberClientBuilder
2627
import org.typelevel.log4cats.Logger
2728
import org.typelevel.log4cats.slf4j.Slf4jLogger
2829

2930
import java.io.File
3031
import java.security.{KeyStore, SecureRandom}
31-
import java.util.concurrent.Executors
3232
import javax.net.ssl.{KeyManagerFactory, SSLContext, TrustManagerFactory}
33-
import scala.concurrent.ExecutionContext
3433
import scala.sys.process._
3534

3635
object Run {
@@ -99,10 +98,10 @@ object Run {
9998
private def buildEnrichmentRegistry(configs: List[EnrichmentConf]): Resource[IO, EnrichmentRegistry[IO]] = {
10099
for {
101100
_ <- Resource.eval(downloadAssets(configs))
102-
shift <- ShiftExecution.ofSingleThread[IO]
101+
ipLookupEC <- IpLookupExecutionContext.mk[IO]
102+
sqlEC <- SqlExecutionContext.mk[IO]
103103
httpClient <- EmberClientBuilder.default[IO].build.map(HttpClient.fromHttp4sClient[IO])
104-
blockingEC = ExecutionContext.fromExecutorService(Executors.newCachedThreadPool)
105-
enrichmentRegistry <- Resource.eval(EnrichmentRegistry.build[IO](configs, shift, httpClient, blockingEC)
104+
enrichmentRegistry <- Resource.eval(EnrichmentRegistry.build[IO](configs, httpClient, ipLookupEC, sqlEC, false)
106105
.leftMap(error => new IllegalArgumentException(s"can't build EnrichmentRegistry: $error"))
107106
.value.rethrow)
108107
_ <- Resource.eval {

src/test/scala/com.snowplowanalytics.snowplow.analytics.scalasdk/EventConverterSpec.scala

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -140,7 +140,7 @@ class EventConverterSpec extends Specification {
140140

141141
"should correctly convert contexts" >> {
142142
val enriched = newValidEvent
143-
enriched.contexts = """{
143+
enriched.setContexts("""{
144144
"schema": "iglu:com.snowplowanalytics.snowplow/contexts/jsonschema/1-0-0",
145145
"data": [
146146
{
@@ -163,7 +163,7 @@ class EventConverterSpec extends Specification {
163163
}
164164
}
165165
]
166-
}"""
166+
}""")
167167

168168
val result = EventConverter.fromEnriched(enriched)
169169
result.toEither must beRight { e: Event =>
@@ -177,7 +177,7 @@ class EventConverterSpec extends Specification {
177177

178178
"should correctly convert unstruct events" >> {
179179
val enriched = newValidEvent
180-
enriched.unstruct_event = """{
180+
enriched.setUnstruct_event("""{
181181
"schema": "iglu:com.snowplowanalytics.snowplow/unstruct_event/jsonschema/1-0-0",
182182
"data": {
183183
"schema": "iglu:com.snowplowanalytics.snowplow/link_click/jsonschema/1-0-1",
@@ -187,7 +187,7 @@ class EventConverterSpec extends Specification {
187187
"elementId": "exampleLink"
188188
}
189189
}
190-
}"""
190+
}""")
191191

192192
val result = EventConverter.fromEnriched(enriched)
193193
result.toEither must beRight { e: Event =>

src/test/scala/com.snowplowanalytics.snowplow.micro/MemorySinkSpec.scala

Lines changed: 8 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ package com.snowplowanalytics.snowplow.micro
1313
import cats.implicits._
1414
import cats.effect.testing.specs2.CatsResource
1515
import cats.effect.{IO, Resource}
16+
import io.circe.JsonObject
1617
import com.snowplowanalytics.iglu.client.IgluCirceClient
1718
import com.snowplowanalytics.iglu.client.resolver.Resolver
1819
import com.snowplowanalytics.iglu.client.resolver.registries.{JavaNetRegistryLookup, Registry}
@@ -63,17 +64,6 @@ class MemorySinkSpec extends CatsResource[IO, MemorySink] with SpecificationLike
6364
}
6465

6566
"validateEvent" >> {
66-
"should fail if the timestamp is not valid" >> withResource { sink =>
67-
val raw = buildRawEvent()
68-
val withoutTimestamp = raw.copy(context = raw.context.copy(timestamp = None))
69-
val expected = "Error while validating the event"
70-
sink.validateEvent(withoutTimestamp).value.map {
71-
_ must beLike {
72-
case OptionIor.Left((errors, _)) if errors.exists(_.contains(expected)) => ok
73-
}
74-
}
75-
}
76-
7767
"should fail if the event type parameter is not set" >> withResource { sink =>
7868
val raw = buildRawEvent()
7969
val withoutEvent = raw.copy(parameters = raw.parameters - "e")
@@ -169,16 +159,20 @@ class MemorySinkSpec extends CatsResource[IO, MemorySink] with SpecificationLike
169159
for {
170160
enrichConfig <- Configuration.loadEnrichConfig().value.map(_.getOrElse(throw new IllegalArgumentException("Can't read defaults from Enrich config")))
171161
igluClient <- IgluCirceClient.fromResolver[IO](Resolver[IO](List(Registry.IgluCentral), None), 500, enrichConfig.maxJsonDepth)
172-
enrichmentRegistry = new EnrichmentRegistry[IO](javascriptScript = List(buildJSEnrichment()))
162+
jsEnrichment <- buildJSEnrichment()
163+
enrichmentRegistry = new EnrichmentRegistry[IO](javascriptScript = List(jsEnrichment))
173164
processor = Processor(BuildInfo.name, BuildInfo.version)
174165
lookup = JavaNetRegistryLookup.ioLookupInstance[IO]
175166
} yield new MemorySink(igluClient, lookup, enrichmentRegistry, false, processor, enrichConfig)
176167
}
177168

178-
private def buildJSEnrichment(): JavascriptScriptEnrichment = {
169+
private def buildJSEnrichment(): IO[JavascriptScriptEnrichment] = {
179170
val js = Source.fromResource("js-enrichment.js").getLines().mkString("\n")
180171
val key = SchemaKey("com.snowplowanalytics.snowplow", "javascript_script_config", "jsonschema", SchemaVer.Full(1, 0, 0))
181-
JavascriptScriptEnrichment(key, js)
172+
JavascriptScriptEnrichment.create(key, js, JsonObject(), false) match {
173+
case Right(enrichment) => IO.pure(enrichment)
174+
case Left(error) => IO.raiseError(new RuntimeException(error))
175+
}
182176
}
183177

184178
}

src/test/scala/com.snowplowanalytics.snowplow.micro/ValidationCacheSpec.scala

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
package com.snowplowanalytics.snowplow.micro
1212

1313
import org.specs2.mutable.Specification
14+
import org.joda.time.DateTime
1415

1516
import com.snowplowanalytics.snowplow.enrich.common.loaders.CollectorPayload
1617
import com.snowplowanalytics.snowplow.analytics.scalasdk.Event
@@ -300,6 +301,8 @@ object ValidationCacheSpec {
300301
val EmptyFilterGood: FiltersGood = FiltersGood(None, None, None, None)
301302
val EmptyFilterBad: FiltersBad = FiltersBad(None, None, None)
302303

304+
val testTimestamp = DateTime.parse("2014-01-16T00:49:58.278+00:00")
305+
303306
def cacheOf(goodInit: List[GoodEvent], badInit: List[BadEvent]): ValidationCache =
304307
new ValidationCache {
305308
var good = goodInit
@@ -334,7 +337,7 @@ object ValidationCacheSpec {
334337
None,
335338
None,
336339
CollectorPayload.Source("name1", "utf-8", None),
337-
CollectorPayload.Context(None, None, None, None, Nil, None)
340+
CollectorPayload.Context(testTimestamp, None, None, None, Nil, None)
338341
)
339342

340343
val CollectorPayload2: CollectorPayload =
@@ -344,7 +347,7 @@ object ValidationCacheSpec {
344347
None,
345348
None,
346349
CollectorPayload.Source("name2", "utf-8", None),
347-
CollectorPayload.Context(None, None, None, None, Nil, None)
350+
CollectorPayload.Context(testTimestamp, None, None, None, Nil, None)
348351
)
349352

350353
val BadEvent1: BadEvent =

src/test/scala/com.snowplowanalytics.snowplow.micro/events.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ object events {
9292
contentType = None,
9393
Source("Micro", "UTF-8", Some("localhost")),
9494
Context(
95-
Some(DateTime.now()),
95+
DateTime.now(),
9696
Some("0:0:0:0:0:0:0:1"),
9797
Some("curl/7.52.1"),
9898
None,

0 commit comments

Comments
 (0)