
Introduce a BatchingMacrotaskExecutor #3225


Merged

Conversation

armanbilge
Member

This one will definitely require some kind of benchmarking :)

This is inspired by the timer+I/O-integrated runtime work, where we only check for outstanding timers and I/O every 64 iterations of the runloop. Meanwhile the MacrotaskExecutor is based on setImmediate, which gives priority to I/O events.

Schedules the "immediate" execution of the callback after I/O events' callbacks.

https://nodejs.org/api/timers.html#setimmediatecallback-args

Cats Effect and downstream libraries are very fiber-happy, so it seems a bit outrageous that every started fiber must wait for an entire event-loop cycle before it runs. This is compounded by the fact that in browsers we generally can't even submit directly to the macrotask queue and must rely on hacks such as postMessage.
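For readers unfamiliar with the hack being referenced, here is a rough sketch of the classic postMessage trick (names hypothetical; the real implementation lives in the scala-js-macrotask-executor library and is considerably more careful). Browsers expose no setImmediate, so callbacks are smuggled onto the macrotask queue by posting a message to the page itself:

    import scala.collection.mutable
    import scala.scalajs.js
    import scala.scalajs.js.Dynamic.{global => g}

    // illustration only; hypothetical names throughout
    private object PostMessageMacrotasks {
      private val tag = "sketch-macrotask" // marker so unrelated messages are ignored
      private val pending = mutable.Queue.empty[Runnable]

      // each received marker message is a macrotask; run one queued callback per message
      private val onMessage: js.Function1[js.Dynamic, Unit] = { e =>
        if ((e.data: Any) == tag && pending.nonEmpty)
          pending.dequeue().run()
      }
      g.window.addEventListener("message", onMessage)

      // enqueue the callback and post to ourselves, scheduling a macrotask that
      // runs on the next event-loop turn, after outstanding I/O callbacks
      def submit(r: Runnable): Unit = {
        pending.enqueue(r)
        g.window.postMessage(tag, "*")
      }
    }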

Indeed, this is the usual performance vs fairness tradeoff. But I am still unconvinced about the importance of fairness in JS.

  1. In a JS lambda, you are limited to processing one request at a time. So it's not really obvious to me what there is to be unfair to, and how that might be observed.

  2. In the browser, lack of fairness is most easily observed as delayed rendering. But if you are doing so much work in response to some event (e.g. user input or websocket message) that it affects rendering you are probably in trouble anyway. Increasing fairness will at best enable progressive but glitchy rendering and at worst make no difference.

Ironically the MacrotaskExecutor is about as fair as it gets 😅


    private[this] var needsReschedule: Boolean = true

    private[this] val executeQueue: ArrayDeque[Runnable] = new ArrayDeque
Member Author


Performance will be sub-optimal until we get a Scala.js release with this.

    import scala.concurrent.ExecutionContext
    import scala.scalajs.LinkingInfo

    private[unsafe] abstract class IORuntimeCompanionPlatform { this: IORuntime.type =>

    -   def defaultComputeExecutionContext: ExecutionContext =
    +   def defaultComputeExecutionContext: ExecutionContext = {
    +     val ec = new BatchingMacrotaskExecutor(64)
Member Author


We should make this configurable.

@djspiewak
Member

I'm skeptical! I'll take a closer look soon, but in general, I'd rather tune the fairness/throughput coefficient within IOFiber itself, rather than at the executor level.

@armanbilge
Member Author

I'm skeptical! I'll take a closer look soon, but in general, I'd rather tune the fairness/throughput coefficient within IOFiber itself, rather than at the executor level.

I don't see how that's possible here.

@djspiewak
Member

Okay I have an idea: what if we simply let IOFiber handle this? We're basically saying that there's a certain class of operations (e.g. start) which we would like to execute with microtask semantics, up to some bound, and we want to do this just on JS platforms. In particular, we can either make scheduleFiber platform-specific (which could allow us to remove WSTP from the JS binary), or we could make a special variant of it which is itself platform-specific. JVM and Native behavior should be unaffected.

@armanbilge
Member Author

We're basically saying that there's a certain class of operations (e.g. start) which we would like to execute with microtask semantics, up to some bound

Yes, exactly. But how do we implement this bound? It seems like it requires some "global" state (at least from the perspective of an individual fiber). Which brings us to the ExecutionContext.

@djspiewak
Member

Oh yes, we do implement a wrapping EC like this one, but we don't batch all executes. Instead, we have an additional method which adds to the batch and is called by the Start interpreter.

@armanbilge
Member Author

Ahhh, I gotcha now. Makes sense, thanks!
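To make the shape concrete, here is a minimal sketch of such a wrapping EC, with assumed names and simplified details (the class this PR actually introduces is BatchingMacrotaskExecutor, and it ultimately uses a custom queue rather than j.u.ArrayDeque, as discussed below):

    import java.util.ArrayDeque
    import scala.concurrent.ExecutionContext
    import scala.scalajs.concurrent.QueueExecutionContext
    import org.scalajs.macrotaskexecutor.MacrotaskExecutor

    private final class BatchingSketch(batchSize: Int) extends ExecutionContext {
      private[this] val fibers = new ArrayDeque[Runnable]
      private[this] var needsReschedule = true
      private[this] val microtasks = QueueExecutionContext.promises()

      // plain submissions keep macrotask semantics, so they still yield to
      // outstanding I/O callbacks and rendering
      def execute(runnable: Runnable): Unit =
        MacrotaskExecutor.execute(runnable)

      // the additional method, called when starting fibers: batch them and
      // drain from the microtask queue, so a new fiber does not have to wait
      // for a full event-loop turn
      def schedule(fiber: Runnable): Unit = {
        fibers.addLast(fiber)
        if (needsReschedule) {
          needsReschedule = false
          microtasks.execute(() => drainBatch())
        }
      }

      private[this] def drainBatch(): Unit = {
        var i = 0
        while (i < batchSize && !fibers.isEmpty()) {
          fibers.poll().run()
          i += 1
        }
        if (!fibers.isEmpty())
          // more work than one batch allows: defer the rest to a macrotask so
          // the event loop gets a chance to run
          MacrotaskExecutor.execute(() => drainBatch())
        else
          needsReschedule = true
      }

      def reportFailure(cause: Throwable): Unit = cause.printStackTrace()
    }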

Comment on lines 1274 to 1283
    private[this] def scheduleFiber(ec: ExecutionContext, fiber: IOFiber[_]): Unit = {
    -   if (ec.isInstanceOf[WorkStealingThreadPool]) {
    +   if (Platform.isJvm && ec.isInstanceOf[WorkStealingThreadPool]) {
          val wstp = ec.asInstanceOf[WorkStealingThreadPool]
          wstp.execute(fiber)
    +   } else if (Platform.isJs && ec.isInstanceOf[BatchingMacrotaskExecutor]) {
    +     val bmte = ec.asInstanceOf[BatchingMacrotaskExecutor]
    +     bmte.schedule(fiber)
        } else {
          scheduleOnForeignEC(ec, fiber)
        }
Member Author


So I know this is not really what you suggested, but on the other hand this lets us avoid unintentionally de-optimizing a private[this] method. Here's what this decompiles to on the JVM:

    private void scheduleFiber(ExecutionContext ec, IOFiber<?> fiber) {
        if (true && ec instanceof WorkStealingThreadPool) {
            WorkStealingThreadPool wstp = (WorkStealingThreadPool)ec;
            wstp.execute(fiber);
        } else if (false && ec instanceof BatchingMacrotaskExecutor) {
            BatchingMacrotaskExecutor bmte = (BatchingMacrotaskExecutor)ec;
            bmte.schedule(fiber);
        } else {
            this.scheduleOnForeignEC(ec, fiber);
        }
    }

The downside is that BatchingMacrotaskExecutor now lives in the JVM/Native binaries ...
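For reference, a sketch of how the JVM-side Platform flags might be defined (hypothetical file layout; the JS and Native source directories would mirror it with the flags flipped). A final val with a literal body is a compile-time constant, which is why the decompiled code above shows if (true && ...) and else if (false && ...); actually eliminating the dead branch is then left to the JIT, as discussed in the reply below.

    // e.g. core/jvm/src/main/scala/cats/effect/Platform.scala (hypothetical path)
    private[effect] object Platform {
      final val isJvm = true
      final val isJs = false
      final val isNative = false
    }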

Member


This is a pretty big deoptimization since it makes the branching much more complex. It's possible that the constants together with JIT inlining cause the ultimately-emitted assembly to elide the impossible branch, but I would want to check that before we really trust it. We can avoid this by relying on IOFiberPlatform instead.

Member Author


Ok, I rearranged the platform checks and now the emitted bytecode is completely identical to what it was before. I understand objections based on style / increased binary surface, but I do not see how this is not the maximally optimized implementation.

    private void scheduleFiber(ExecutionContext ec, IOFiber<?> fiber) {
        if (ec instanceof WorkStealingThreadPool) {
            WorkStealingThreadPool wstp = (WorkStealingThreadPool)ec;
            wstp.execute(fiber);
        } else {
            this.scheduleOnForeignEC(ec, fiber);
        }
    }

@armanbilge
Member Author

So I'm giving this a try. Still need to take another pass adding some internal scaladocs and maybe bikeshedding the names.

I think every operation that scheduleFiber() is used for makes sense for batching via the microtask (promises) executor, namely: start, racePair, and resuming an async. Meanwhile, (auto-)ceding is implemented with rescheduleFiber() and this should definitely go through the macrotask executor, so that it actually cedes to the event loop :)

So I think this should work well for the typical scenario where an incoming UI/IO event is picked up by either an async or Dispatcher, and starts/resumes a handful of short-running fibers to process the event.
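A small usage sketch of that scenario (hypothetical handler; any event-driven entry point such as a Dispatcher callback or an async resumption would look similar). Each .start below goes through scheduleFiber and is batched onto the microtask queue, while the trailing IO.cede goes through rescheduleFiber and therefore yields to the event loop:

    import cats.effect.{IO, IOApp}
    import cats.syntax.all._

    object HandlerSketch extends IOApp.Simple {

      // react to a single incoming event by starting a handful of short-lived fibers
      def handleEvent(payload: String): IO[Unit] =
        List
          .range(0, 4)
          .traverse(i => IO.println(s"processing $payload in fiber $i").start)
          .flatMap(_.traverse_(_.join)) *> IO.cede

      def run: IO[Unit] = handleEvent("incoming message")
    }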


    import scala.concurrent.ExecutionContext

    private[effect] sealed abstract class BatchingMacrotaskExecutor private ()
Member


Another disadvantage of the Platform encoding used above is that this type now leaks across to JVM and Native.


@armanbilge armanbilge changed the base branch from series/3.4.x to series/3.x December 4, 2022 07:09
@armanbilge armanbilge marked this pull request as ready for review December 4, 2022 08:15
@armanbilge armanbilge added this to the v3.5.0 milestone Dec 4, 2022
@armanbilge
Member Author

armanbilge commented Dec 4, 2022

Ok, I think this is ready for another look, thanks for all the pointers. No rush, since it has to wait until 3.5.x because it needs a Scala.js upgrade. (Unless there's something wrong with my approach, and we can do this without the ArrayDeque. Update: I ended up removing the ArrayDeque because it has issues ...)

The only outstanding issue is whether to do the platforming with compile-time conditionals or platform traits.

@armanbilge
Member Author

I benchmarked a warmed-up Ember.js "hello world" server with these changes.

Under concurrent load, performance is roughly similar.

When considering a single connection, the improvement is prominent: roughly 55% higher RPS (about 500 vs 323 requests/sec).

This makes sense: in the single connection case, there are never any other I/O events besides the one being currently handled. So ceding to the event loop during processing of the I/O event is pointless.

Meanwhile, in the concurrent connection case, nothing suggests that fairness has been compromised. This is consistent with the pattern where reacting to an I/O event starts a small number of short-lived fibers.

50 concurrent connections

3.4.2

Summary:
  Total:	30.0618 secs
  Slowest:	8.6726 secs
  Fastest:	0.0125 secs
  Average:	0.0966 secs
  Requests/sec:	517.2019

3.5-88faeed

Summary:
  Total:	30.0469 secs
  Slowest:	6.6952 secs
  Fastest:	0.0022 secs
  Average:	0.0884 secs
  Requests/sec:	564.7503

1 connection

3.4.2

Summary:
  Total:	30.0012 secs
  Slowest:	0.0561 secs
  Fastest:	0.0022 secs
  Average:	0.0031 secs
  Requests/sec:	322.8537

3.5-88faeed

Summary:
  Total:	30.0012 secs
  Slowest:	0.0244 secs
  Fastest:	0.0013 secs
  Average:	0.0020 secs
  Requests/sec:	499.9141

Comment on lines +21 to +26
    /**
     * A JS-Array backed circular buffer FIFO queue. It is careful to grow the buffer only using
     * `push` to avoid creating "holes" on V8 (this is a known shortcoming of the Scala.js
     * `j.u.ArrayDeque` implementation).
     */
    private final class JSArrayQueue[A] {
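A rough sketch of the idea behind such a queue (not the PR's exact implementation; names and growth policy are illustrative). The key point is that the backing js.Array is only ever enlarged with push, never by assigning past its end, so V8 keeps it in its packed, hole-free representation:

    import scala.scalajs.js

    private final class JSArrayQueueSketch[A] {
      private[this] val buffer = js.Array[A](null.asInstanceOf[A]) // capacity 1 to start
      private[this] var head = 0 // index of the first element
      private[this] var tail = 0 // index one past the last element (may wrap around)
      private[this] var size = 0

      def isEmpty: Boolean = size == 0

      def put(a: A): Unit = {
        if (size == buffer.length) grow()
        buffer(tail) = a
        tail += 1
        if (tail == buffer.length) tail = 0
        size += 1
      }

      def take(): A = {
        val a = buffer(head)
        buffer(head) = null.asInstanceOf[A] // drop the reference for the GC
        head += 1
        if (head == buffer.length) head = 0
        size -= 1
        a
      }

      private[this] def grow(): Unit = {
        val oldLen = buffer.length
        // double the capacity using push only, so the array stays packed
        var i = 0
        while (i < oldLen) { buffer.push(null.asInstanceOf[A]); i += 1 }
        // relocate the wrapped-around prefix [0, tail) into the newly pushed slots
        i = 0
        while (i < tail) {
          buffer(oldLen + i) = buffer(i)
          buffer(i) = null.asInstanceOf[A]
          i += 1
        }
        tail = head + size
        if (tail >= buffer.length) tail -= buffer.length
      }
    }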

@armanbilge
Member Author

Besides the happy-path performance improvements, it's worth pointing out that this new EC helps mitigate other performance issues:

  • We no longer have to wrap every fiber to support fiber dumps, since the EC natively supports this.
  • If you somehow are in an environment where macrotasks are subject to the 4ms+ clamping, then there is no longer a clamping overhead for each fiber. Indeed, if each UI or I/O event spawns a small batch of short-lived fibers, clamping would not even be triggered.


@djspiewak djspiewak left a comment


Still very uncomfortable with the Platform trait but I'll try to hit that in a follow-up.

@djspiewak djspiewak merged commit e4f2b71 into typelevel:series/3.x Jan 28, 2023