Refactor/effeciencies #69

Open
wants to merge 6 commits into
base: main

Conversation

Contributor

@colmsnowplow colmsnowplow commented Sep 12, 2023

This PR refactors two things:

  1. Instead of duplicating code, with one method for generating duplicates and a separate one for non-duplicates, we now use a single method. If we don't want duplicates, we configure it to produce 0 duplicates.
  2. Instead of generating the data for all outputs every time, we now only generate data for those types we actually want.

There are further improvements to be found, such as refactoring the config and allowing more combinations of sink and type, but this is as much as I got done on the flight back from the team offsite :)
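
As a rough illustration of the first point, here is a minimal sketch of a single code path that takes an optional duplicates configuration, where the fallback configuration yields no duplicates. `DupConfig`, `NoDuplicates` and `withDuplicates` are hypothetical stand-ins and do not reflect the project's actual `Config.Duplicates` fields or generator code.

```scala
import scala.util.Random

object DuplicatesSketch {
  // Hypothetical, simplified stand-in for a duplicates configuration:
  // the probability that any given event is emitted twice.
  final case class DupConfig(dupProbability: Double)

  // A configuration that never produces duplicates.
  val NoDuplicates: DupConfig = DupConfig(0.0)

  // Single code path: when no configuration is given, fall back to
  // NoDuplicates instead of calling a separate non-duplicating method.
  def withDuplicates[A](events: List[A], cfg: Option[DupConfig], rng: Random): List[A] = {
    val dups = cfg.getOrElse(NoDuplicates)
    events.flatMap { e =>
      // nextDouble() is in [0, 1), so a probability of 0.0 never duplicates.
      if (rng.nextDouble() < dups.dupProbability) List(e, e) else List(e)
    }
  }
}
```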

@colmsnowplow colmsnowplow mentioned this pull request Sep 12, 2023
@colmsnowplow colmsnowplow changed the base branch from main to release/0.6.0 September 12, 2023 08:35
Contributor

@peel peel left a comment

Looks good overall. I'd challenge the choice of dropping RuntimeException in sinks though.

private def genPost: Gen[Method.Post] = Gen.oneOf(genApi(0), genApi(200)).map(Method.Post)
private def genGet: Gen[Method.Get] = Gen.oneOf(fixedApis, genApi(0), genApi(1)).map(Method.Get)
private def genHead: Gen[Method.Head] = Gen.oneOf(fixedApis, genApi(0), genApi(1)).map(Method.Head)
private def genPost(pathFreq: PathFrequencies): Gen[Method.Post] =
Contributor

We can probably extract a template method here to avoid duplication and simplify testing, but that's not a big deal.


def fixedApis: Gen[Api] = Gen.oneOf(GenI, GenIce)

def genApi(nEvents: Int): Gen[Api] =
(nEvents match {
def genApi(apiType: Int): Gen[Api] =
Contributor

Finally makes sense. Previously we had: HttpRequest#genPost (genApi(200)).

st.map(_._3).map(buildRequesst).evalMap(req => client.status(req)).void)
st
.map(_._3)
.map(_.getOrElse(throw new RuntimeException("Http sink received no data")))
Contributor

I wonder if we need to raise here.
Something along these lines seems a bit more idiomatic:

st
  .map(_._3.toList) // we filter None out here
  .flatMap(Stream.emits)
  .map(buildRequesst)
  .evalMap(req => client.status(req)).void

We already have the logic for logging event count after it's done and completed.
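
A minimal sketch of the difference between the two approaches, using plain Scala collections rather than fs2 streams to stay self-contained. The `Request` type and both function names are hypothetical. Flattening skips missing values, whereas `getOrElse(throw ...)` fails on the first one.

```scala
object SinkInputSketch {
  // Hypothetical stand-in for the request type the sink sends.
  final case class Request(body: String)

  // Flattening: None values are dropped and nothing is raised.
  def dropMissing(in: List[Option[Request]]): List[Request] =
    in.flatMap(_.toList)

  // Raising: the first None aborts processing with an exception.
  def failOnMissing(in: List[Option[Request]]): List[Request] =
    in.map(_.getOrElse(throw new RuntimeException("Http sink received no data")))
}
```

With flattening, a sink that receives only `None` values processes nothing and finishes without an error.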

Contributor Author

I dropped in a runtime exception because, to date, we have no sensible configuration handling of which formats go with which options; we just make assumptions about what the user wants to do. I'd like to change that, but until we do, this is just a janky stopgap: at least the app crashes when we configure the wrong format for the wrong target.

> We already have the logic for logging event count after it's done and completed.

I don't think we do - by my reading of the code (which isn't very reliable), for the other formats we are counting that before passing to the sink. And it also appears not to be reliably accurate (but that's for another day). I couldn't see any logic in the app where the sink passes a result back up to the main flow of the app - I think we should do that for http target results at least - but it was beyond my abilities to get that done before I 'ran out of road' on this so to speak.

Contributor Author

I should add that I'm not sure I addressed the comment; I'm not sure I understand it 100%, but we can discuss!

@@ -63,7 +66,7 @@ object Main extends IOApp {

}

type GenOutput = (collector.CollectorPayload, List[Event], HttpRequest)
type GenOutput = (Option[collector.CollectorPayload], Option[List[Event]], Option[HttpRequest])
Contributor

Do I understand correctly that Option[List[Event]] communicates that _._2 only appears under a particular condition? None is functionally equivalent to List.empty.

Contributor Author

Previously, we generated all 3 formats every time. This PR changes it to only generate the format we are interested in according to the configuration.

Option[List[Event]] is about defining the type for something which is None if not configured, but is List[Event] if configured to be so.
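
To make that distinction concrete, here is a small sketch in which `None` means the format was not requested and `Some(Nil)` means it was requested but produced no events. The `Event` type and the `describe` helper are hypothetical and only serve the illustration.

```scala
object OptionalOutputSketch {
  // Hypothetical stand-in for the generated event type.
  final case class Event(id: String)

  // Distinguishes "not configured" from "configured but empty".
  def describe(events: Option[List[Event]]): String = events match {
    case None      => "not configured"
    case Some(Nil) => "configured, no events"
    case Some(es)  => s"configured, ${es.length} event(s)"
  }
}
```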

@@ -155,13 +173,15 @@ object Main extends IOApp {
def fileSink(fileConfig: Config.Output.File): Pipe[F, GenOutput, Unit] =
RotatingSink.rotate(config.payloadsPerFile) { idx =>
val pipe1: Pipe[F, GenOutput, Nothing] =
_.map(_._1)
_.map(_._1.getOrElse(throw new RuntimeException("File sink receieved no collector payload data")))
Contributor

Same question as with other sinks + RuntimeException applies.

@@ -242,7 +262,7 @@ object Main extends IOApp {

eventStream
.take(config.payloadsTotal.toLong)
.zipWithScan1((0, 0)) { case (idx, el) => (el._2.length + idx._1, 1 + idx._2) }
.zipWithScan1((0, 0)) { case (idx, el) => (el._2.getOrElse(Seq()).length + idx._1, 1 + idx._2) }
Contributor

.map(_.length).getOrElse(0) maybe? That'd avoid allocating memory for seq/empty array. Means the same though.
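
A sketch of the running count with that expression, using `scanLeft` on a plain `List` in place of fs2's `zipWithScan1` (an assumption made to keep the example self-contained). `Event` is a hypothetical stand-in. Unlike the stream version, `scanLeft` also emits the initial `(0, 0)` state.

```scala
object CountSketch {
  // Hypothetical stand-in for the generated event type.
  final case class Event(id: String)

  // Accumulates (total events, total payloads) across generated outputs.
  // A missing event list contributes 0 events but still counts as a payload.
  def runningCounts(outputs: List[Option[List[Event]]]): List[(Int, Int)] =
    outputs.scanLeft((0, 0)) { case ((events, payloads), el) =>
      (el.map(_.length).getOrElse(0) + events, payloads + 1)
    }
}
```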

// In fact there's likely a simpler pattern for this whole chunk o logic
// I think ideally the config might be refactored to make this simpler
(
collPayloadNeeded match {
Contributor

Option.when(collPayloadNeeded)(runGen(collPayload, rng))

May be what you're looking for
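
`Option.when` (available since Scala 2.13) takes its value by name, so the generator only runs when the condition holds. A small sketch, where `expensiveGen` is a hypothetical stand-in for `runGen(collPayload, rng)` and the call counter exists only to make the laziness visible:

```scala
object OptionWhenSketch {
  // Counts how many times the (hypothetical) generator actually runs.
  var calls: Int = 0

  def expensiveGen(): String = {
    calls += 1
    "payload"
  }

  // The second argument is by-name: it is evaluated only if `needed` is true.
  def generate(needed: Boolean): Option[String] =
    Option.when(needed)(expensiveGen())
}
```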

// If no duplicate config is provided, profide a configuration that produces no duplicates.
val dups = config.duplicates.getOrElse(Config.Duplicates(0,0,0,0))
Stream.repeatEval(
Sync[F].delay( {
Contributor

Suggested change
- Sync[F].delay( {
+ Sync[F].delay {

Base automatically changed from release/0.6.0 to main September 26, 2023 16:02
4 participants