Fixed issues in the timer handling state machine integration #3432
Before
After
So this actually improves performance, I think mostly due to the fact that we only look at
"run a timer when parking thread" in {
  val (pool, shutdown) = IORuntime.createWorkStealingComputeThreadPool(threads = 1)

  implicit val runtime: IORuntime = IORuntime.builder().setCompute(pool, shutdown).build()

  try {
    val test = IO.sleep(500.millis) *> IO.pure(true)
    test.unsafeRunTimed(5.seconds) must beSome(true)
  } finally {
    runtime.shutdown()
  }
}

// this test ensures that we always see the timer, even when it fires just as we're about to park
"run a timer when detecting just prior to park" in {
  val (pool, shutdown) = IORuntime.createWorkStealingComputeThreadPool(threads = 1)

  implicit val runtime: IORuntime = IORuntime.builder().setCompute(pool, shutdown).build()

  try {
    val test = IO.sleep(1.milli) *> IO.pure(true)
    test.unsafeRunTimed(1.second) must beSome(true)
  } finally {
    runtime.shutdown()
  }
Sorry, I am a bit confused by the tests. Are they identical, except for the choice of durations? In which case, that suggests these are "special numbers", but I don't really see why 😕
Ah I should write some comments explaining the differences here. The numbers aren't particularly magical, they're just attempting to reproduce conditions inside the state machine. The shorter sleep is more likely to be caught by the state machine before parking, while the longer sleep is guaranteed to park the worker. Since these are technically two subtly independent code paths, I wanted to test them independently. For a time, one of these tests was failing while the other passed, and then I was able to invert that state, so they do indeed reproduce different scenarios.
FS2 appears to be fixed! :)
As far as I can tell the fix looks good and is actually pretty straightforward to understand. FS2 is happy too. So mostly I'm just confused about the tests.
if (parkUntilNextSleeper()) {
  // we made it to the end of our sleeping, so go straight to local queue stuff
  pool.transitionWorkerFromSearching(rnd)
  state = 4
Random reader here: why is state "named" with magic numbers instead of constants? I found the descriptions in the header, but they aren't as good a mnemonic as names would be.
Yeah, when I was reviewing this I was wishing the same. It's a technique used in the codebase to make sure that optimal bytecode is generated.
cats-effect/core/shared/src/main/scala/cats/effect/IOFiber.scala
Lines 240 to 244 in c8db3da
/*
 * The cases have to use continuous constants to generate a `tableswitch`.
 * Do not name or reorder them.
 */
(cur0.tag: @switch) match {
I understand that it requires more work, but something like:
state = 3 // UNPARKED_AND_LOOKING_FOR_WORK
state = 4 // LOOKING_FOR_FIBERS_TO_EXECUTE
could help the next reader, though it would require a bit of perfectionism to implement.
I didn't notice these lines; I didn't know the effect was so severe.
https://github.com/djspiewak/cats-effect/blob/0dd30f9c7f8929a9ec0db4598270f0daad71a8ab/core/jvm/src/main/scala/cats/effect/unsafe/WorkerThread.scala#L306-L309
thanks for clarification!
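To make the suggestion above concrete, here is a minimal sketch of a match that keeps bare integer literals for the compiler while carrying the mnemonic in a trailing comment. It is not code from this PR: `WorkerStateSketch` and `describe` are hypothetical names, the meanings of states 1 and 2 are not stated anywhere in this thread and are left as placeholders, and the label for state 0 is only inferred from the PR description.

```scala
import scala.annotation.switch

object WorkerStateSketch {
  // Hypothetical illustration: the cases stay as contiguous integer literals,
  // and the human-readable name lives only in a comment.
  def describe(state: Int): String =
    (state: @switch) match {
      case 0 => "poll externally or steal"      // inferred from the PR description
      case 1 => "placeholder"                   // meaning not given in this thread
      case 2 => "placeholder"                   // meaning not given in this thread
      case 3 => "unparked and looking for work" // UNPARKED_AND_LOOKING_FOR_WORK
      case 4 => "looking for fibers to execute" // LOOKING_FOR_FIBERS_TO_EXECUTE
      case _ => "unknown"
    }
}
```

The `@switch` annotation asks the compiler to warn if the match cannot be compiled to a switch instruction, so comments like these add mnemonics without touching the generated bytecode.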
This may fix @armanbilge's FS2 issue. The essence here is the fact that parkUntilNextSleeper breaks an implicit assumption in the state machine, which is that the local queue cannot get new work items while the thread is parked. This is totally reasonable, but the timers stuff breaks this since new local work can come from timers expiring. The solution is to detect that some timer expired and then reroute directly to local work handling. In the rare case where the worker is awakened by external work and a timer expires simultaneously, the timer takes precedence, but the worker will eventually loop back around to state 0, which should poll externally or steal.
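The rerouting described above can be illustrated with a toy, single-threaded model. This is a sketch under stated assumptions, not the actual WorkerThread code: the three-state layout, the `Worker` class, and its fields are all hypothetical, and only the idea that `parkUntilNextSleeper` returns `true` when a timer fired is taken from the diff.

```scala
import scala.collection.mutable

// Toy model only: state numbering and all names except parkUntilNextSleeper
// are assumptions made for illustration.
object TimerParkSketch {
  final class Worker(externalWork: mutable.Queue[String]) {
    val localQueue: mutable.Queue[String] = mutable.Queue.empty
    val executed: mutable.ListBuffer[String] = mutable.ListBuffer.empty
    // At most one pending timer; firing it pushes its task onto the local queue.
    var pendingTimer: Option[String] = None

    // Simulated park: returns true when the wake-up came from an expired timer.
    def parkUntilNextSleeper(): Boolean = pendingTimer match {
      case Some(task) =>
        localQueue.enqueue(task)
        pendingTimer = None
        true
      case None => false
    }

    def run(initialState: Int, maxSteps: Int): Unit = {
      var state = initialState
      var steps = 0
      while (steps < maxSteps) {
        steps += 1
        state match {
          case 0 => // poll external work, otherwise go park
            if (externalWork.nonEmpty) {
              localQueue.enqueue(externalWork.dequeue())
              state = 2
            } else {
              state = 1
            }
          case 1 => // parked: a fired timer reroutes straight to local work
            if (parkUntilNextSleeper()) state = 2 else state = 0
          case 2 => // drain the local queue, then loop back to state 0
            if (localQueue.nonEmpty) {
              executed += localQueue.dequeue()
            } else {
              state = 0
            }
        }
      }
    }
  }
}
```

If a worker starts parked (state 1) with both a pending timer and external work available, the timer's task runs first and the external task is picked up once the loop returns to state 0, which mirrors the precedence described in the paragraph above.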