Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make request paths configurable #65

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/snyk.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,6 @@ jobs:
uses: snyk/actions/scala@master
with:
command: monitor
args: --project-name=snowplow-event-generator
args: --project-name=snowplow-event-generator --prune-repeated-subdependencies
env:
SNYK_TOKEN: ${{ secrets.SNYK_TOKEN }}
SNYK_TOKEN: ${{ secrets.SNYK_TOKEN }}
4 changes: 4 additions & 0 deletions CHANGELOG
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
Version 0.6.0 (2023-08-15)
--------------------------


Version 0.5.0 (2023-08-14)
--------------------------
Move cli options to the config (#47)
Expand Down
39 changes: 38 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,15 @@ Alternatively you can write events directly to a S3 bucket:
}
```

...or a Snowplow collector:

```
"output": {
"type": "Http"
"endpoint": "https://my.collector.endpoint.com"
}
```

Then run:
```bash
./snowplow-event-generator --config my-config.hocon
Expand Down Expand Up @@ -186,7 +195,31 @@ Aside from "output" configuration, all fields in the configuration file are opti

// Only used for "Fixed" timestamps. Change this to generate more recent or more historic events.
"at": "2022-02-01T01:01:01z"
},
}

// Set weights for the distributions of event types.
// Setting a fequency to 0 results in that event type not being produced at all
// Setting equal values results in an even distribution of event types.
"eventFrequencies": {
"struct": 1
"unstruct": 1
"pageView": 1
"pagePing": 1
"unstructEventFrequencies": {
"changeForm": 1
"funnelInteraction": 1
"linkClick": 1
}
}

// HTTP output only - set weights for the distributions of request methods.
// Setting a fequency to 0 results in that method not being produced at all
// Note that head requests are currently not implemented and will result in an exception.
"methodFrequencies": {
"post": 1
"get": 1
"head": 0
}

// Required: Storage to sink generated events into
// Currently only a single output at a time is supported
Expand Down Expand Up @@ -216,6 +249,10 @@ Aside from "output" configuration, all fields in the configuration file are opti
// "type": "PubSub"
// Required: PubSub stream URI
// "uri": "pubsub://projects/my-project/topics/my-topic"

// "type": "Http"
// Required: Snowplow collector endpoint, including protocol
// "endpoint": "https://my.collector.endpoint.com"
}
}
```
Expand Down
10 changes: 8 additions & 2 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,14 @@ lazy val sinks = project
Dependencies.Libraries.kcl,
Dependencies.Libraries.fs2Pubsub,
Dependencies.Libraries.fs2Kafka,
Dependencies.Libraries.awsRegions
)
Dependencies.Libraries.awsRegions,
"org.http4s" %% "http4s-ember-client" % "0.23.15",
"org.http4s" %% "http4s-circe" % "0.23.15"
// TODO move this
),
//libraryDependencies += "org.typelevel" %% "cats-effect" % "3.4.6",
// libraryDependencies += "org.http4s" %% "http4s-ember-client" % "0.23.15",
// libraryDependencies += "org.http4s" %% "http4s-circe" % "0.23.15"
)
.dependsOn(core)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,27 +25,27 @@ final case class Api(vendor: String, version: String) {
}

object Api {
private val GenI = Gen.const(Api("i", ""))
private val GenIce = Gen.const(Api("ice", ".png"))
val GenI = Gen.const(Api("i", ""))
val GenIce = Gen.const(Api("ice", ".png"))

def fixedApis: Gen[Api] = Gen.oneOf(GenI, GenIce)

def genApi(nEvents: Int): Gen[Api] =
(nEvents match {
def genApi(apiType: Int): Gen[Api] =
(apiType match {
case 0 => (genVendor, genVersion)
case 1 => (Gen.const("com.snowplowanalytics.snowplow"), Gen.oneOf("tp1", "tp2"))
case _ => (Gen.const("com.snowplowanalytics.snowplow"), Gen.const("tp2"))
case 1 => (Gen.const("com.snowplowanalytics.snowplow"), Gen.const("tp1"))
case 2 => (Gen.const("com.snowplowanalytics.snowplow"), Gen.const("tp2"))
}).mapN(Api.apply)

private def genVendor =
def genVendor =
for {
venPartsN <- Gen.chooseNum(1, 5)
venNs <- Gen.listOfN(venPartsN, Gen.chooseNum(1, 10))
vendorParts <- Gen.sequence[List[String], String](venNs.map(Gen.stringOfN(_, Gen.alphaNumChar)))
sep <- Gen.oneOf("-", ".", "_", "~")
} yield vendorParts.mkString(sep)

private def genVersion =
def genVersion =
for {
verN <- Gen.chooseNum(1, 10)
version <- Gen.stringOfN(verN, Gen.alphaNumChar)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,32 +32,129 @@ object HttpRequest {
val path: Api
}

case class MethodFrequencies(
get: Int,
post: Int,
head: Int
)

case class PathFrequencies(
i: Int,
ice: Int,
tp1: Int,
tp2: Int,
random: Int,
providedPath: Option[ProvidedPathFrequency]
)

case class ProvidedPathFrequency(
vendor: String,
version: String,
frequency: Int
)

object Method {
final case class Post(path: Api) extends Method
final case class Get(path: Api) extends Method
final case class Head(path: Api) extends Method

def gen: Gen[Method] = Gen.oneOf(genPost, genGet, genHead)
def gen(methodFreq: MethodFrequencies, pathFreq: PathFrequencies): Gen[Method] = {
Gen.frequency(
(methodFreq.post, genPost(pathFreq)),
(methodFreq.get, genGet(pathFreq)),
(methodFreq.head, genHead(pathFreq))
)
}

private def genPost(pathFreq: PathFrequencies): Gen[Method.Post] =
Gen.frequency(
pathFreq.providedPath match {
case Some(provided) => (provided.frequency, Api(provided.vendor, provided.version))
case None => (0, Api("", ""))
},
(pathFreq.random, genApi(0)),
(pathFreq.tp2, genApi(2)),
).map(Method.Post)

private def genPost: Gen[Method.Post] = Gen.oneOf(genApi(0), genApi(200)).map(Method.Post)
private def genGet: Gen[Method.Get] = Gen.oneOf(fixedApis, genApi(0), genApi(1)).map(Method.Get)
private def genHead: Gen[Method.Head] = Gen.oneOf(fixedApis, genApi(0), genApi(1)).map(Method.Head)
private def genGet(pathFreq: PathFrequencies): Gen[Method.Get] =
Gen.frequency(
pathFreq.providedPath match {
case Some(provided) => (provided.frequency, Api(provided.vendor, provided.version))
case None => (0, Api("", ""))
},
(pathFreq.i, Api.GenI), // /i
(pathFreq.ice, Api.GenIce), // /ice.png
(pathFreq.random, genApi(0)), // randomly generated path
(pathFreq.tp1, genApi(1)), // /com.snowplowanalytics.snowplow/tp1
(pathFreq.tp2, genApi(2)) // /com.snowplowanalytics.snowplow/tp2
).map(Method.Get)

private def genHead(pathFreq: PathFrequencies): Gen[Method.Head] =
Gen.frequency(
pathFreq.providedPath match {
case Some(provided) => (provided.frequency, Api(provided.vendor, provided.version))
case None => (0, Api("", ""))
},
(pathFreq.i, Api.GenI),
(pathFreq.ice, Api.GenIce),
(pathFreq.random, genApi(0)),
(pathFreq.tp1, genApi(1)),
(pathFreq.tp2, genApi(2))
).map(Method.Head)
}

def gen(
eventPerPayloadMin: Int,
eventPerPayloadMax: Int,
now: Instant,
frequencies: EventFrequencies
): Gen[HttpRequest] =
frequencies: EventFrequencies,
methodFrequencies: Option[MethodFrequencies],
pathFrequencies: Option[PathFrequencies]
): Gen[HttpRequest] = {
// MethodFrequencies and pathFrequencies are options here, at the entrypoint in order not to force a breaking change where this is a lib.
// If it's not provided, we give equal distribution to each to achieve behaviour parity
// From here in it's not an option, just to make the code a bit cleaner
val methodFreq = methodFrequencies.getOrElse(new MethodFrequencies(1, 1, 1))
val pathFreq = pathFrequencies.getOrElse(new PathFrequencies(1,1,1,1,1, None))
genWithParts(
HttpRequestQuerystring.gen(now, frequencies),
HttpRequestBody.gen(eventPerPayloadMin, eventPerPayloadMax, now, frequencies),
methodFreq,
pathFreq
)
}

def genDup(
natProb: Float,
synProb: Float,
natTotal: Int,
synTotal: Int,
eventPerPayloadMin: Int,
eventPerPayloadMax: Int,
now: Instant,
frequencies: EventFrequencies,
methodFrequencies: Option[MethodFrequencies],
pathFrequencies: Option[PathFrequencies]
): Gen[HttpRequest] = {
// MethodFrequencies and pathFrequencies are options here, at the entrypoint in order not to force a breaking change where this is a lib.
// If it's not provided, we give equal distribution to each to achieve behaviour parity
// From here in it's not an option, just to make the code a bit cleaner
val methodFreq = methodFrequencies.getOrElse(new MethodFrequencies(1, 1, 1))
val pathFreq = pathFrequencies.getOrElse(new PathFrequencies(1,1,1,1,1, None))
genWithParts(
// qs doesn't do duplicates?
HttpRequestQuerystring.gen(now, frequencies),
HttpRequestBody.gen(eventPerPayloadMin, eventPerPayloadMax, now, frequencies)
HttpRequestBody
.genDup(natProb, synProb, natTotal, synTotal, eventPerPayloadMin, eventPerPayloadMax, now, frequencies),
methodFreq,
pathFreq
)
}


private def genWithParts(qsGen: Gen[HttpRequestQuerystring], bodyGen: Gen[HttpRequestBody]) =
private def genWithParts(qsGen: Gen[HttpRequestQuerystring], bodyGen: Gen[HttpRequestBody], methodFreq: MethodFrequencies, pathFreq: PathFrequencies) =
for {
method <- Method.gen
method <- Method.gen(methodFreq, pathFreq)
qs <- Gen.option(qsGen)
body <- method match {
case Method.Head(_) => Gen.const(None) // HEAD requests can't have a message body
Expand Down
8 changes: 5 additions & 3 deletions project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ object Dependencies {
val circe = "0.14.1"
val fs2Pubsub = "0.22.0"
val fs2Kafka = "3.0.1"
val awsRegions = "2.20.69"
val awsSdk = "2.20.123"
// Scala (test only)
val specs2 = "4.12.3"
val scalaCheck = "1.14.0"
Expand All @@ -44,6 +44,7 @@ object Dependencies {
val badRows = "2.1.1"
val httpClient = "4.5.13"
val thrift = "0.15.0" // override transitive dependency to mitigate security vulnerabilities
val http4s = "0.23.15"
}

object Libraries {
Expand All @@ -64,8 +65,9 @@ object Dependencies {
val scalaCheckCats = "io.chrisdavenport" %% "cats-scalacheck" % V.scalaCheckCats
val httpClient = "org.apache.httpcomponents" % "httpclient" % V.httpClient
val slf4j = "org.slf4j" % "slf4j-simple" % V.slf4j
val kcl = "software.amazon.kinesis" % "amazon-kinesis-client" % V.kcl
val awsRegions = "software.amazon.awssdk" % "regions" % V.awsRegions
val kcl = "software.amazon.awssdk" % "kinesis" % V.awsSdk
val awsRegions = "software.amazon.awssdk" % "regions" % V.awsSdk
val http4sClient = "org.http4s" %% "http4s-blaze-client" % V.http4s

// Scala (test only)
val specs2 = "org.specs2" %% "specs2-core" % V.specs2 % Test
Expand Down
8 changes: 7 additions & 1 deletion sinks/src/main/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -30,5 +30,11 @@
"linkClick": 1
}
}

"methodFrequencies": {
# Setting these defaults becuase HEAD sink is not implemented yet,
# but it is still possibly in use as a library in our module tests
"post": 1
"get": 1
"head": 0
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import io.circe.generic.extras.semiauto._

import com.snowplowanalytics.snowplow.eventgen.protocol.event.EventFrequencies
import com.snowplowanalytics.snowplow.eventgen.protocol.event.UnstructEventFrequencies
import com.snowplowanalytics.snowplow.eventgen.tracker.HttpRequest.{MethodFrequencies, PathFrequencies, ProvidedPathFrequency}

final case class Config(
payloadsTotal: Int,
Expand All @@ -41,6 +42,8 @@ final case class Config(
duplicates: Option[Config.Duplicates],
timestamps: Config.Timestamps,
eventFrequencies: EventFrequencies,
methodFrequencies: Option[MethodFrequencies],
pathFrequencies: Option[PathFrequencies],
output: Config.Output
)

Expand Down Expand Up @@ -69,6 +72,7 @@ object Config {
case class File(path: URI) extends Output
case class PubSub(subscription: String) extends Output
case class Kafka(brokers: String, topic: String, producerConf: Map[String, String] = Map.empty) extends Output
case class Http(endpoint: org.http4s.Uri) extends Output
}

val configOpt = Opts.option[Path]("config", "Path to the configuration HOCON").orNone
Expand All @@ -85,9 +89,18 @@ object Config {
implicit val duplicatesDecoder: Decoder[Duplicates] =
deriveConfiguredDecoder[Duplicates]

implicit val frequenciesDecoder: Decoder[EventFrequencies] =
implicit val eventFrequenciesDecoder: Decoder[EventFrequencies] =
deriveConfiguredDecoder[EventFrequencies]

implicit val methodFrequenciesDecoder: Decoder[MethodFrequencies] =
deriveConfiguredDecoder [MethodFrequencies]

implicit val providedPathFrequencyDecoder: Decoder[ProvidedPathFrequency] =
deriveConfiguredDecoder [ProvidedPathFrequency]

implicit val pathFrequenciesDecoder: Decoder[PathFrequencies] =
deriveConfiguredDecoder [PathFrequencies]

implicit val unstructEventFrequenciesDecoder: Decoder[UnstructEventFrequencies] =
deriveConfiguredDecoder[UnstructEventFrequencies]

Expand All @@ -97,6 +110,10 @@ object Config {
Either.catchOnly[IllegalArgumentException](URI.create(str)).leftMap(_.getMessage)
}

implicit val httpUriDecoder: Decoder[org.http4s.Uri] = Decoder[String].emap { str =>
org.http4s.Uri.fromString(str).leftMap(_.getMessage)
}

implicit val kafkaDecoder: Decoder[Output.Kafka] =
deriveConfiguredDecoder[Output.Kafka]

Expand All @@ -109,6 +126,9 @@ object Config {
implicit val pubSubDecoder: Decoder[Output.PubSub] =
deriveConfiguredDecoder[Output.PubSub]

implicit val httpDecoder: Decoder[Output.Http] =
deriveConfiguredDecoder[Output.Http]

implicit val outputDecoder: Decoder[Output] =
deriveConfiguredDecoder[Output]

Expand Down
Loading