Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/http sink #58

Merged
merged 2 commits into from
Sep 12, 2023
Merged

Feature/http sink #58

merged 2 commits into from
Sep 12, 2023

Conversation

colmsnowplow
Copy link
Contributor

@colmsnowplow colmsnowplow commented Aug 11, 2023

Draft PR for http sink.

Notes

  • It's not perfect but it works - likely a lot of improvements could be made.
  • Current implementation omits qs for post, omits body for get, and sends a hardcoded get for HEAD
  • We still generate data for all formats every time, which is inefficient. I can see how to refactor it to improve this but don't have the time - it's already in an issue.
  • I think it'd be beneficial to be able to configure paths/weights of the different path options - I hope to get to adding this separately for this release, but we don't need it for collector so I'm first going to move to the deployment of this asset as an RC.
  • There is a bug whereby the data generated itself is producing corrupt context data sometimes - which when base64-encoded cx field results in a JSON with the character '7' at the end (When decoded). It results in enrichment failures - so not critical to current use case but I'll log an issue.

TODO this release:

  • Address review feedback
  • Make distribution of GET/POST/HEAD configurable
  • Fix origin headers
  • Fix HEAD requests (or do something sensible with them) - Won't do - the server gives an error. Just leaving unimplemented for now.

To make tickets for:

package com.snowplowanalytics.snowplow.eventgen

import com.snowplowanalytics.snowplow.eventgen.tracker.HttpRequest
import com.snowplowanalytics.snowplow.eventgen.tracker.HttpRequest.{Method => trackerMethod}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unusual to rename from Upper case name to lower case name. More normal to write {Method => TrackerMethod}

}
*/

val address = Uri.fromString(properties.endpoint + generatedRequest.method.path.toString()) match {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A few comments about this little section:

Easy one -- consider using IllegalArgumentException instead of Exception.

Another easy one -- It's unusual to use the .format syntax in scala. It's more normal to write it like

s"blah blah ${properties.endpoint} blah blah ${generatedRequest.method.path}"

A nice change you could make is to move the Uri.fromString(...) outside of this buildRequest block. So you only run it once when the app first starts up. And you only need to handle the error scenario once.

Then, once you have generated a Uri, you can use syntax like:

val address = baseUri / generatedRequest.method.path.vendor / generatedRequest.method.path.version

val httpClient = EmberClientBuilder.default[F].build

st: Stream[F, Main.GenOutput] =>
st.map(_._3).map(buildRequesst).evalMap(req => httpClient.use(client => client.expect[String](req))).void
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here when you call httpClient.use { ..... } it creates and destroys the http client + all of the internal objects needed to implement it. Like a TCP connection pool etc.

The problem is, you call .use for every single request. So you're continually creating/destroying the resources, which is quite wasteful.

It's better to find a way to .use the http client just once, and then re-use it for every request. I'd be happy to show you a few ways to arrange the code for this.

case (k, v) if k != "Origin" => List(k, v)
// We need to unpack multiple Origins in discrete key-value pairs
// to play nicely with [HttpRequest.builder().headers()]
case (k, v) => intersperse(k, v.split(",").toList)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree this is the right approach. I think we can make it neater though!

@@ -69,6 +69,7 @@ object Config {
case class File(path: URI) extends Output
case class PubSub(subscription: String) extends Output
case class Kafka(brokers: String, topic: String, producerConf: Map[String, String] = Map.empty) extends Output
case class Http(endpoint: String) extends Output
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you made endpoint a org.http4s.Uri instead of a String in the config, then you wouldn't need to handle parsing exceptions later on when generating requests.

@colmsnowplow colmsnowplow force-pushed the release/0.5.0 branch 2 times, most recently from e6d1caf to 6e80cce Compare August 14, 2023 12:01
@colmsnowplow colmsnowplow changed the base branch from release/0.5.0 to release/0.6.0 August 15, 2023 11:38
@colmsnowplow colmsnowplow marked this pull request as ready for review August 16, 2023 15:22
@colmsnowplow
Copy link
Contributor Author

Note: HEAD requests are still not implemented. Using client.status instead of client.expect does prevent the app from crashing with these requests, but they all get:

This hits org.http4s.client.UnexpectedStatus: unexpected HTTP status: 422 Unprocessable Entity for request HEAD

Since I suspect that this is something to do with the request itself, rather than an expected error from the collector, I've left it for another day.

I have also made a separate PR to make request paths configurable. With the current implementation, we can generate /r/* paths, for which the collector correctly responds with 4XX or 3XX responses. Since it's desirable that for some of our tests (eg load stress testing) expect 100% of the data to succeed (ie. where failure responses are indicators of the limits of the infrastructure), and it was fairly simple to implement (while other work was blocked), I made the PR.

@colmsnowplow
Copy link
Contributor Author

@istreeter it doesn't need your urgent attention, I've also flagged it to @peel who will likely soon be using the tool - but as an FYI/because I think you might be interested, see last comment for where things lie.

@colmsnowplow
Copy link
Contributor Author

Note: Formatting might be off. I realised that I hadn't run scalafmt long after I had separately branched off. I plan to do that before release, just holding off until after I have to rebase one or more of these branches. :)

Copy link
Contributor

@peel peel left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The idea of being able to tune HTTP request formats is fantastic and overall this feature is massively helpful. Great job!

Please do run formatting and remove dead code. A few extra (hopefully helpful) comments below.


def sink[F[_]: Async](properties: Config.Output.Http): Pipe[F, Main.GenOutput, Unit] = {

val baseUri = Uri.fromString(properties.endpoint) match {
Copy link
Contributor

@peel peel Aug 24, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If there's one thing that I'd change is the org.http4s.Uri instead of String in the config. This way you don't need to additionally validate the config here. Therefore sink becomes less defensive behavioural function. But the config is not affected, so we can do that at a better time.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry Peel I'm not sure I understand what you're saying here?

generatedRequest: HttpRequest
): Request[F] = {

// Origin headers are given to us a key and a comma separated string of values,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is nice example where you'd be able to use a massively helpful quickcheck/property-based testing approach that's often used within Scala/FP-projects. We shouldn't do any of that now, but sharing for the sake of it.

The approach here would likely be:

  • generate a collection of header name and it's values making sure Origin appears there at least once
  • collect them into a desired format, where each header's values are comma-separated - that will be the headers argument value
  • expected return value is the original collection where each key's value is a single Raw
  • run through parseHeaders and compare input and output

The benefit of this approach is we cover all the scenarios and hint that we don't really care about the values but the shape of it. Next time we get around to looking at the kind of logic we'd know what the author cared about and the comment itself becomes a spec.

Copy link
Contributor Author

@colmsnowplow colmsnowplow Aug 24, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah so I didn't touch the data generation part, but I did feel that tweaks to that implementation could make our lives easier here/make our approach more idiomatic - which I think is what you're getting at here, if I understand correctly.

}
// TODO: Some code repetition can probably be removed from this

return req
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Again, not a reason for changing anything. But maybe that's a personal preference, but I've always found a function should have a single reason to change. Never been a massive fan of internal methods especially as it makes them hard to test individually.

I'd structure this function in a way that address, body, parseHeaders are separate functions, this way buildRequest could potentially look like this:

def buildRequesst(generatedRequest: HttpRequest): Request[F] = generatedRequest.method match {
  case TrackerMethod.Post(_) =>  Request[F](...)
  ...
}

def address(_) = _
def body(_) = _ 
def parseHeaders(_) = _

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So my approach was to implement something that works, then look at how to refactor. I don't think I disagree with your perspective but at the same time, as things stood I didn't see an obvious way that a refactor would simplify/improve things - at least from the point of view that actually we don't seem to be repeating anything necessarily.

Not that I think it can't be factored better/more nicely - I just didn't see a quick win and didn't want to let perfect be the enemy of good. If support rota is quiet for the rest of the week I might get the chance to revisit this. :)

@colmsnowplow
Copy link
Contributor Author

colmsnowplow commented Sep 4, 2023

@peel / @istreeter I think it's ready for final review now, removed dead code and now parsing endpoint as Uri in config.

I will rebase commits before merging just didn't wanna make the convos hard to track. :)

Edit: I have just realised it's already got approval. I'll rebase and merge, apologies for unnecessary noise.

@colmsnowplow colmsnowplow merged commit 5923b04 into release/0.6.0 Sep 12, 2023
@colmsnowplow colmsnowplow deleted the feature/httpSink branch September 12, 2023 07:53
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants